|
|
import threading
|
|
|
import time
|
|
|
import os
|
|
|
import datetime
|
|
|
import datetime
|
|
|
import turndb
|
|
|
import can #需要安装python-can库 pip install python-can
|
|
|
from can import bus
|
|
|
|
|
|
def update():
|
|
|
global angle_target
|
|
|
while True:
|
|
|
date = turndb.table_quer()
|
|
|
angle_target=int(date[0][1])
|
|
|
time.sleep(0.01)
|
|
|
def add_0x(initial):
|
|
|
global initial16
|
|
|
initial16 = '0x%s' % initial
|
|
|
return initial16
|
|
|
|
|
|
|
|
|
def conversion(data10):
|
|
|
global data16
|
|
|
data100 = hex(data10)[2:].upper()
|
|
|
if len(str(data100)) == 1:
|
|
|
data16 = "0%s" % data100
|
|
|
else:
|
|
|
data16 = data100
|
|
|
return data16
|
|
|
|
|
|
|
|
|
def angle_separation(angle):
|
|
|
global angle_low, angle_high, angle1
|
|
|
angle1 = int(angle / 0.0625)
|
|
|
|
|
|
buma = angle1 & 0xFFFF # 补码
|
|
|
buma16 = hex(buma)
|
|
|
if angle1 >= 0:
|
|
|
fanma = buma
|
|
|
else:
|
|
|
fanma = buma - 1
|
|
|
fanma16 = hex(fanma)
|
|
|
#
|
|
|
angle_all = fanma16[2:].upper()
|
|
|
# angle_all = buma16[2:].upper()
|
|
|
if len(str(angle_all)) == 1:
|
|
|
angle_low = "0%s" % angle_all
|
|
|
angle_high = '00'
|
|
|
elif len(str(angle_all)) == 2:
|
|
|
angle_low = angle_all
|
|
|
angle_high = '00'
|
|
|
elif len(str(angle_all)) == 3:
|
|
|
angle_low = angle_all[1: 3]
|
|
|
angle_high = "0%s" % angle_all[0: 1]
|
|
|
elif len(str(angle_all)) == 4:
|
|
|
angle_low = angle_all[2: 4]
|
|
|
angle_high = angle_all[0: 2]
|
|
|
|
|
|
# print("换算角度:" + str(angle))
|
|
|
# print("总:" + angle_all)
|
|
|
# print("低位:" + angle_low)
|
|
|
# print("高位:" + angle_high)
|
|
|
return angle_low, angle_high, angle1
|
|
|
|
|
|
|
|
|
def can165():
|
|
|
global eps_statue, can_id
|
|
|
bus.set_filters = [{"can_id": 0x0165, "can_mask": 0xFFFF, "extended": False}]
|
|
|
# 连接can1
|
|
|
|
|
|
can1 = can.interface.Bus(channel='can1', bustype='socketcan', can_filters=bus.set_filters) # python-can 4.0需要用这个
|
|
|
# can1 = can.interface.Bus(channel = 'can0', bustype = 'socketcan') #python-can 4.0需要用这个
|
|
|
# 一直循环接收数据
|
|
|
|
|
|
while True:
|
|
|
msg = can1.recv(3.0) # recv()中定义超时接收时间
|
|
|
# print(msg)
|
|
|
can1651 = str(msg).replace(' ', '')
|
|
|
can165id = can1651.find('ID')
|
|
|
can165dl = can1651.find('DL')
|
|
|
can_id = can1651[can165id + 3:can165id + 7] # id
|
|
|
# print(can1651, '\n')
|
|
|
eps_statue = can1651[can165dl + 4:can165dl + 5] # eps状态
|
|
|
eps_check = can1651[can165dl + 6:can165dl + 8] # eps保护算法值
|
|
|
# print('eps状态:', eps_statue)
|
|
|
# return eps_statue, can_id
|
|
|
# time.sleep(0.015)
|
|
|
|
|
|
|
|
|
def can0a5():
|
|
|
global angle_low, angle_high, angle_now
|
|
|
bus.set_filters = [{"can_id": 0x0a5, "can_mask": 0xFFFF, "extended": False}]
|
|
|
# 连接can1
|
|
|
|
|
|
can1 = can.interface.Bus(channel='can1', bustype='socketcan', can_filters=bus.set_filters) # python-can 4.0需要用这个
|
|
|
# can1 = can.interface.Bus(channel = 'can0', bustype = 'socketcan') #python-can 4.0需要用这个
|
|
|
# 一直循环接收数据
|
|
|
|
|
|
while True:
|
|
|
msg = can1.recv(3.0) # recv()中定义超时接收时间
|
|
|
# print(msg)
|
|
|
can0a51 = str(msg).replace(' ', '')
|
|
|
can0a5id = can0a51.find('ID')
|
|
|
can0a5dl = can0a51.find('DL')
|
|
|
can_id = can0a51[can0a5id + 3:can0a5id + 7] # id
|
|
|
# print(can0a51, '\n')
|
|
|
# time.sleep(0.015)
|
|
|
if can_id == '00a5':
|
|
|
# print('接收到转向角报文0a5')
|
|
|
angle16_receive = can0a51[can0a5dl + 4:can0a5dl + 8] # 获取角度信息16进制
|
|
|
# print(angle16_receive)
|
|
|
# 分离角度的高低子节
|
|
|
angle_low = can0a51[can0a5dl + 6:can0a5dl + 8]
|
|
|
angle_high = can0a51[can0a5dl + 4:can0a5dl + 6]
|
|
|
# print(angle_high, angle_low)
|
|
|
'''
|
|
|
转换成10进制
|
|
|
'''
|
|
|
angle10_receive = int(angle16_receive, 16)
|
|
|
if angle10_receive & 0x8000 == 0x8000:
|
|
|
angle10_receive = -(angle10_receive - 1 ^ 0xffff)
|
|
|
angle_now = angle10_receive * 0.1
|
|
|
# print('此时方向盘角度为:%s' % angle_now)
|
|
|
|
|
|
else:
|
|
|
print('未接收到转向角报文0a5')
|
|
|
# return angle_high, angle_low, angle_now
|
|
|
|
|
|
|
|
|
def turn():
|
|
|
global rolling_count
|
|
|
xcag = abs(angle_target - angle_now)
|
|
|
angle_target_temp=angle_target
|
|
|
if xcag >= 13:
|
|
|
print("超出10°")
|
|
|
if angle_target >= angle_now:
|
|
|
while True:
|
|
|
if int(angle_target_temp)!=int(angle_target):
|
|
|
break
|
|
|
elif int(angle_now)<int(angle_target_temp):
|
|
|
# print(i)
|
|
|
changeag = angle_now + chajiao
|
|
|
print("当前角度为:", angle_now)
|
|
|
print("变换角度:", changeag)
|
|
|
print("临时目标角为:", angle_target_temp)
|
|
|
angle_separation(changeag)
|
|
|
# print("变换后角度", angle1)
|
|
|
# print("角度高低字节",angle_high,angle_low)
|
|
|
|
|
|
# 总合
|
|
|
if rolling_count == 16:
|
|
|
rolling_count = 0
|
|
|
rolling_count16 = hex(rolling_count)[2:].upper()
|
|
|
# angle_separation(angle)
|
|
|
# print(angle_low,angle_high)
|
|
|
# 字符串转16进制
|
|
|
rolling_count161 = int('%s%s' % (add_0x(rolling_count16), Angle_request), 16)
|
|
|
angle_high16 = int(add_0x(angle_high), 16)
|
|
|
angle_low16 = int(add_0x(angle_low), 16)
|
|
|
# print("前三字节16进制:", '%s%s' % (add_0x(rolling_count16), Angle_request), add_0x(angle_high),
|
|
|
# add_0x(angle_low))
|
|
|
# print("前三字节10进制:", rolling_count161, angle_high16, angle_low16)
|
|
|
|
|
|
# 按位异或 10进制
|
|
|
protection_value10 = rolling_count161 ^ angle_high16 ^ angle_low16
|
|
|
# print("校验值10进制:",protection_value10)
|
|
|
# 高低子节转换
|
|
|
protection_value16 = conversion(protection_value10)
|
|
|
# print("校验值16进制:", protection_value16)
|
|
|
print(
|
|
|
'cansend can1 160#%s%s%s%s%s00000000' % (rolling_count16, Angle_request, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
os.system(
|
|
|
'cansend can1 160#%s%s%s%s%s00000000' % (rolling_count16, Angle_request, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
print('\n')
|
|
|
rolling_count = rolling_count + 1
|
|
|
time.sleep(0.015)
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
else:
|
|
|
while True:
|
|
|
if int(angle_target_temp) != int(angle_target):
|
|
|
break
|
|
|
elif int(angle_now)>int(angle_target_temp):
|
|
|
# print(i)
|
|
|
changeag = angle_now - chajiao
|
|
|
print("当前角度为:", angle_now)
|
|
|
print("变换角度:", changeag)
|
|
|
print("临时目标角为:", angle_target_temp)
|
|
|
angle_separation(changeag)
|
|
|
# print("变换后角度", angle1)
|
|
|
# print("角度高低字节",angle_high,angle_low)
|
|
|
|
|
|
# 总合
|
|
|
if rolling_count == 16:
|
|
|
rolling_count = 0
|
|
|
rolling_count16 = hex(rolling_count)[2:].upper()
|
|
|
# angle_separation(angle)
|
|
|
# print(angle_low,angle_high)
|
|
|
# 字符串转16进制
|
|
|
rolling_count161 = int('%s%s' % (add_0x(rolling_count16), Angle_request), 16)
|
|
|
angle_high16 = int(add_0x(angle_high), 16)
|
|
|
angle_low16 = int(add_0x(angle_low), 16)
|
|
|
# print("前三字节16进制:", '%s%s' % (add_0x(rolling_count16), Angle_request), add_0x(angle_high),
|
|
|
# add_0x(angle_low))
|
|
|
# print("前三字节10进制:", rolling_count161, angle_high16, angle_low16)
|
|
|
|
|
|
# 按位异或 10进制
|
|
|
protection_value10 = rolling_count161 ^ angle_high16 ^ angle_low16
|
|
|
# print("校验值10进制:",protection_value10)
|
|
|
# 高低子节转换
|
|
|
protection_value16 = conversion(protection_value10)
|
|
|
# print("校验值16进制:", protection_value16)
|
|
|
print(
|
|
|
'cansend can1 160#%s%s%s%s%s00000000' % (rolling_count16, Angle_request, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
os.system(
|
|
|
'cansend can1 160#%s%s%s%s%s00000000' % (rolling_count16, Angle_request, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
print('\n')
|
|
|
rolling_count = rolling_count + 1
|
|
|
time.sleep(0.015)
|
|
|
else:
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
changeag = angle_target_temp
|
|
|
print("当前角度为:", angle_now)
|
|
|
print("变换角度:", changeag)
|
|
|
print('临时目标角为:',angle_target_temp)
|
|
|
# print(changeag)
|
|
|
angle_separation(changeag)
|
|
|
# print("变换后角度", angle1)
|
|
|
# print(angle_low,angle_high)
|
|
|
# 总合
|
|
|
if rolling_count == 16:
|
|
|
rolling_count = 0
|
|
|
rolling_count16 = hex(rolling_count)[2:].upper()
|
|
|
# angle_separation(angle)
|
|
|
# print(angle_low,angle_high)
|
|
|
# 字符串转16进制
|
|
|
rolling_count161 = int('%s%s' % (add_0x(rolling_count16), Angle_request), 16)
|
|
|
angle_high16 = int(add_0x(angle_high), 16)
|
|
|
angle_low16 = int(add_0x(angle_low), 16)
|
|
|
# print("前三字节16进制:", '%s%s' % (add_0x(rolling_count16), Angle_request), add_0x(angle_high), add_0x(angle_low))
|
|
|
# print("前三字节10进制:", rolling_count161, angle_high16, angle_low16)
|
|
|
|
|
|
# 按位异或 10进制
|
|
|
protection_value10 = rolling_count161 ^ angle_high16 ^ angle_low16
|
|
|
# print("校验值10进制:",protection_value10)
|
|
|
# 高低子节转换
|
|
|
protection_value16 = conversion(protection_value10)
|
|
|
# print("校验值16进制:", protection_value16)
|
|
|
|
|
|
print('cansend can1 160#%s%s%s%s%s00000000' % (rolling_count16, Angle_request, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
os.system('cansend can1 160#%s%s%s%s%s00000000' % (rolling_count16, Angle_request, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
print('\n')
|
|
|
rolling_count = rolling_count + 1
|
|
|
time.sleep(0.015)
|
|
|
|
|
|
|
|
|
def turn_keep():
|
|
|
global rolling_count
|
|
|
angle_separation(angle_now)
|
|
|
if rolling_count == 16:
|
|
|
rolling_count = 0
|
|
|
rolling_count16 = hex(rolling_count)[2:].upper()
|
|
|
# angle_separation(angle)
|
|
|
# print(angle_low,angle_high)
|
|
|
# 字符串转16进制
|
|
|
rolling_count161 = int('%s%s' % (add_0x(rolling_count16), 0), 16)
|
|
|
angle_high16 = int(add_0x(angle_high), 16)
|
|
|
angle_low16 = int(add_0x(angle_low), 16)
|
|
|
print("前三字节16进制:", '%s%s' % (add_0x(rolling_count16), 0), add_0x(angle_high),
|
|
|
add_0x(angle_low))
|
|
|
# print("前三字节10进制:", rolling_count161, angle_high16, angle_low16)
|
|
|
|
|
|
# 按位异或 10进制
|
|
|
protection_value10 = rolling_count161 ^ angle_high16 ^ angle_low16
|
|
|
# print("校验值10进制:",protection_value10)
|
|
|
# 高低子节转换
|
|
|
protection_value16 = conversion(protection_value10)
|
|
|
print("校验值16进制:", protection_value16)
|
|
|
print('cansend can1 160#%s0%s%s%s00000000' % (rolling_count16, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
os.system('cansend can1 160#%s0%s%s%s00000000' % (rolling_count16, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
print('\n')
|
|
|
rolling_count = rolling_count + 1
|
|
|
time.sleep(0.015)
|
|
|
def turn_keep2():
|
|
|
global rolling_count
|
|
|
angle_separation(angle_now)
|
|
|
if rolling_count == 16:
|
|
|
rolling_count = 0
|
|
|
rolling_count16 = hex(rolling_count)[2:].upper()
|
|
|
# angle_separation(angle)
|
|
|
# print(angle_low,angle_high)
|
|
|
# 字符串转16进制
|
|
|
rolling_count161 = int('%s%s' % (add_0x(rolling_count16), Angle_request), 16)
|
|
|
angle_high16 = int(add_0x(angle_high), 16)
|
|
|
angle_low16 = int(add_0x(angle_low), 16)
|
|
|
# print("前三字节16进制:", '%s%s' % (add_0x(rolling_count16), Angle_request), add_0x(angle_high),
|
|
|
# add_0x(angle_low))
|
|
|
# print("前三字节10进制:", rolling_count161, angle_high16, angle_low16)
|
|
|
|
|
|
# 按位异或 10进制
|
|
|
protection_value10 = rolling_count161 ^ angle_high16 ^ angle_low16
|
|
|
# print("校验值10进制:",protection_value10)
|
|
|
# 高低子节转换
|
|
|
protection_value16 = conversion(protection_value10)
|
|
|
# print("校验值16进制:", protection_value16)
|
|
|
print('cansend can1 160#%s1%s%s%s00000000' % (rolling_count16, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
os.system('cansend can1 160#%s1%s%s%s00000000' % (rolling_count16, angle_high, angle_low,
|
|
|
protection_value16))
|
|
|
print('\n')
|
|
|
rolling_count = rolling_count + 1
|
|
|
time.sleep(0.015)
|
|
|
|
|
|
def main():
|
|
|
while True:
|
|
|
global can_id
|
|
|
#can165()
|
|
|
if can_id == '0165':
|
|
|
'''
|
|
|
eps_statue=c 驾驶员未干预方向盘;驾驶员干预方向盘检测有效;eps永久失效
|
|
|
eps_statue=0 驾驶员未干预方向盘;驾驶员干预方向盘检测有效;eps暂时不可控
|
|
|
eps_statue=4 驾驶员未干预方向盘;驾驶员干预方向盘检测有效;eps可控
|
|
|
eps_statue=8 驾驶员未干预方向盘;驾驶员干预方向盘检测有效;eps正处于被控
|
|
|
|
|
|
eps_statue=d 驾驶员干预方向盘;驾驶员干预方向盘检测有效;eps永久失效
|
|
|
eps_statue=0 驾驶员干预方向盘;驾驶员干预方向盘检测有效;eps暂时不可控
|
|
|
eps_statue=5 驾驶员干预方向盘;驾驶员干预方向盘检测有效;eps可控
|
|
|
eps_statue=9 驾驶员干预方向盘;驾驶员干预方向盘检测有效;eps正处于被控
|
|
|
'''
|
|
|
# print('接收到EPS报文:0165')
|
|
|
|
|
|
if eps_statue == '0':
|
|
|
print("eps暂时不可控")
|
|
|
# can0a5()
|
|
|
print("当前角度为:", angle_now)
|
|
|
turn_keep()
|
|
|
|
|
|
elif eps_statue == '4':
|
|
|
print("当前角度为:", angle_now)
|
|
|
print("eps可控")
|
|
|
turn_keep2()
|
|
|
elif eps_statue == '8':
|
|
|
print('eps正处于被控')
|
|
|
# update()
|
|
|
if angle_now == angle_target:
|
|
|
print('目标角度和当前角度相同',angle_now)
|
|
|
turn_keep2()
|
|
|
else:
|
|
|
# can0a5()
|
|
|
turn()
|
|
|
print("当前角度为:",angle_now)
|
|
|
elif eps_statue == 'c':
|
|
|
kk=1
|
|
|
print('eps永久失效,请重启')
|
|
|
else:
|
|
|
print('eps正处于其他状态详见报文:', eps_statue)
|
|
|
else:
|
|
|
print('未接收到165报文')
|
|
|
|
|
|
|
|
|
rolling_count = 0
|
|
|
angle_high = '00'
|
|
|
angle_low = '00'
|
|
|
# protection_value=10
|
|
|
# angle=-2047.9375 #调整转向角
|
|
|
# angle = 30 # 调整转向角
|
|
|
angle_target = 0
|
|
|
angle_now = 0 # 当前
|
|
|
# angle1=2047.9375
|
|
|
Angle_request = 1
|
|
|
eps_statue = 0
|
|
|
chajiao=7.9
|
|
|
can_id ='0165'
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
#创建线程
|
|
|
t1 = threading.Thread(target=can165) # 读取eps状态
|
|
|
t2 = threading.Thread(target=can0a5) # 读取目前方向盘角度
|
|
|
t3 = threading.Thread(target=main)
|
|
|
t4=threading.Thread(target=update)
|
|
|
#启动线程
|
|
|
t1.start()
|
|
|
t2.start()
|
|
|
t3.start()
|
|
|
t4.start()
|