import os import datetime import threading import time import can # 需要安装python-can库 pip install python-can from can import bus import geardb # def option(): # global auto_acc, gear_cmd, brk_open, brk_change, stop # auto_acc = 0 # 加速踏板行程0~100% # brk_open = 100 # 刹车深度0-100% # gear_cmd = 4 # 需求档位 1:P 2:R 3:N 4:D # brk_change = 0 # 默认为0 变更为1 # stop = 0 # 紧急停止 默认为0 # can1 = can.interface.Bus(channel = 'can0', bustype = 'socketcan') #python-can 4.0需要用这个 def update(): global gear_cmd,auto_acc,brk_open,brk_change,stop while True: date = geardb.table_quer() gear_cmd = int(date[0][1]) # 需求档位 1:P 2:R 3:N 4:D auto_acc = int(date[0][2]) # 加速踏板行程0~100% brk_open = int(date[0][3]) # 刹车深度0-100% brk_change = int(date[0][4]) # 默认为0 变更为1 stop = int(date[0][5]) # 紧急停止 默认为0 time.sleep(0.05) # 获取当前档位 def can4b8(): global gear_act bus.set_filters = [{"can_id": 0x04b8, "can_mask": 0xFFFF, "extended": False}] # 连接can1 can1 = can.interface.Bus(channel='can1', bustype='socketcan', can_filters=bus.set_filters) # python-can 4.0需要用这个 # 一直循环接收数据 while True: msg = can1.recv(3.0) # recv()中定义超时接收时间 # print(msg) can4b81 = str(msg).replace(' ', '') can4b8id = can4b81.find('ID') can4b8dl = can4b81.find('DL') can_id = can4b81[can4b8id + 3:can4b8id + 7] # id # print(can_id) # print(can4b81) # print(can4b8id) gear_act0 = can4b81[can4b8dl + 10:can4b8dl + 11] # gear a = int(gear_act0) if can_id == '04b8' and a > 1: gear_act1 = bin(int(gear_act0)) gear_act2 = gear_act1[:-1] gear_act = int(gear_act2, 2) # if gear_act == 1: # print('实际档位:为P档') # elif gear_act==2: # print('实际档位:为R档') # elif gear_act==3: # print('实际档位:为N档') # elif gear_act==4: # print('实际档位:为D档') # else: # print('档位识别失败') else: print('未获取到实际挡位') # 刹车转换 def brk_open_conversion(brk_open10): global brk_open_low, brk_open_high if 100 >= brk_open10 >= 0: brk_open = brk_open10 * 100 brk_open16 = hex(brk_open) # print(brk_open16) brk_open_all = brk_open16[2:].upper() if len(str(brk_open_all)) == 1: brk_open_low = "0%s" % brk_open_all brk_open_high = '00' elif len(str(brk_open_all)) == 2: brk_open_low = brk_open_all brk_open_high = '00' elif len(str(brk_open_all)) == 3: brk_open_low = brk_open_all[1: 3] brk_open_high = "0%s" % brk_open_all[0: 1] elif len(str(brk_open_all)) == 4: brk_open_low = brk_open_all[2: 4] brk_open_high = brk_open_all[0: 2] # print(brk_open_low, brk_open_high) else: print('刹车深度输入范围0~100') # 油门转换 def acc_conversion(auto_acc10): global auto_acc_all if 100 >= auto_acc10 >= 0: auto_acc16 = hex(auto_acc10) # print(auto_acc16) auto_acc_all16 = auto_acc16[2:].upper() # print(auto_acc_all16) if len(str(auto_acc_all16)) == 1: auto_acc_all = "0%s" % auto_acc_all16 else: auto_acc_all = auto_acc_all16 # print(auto_acc_all) else: print('加速踏板深度输入范围0~100') # 设置刹车 def brk_open_option(): global brk_change while True: brk_open_conversion(brk_open) if brk_change == 1: print('cansend can2 121#%s%s010000000000' % (brk_open_low, brk_open_high)) os.system('cansend can2 121#%s%s010000000000' % (brk_open_low, brk_open_high)) brk_change = 0 time.sleep(0.01) # 设置P档 def gear_P(): os.system('cansend can0 161#7200000500000000') # 挂p档切自动驾驶模式 # print('cansend can0 161#7200000500000000') # 挂p档切自动驾驶模式 # 设置N档 def gear_N(): os.system('cansend can0 161#7200000000000000') # 挂N档切自动驾驶模式 # print('cansend can0 161#7200000000000000') # 挂N # 保持D档 def gear_keep_D(): acc_conversion(int(auto_acc)) os.system('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式 # print('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式 # 保持R档 def gear_keep_R(): acc_conversion(int(auto_acc)) os.system('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式 # print('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式 # 切换D档 def gear_D(): os.system('cansend can2 121#1027010000000000') # 踩刹车 acc_conversion(int(auto_acc)) os.system('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式 os.system('cansend can2 121#0000010000000000') # 松刹车 # print('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式 def gear_R(): os.system('cansend can2 121#1027010000000000') # 踩刹车 acc_conversion(int(auto_acc)) os.system('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式 os.system('cansend can2 121#0000010000000000') # 松刹车 # print('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式 def main(): while True: if stop == 0: print('当前档位:',gear_act) if gear_cmd == 1: if gear_act == 1: print('当前已是P档') else: print('切换P档') gear_P() elif gear_cmd == 3: if gear_act == 3: print('当前已是N档') else: print('切换N档') gear_N() elif gear_cmd == 4: if gear_act == 4: print('当前已是D档') gear_keep_D() else: print('切换D档') gear_D() # os.system('cansend can2 121#0000010000000000') #测试用 elif gear_cmd == 2: if gear_act == 2: print('当前已是R档') gear_keep_R() else: print('切换R档') gear_R() else: print('未设定档位') else: print('紧急停止') os.system('cansend can2 121#1027010000000000') break time.sleep(0.01) # 刹车初始值 100% brk_open_low = '10' brk_open_high = '27' # 初始加速踏板行程0 auto_acc16 = '00' gear_act = 0 # 实际挡位 gear_cmd = 4 # 需求档位 1:P 2:R 3:N 4:D auto_acc = 0 # 加速踏板行程0~100% brk_open = 100 # 刹车深度0-100% brk_change = 0 # 默认为0 变更为1 stop = 0 # 紧急停止 默认为0 # if __name__ == '__main__': # # t1 = threading.Thread(target=option) # t2 = threading.Thread(target=brk_open_option) # t3 = threading.Thread(target=can4b8) # t4 = threading.Thread(target=main) # t5 = threading.Thread(target=update) # # option() # # t1.start() # t2.start() # t3.start() # t4.start() # t5.start()