|
|
|
|
import os
|
|
|
|
|
import datetime
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
import can # 需要安装python-can库 pip install python-can
|
|
|
|
|
from can import bus
|
|
|
|
|
import geardb
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# def option():
|
|
|
|
|
# global auto_acc, gear_cmd, brk_open, brk_change, stop
|
|
|
|
|
# auto_acc = 0 # 加速踏板行程0~100%
|
|
|
|
|
# brk_open = 100 # 刹车深度0-100%
|
|
|
|
|
# gear_cmd = 4 # 需求档位 1:P 2:R 3:N 4:D
|
|
|
|
|
# brk_change = 0 # 默认为0 变更为1
|
|
|
|
|
# stop = 0 # 紧急停止 默认为0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# can1 = can.interface.Bus(channel = 'can0', bustype = 'socketcan') #python-can 4.0需要用这个
|
|
|
|
|
def update():
|
|
|
|
|
global gear_cmd,auto_acc,brk_open,brk_change,stop
|
|
|
|
|
while True:
|
|
|
|
|
date = geardb.table_quer()
|
|
|
|
|
gear_cmd = int(date[0][1]) # 需求档位 1:P 2:R 3:N 4:D
|
|
|
|
|
auto_acc = int(date[0][2]) # 加速踏板行程0~100%
|
|
|
|
|
brk_open = int(date[0][3]) # 刹车深度0-100%
|
|
|
|
|
brk_change = int(date[0][4]) # 默认为0 变更为1
|
|
|
|
|
stop = int(date[0][5]) # 紧急停止 默认为0
|
|
|
|
|
time.sleep(0.05)
|
|
|
|
|
# 获取当前档位
|
|
|
|
|
def can4b8():
|
|
|
|
|
global gear_act
|
|
|
|
|
|
|
|
|
|
bus.set_filters = [{"can_id": 0x04b8, "can_mask": 0xFFFF, "extended": False}]
|
|
|
|
|
# 连接can1
|
|
|
|
|
|
|
|
|
|
can1 = can.interface.Bus(channel='can1', bustype='socketcan', can_filters=bus.set_filters) # python-can 4.0需要用这个
|
|
|
|
|
|
|
|
|
|
# 一直循环接收数据
|
|
|
|
|
while True:
|
|
|
|
|
msg = can1.recv(3.0) # recv()中定义超时接收时间
|
|
|
|
|
# print(msg)
|
|
|
|
|
can4b81 = str(msg).replace(' ', '')
|
|
|
|
|
can4b8id = can4b81.find('ID')
|
|
|
|
|
can4b8dl = can4b81.find('DL')
|
|
|
|
|
can_id = can4b81[can4b8id + 3:can4b8id + 7] # id
|
|
|
|
|
# print(can_id)
|
|
|
|
|
# print(can4b81)
|
|
|
|
|
# print(can4b8id)
|
|
|
|
|
gear_act0 = can4b81[can4b8dl + 10:can4b8dl + 11] # gear
|
|
|
|
|
a = int(gear_act0)
|
|
|
|
|
if can_id == '04b8' and a > 1:
|
|
|
|
|
|
|
|
|
|
gear_act1 = bin(int(gear_act0))
|
|
|
|
|
gear_act2 = gear_act1[:-1]
|
|
|
|
|
gear_act = int(gear_act2, 2)
|
|
|
|
|
# if gear_act == 1:
|
|
|
|
|
# print('实际档位:为P档')
|
|
|
|
|
# elif gear_act==2:
|
|
|
|
|
# print('实际档位:为R档')
|
|
|
|
|
# elif gear_act==3:
|
|
|
|
|
# print('实际档位:为N档')
|
|
|
|
|
# elif gear_act==4:
|
|
|
|
|
# print('实际档位:为D档')
|
|
|
|
|
# else:
|
|
|
|
|
# print('档位识别失败')
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
print('未获取到实际挡位')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 刹车转换
|
|
|
|
|
def brk_open_conversion(brk_open10):
|
|
|
|
|
global brk_open_low, brk_open_high
|
|
|
|
|
if 100 >= brk_open10 >= 0:
|
|
|
|
|
brk_open = brk_open10 * 100
|
|
|
|
|
brk_open16 = hex(brk_open)
|
|
|
|
|
# print(brk_open16)
|
|
|
|
|
brk_open_all = brk_open16[2:].upper()
|
|
|
|
|
if len(str(brk_open_all)) == 1:
|
|
|
|
|
brk_open_low = "0%s" % brk_open_all
|
|
|
|
|
brk_open_high = '00'
|
|
|
|
|
elif len(str(brk_open_all)) == 2:
|
|
|
|
|
brk_open_low = brk_open_all
|
|
|
|
|
brk_open_high = '00'
|
|
|
|
|
elif len(str(brk_open_all)) == 3:
|
|
|
|
|
brk_open_low = brk_open_all[1: 3]
|
|
|
|
|
brk_open_high = "0%s" % brk_open_all[0: 1]
|
|
|
|
|
elif len(str(brk_open_all)) == 4:
|
|
|
|
|
brk_open_low = brk_open_all[2: 4]
|
|
|
|
|
brk_open_high = brk_open_all[0: 2]
|
|
|
|
|
# print(brk_open_low, brk_open_high)
|
|
|
|
|
else:
|
|
|
|
|
print('刹车深度输入范围0~100')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 油门转换
|
|
|
|
|
def acc_conversion(auto_acc10):
|
|
|
|
|
global auto_acc_all
|
|
|
|
|
if 100 >= auto_acc10 >= 0:
|
|
|
|
|
auto_acc16 = hex(auto_acc10)
|
|
|
|
|
# print(auto_acc16)
|
|
|
|
|
auto_acc_all16 = auto_acc16[2:].upper()
|
|
|
|
|
# print(auto_acc_all16)
|
|
|
|
|
if len(str(auto_acc_all16)) == 1:
|
|
|
|
|
auto_acc_all = "0%s" % auto_acc_all16
|
|
|
|
|
else:
|
|
|
|
|
auto_acc_all = auto_acc_all16
|
|
|
|
|
# print(auto_acc_all)
|
|
|
|
|
else:
|
|
|
|
|
print('加速踏板深度输入范围0~100')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 设置刹车
|
|
|
|
|
def brk_open_option():
|
|
|
|
|
global brk_change
|
|
|
|
|
while True:
|
|
|
|
|
brk_open_conversion(brk_open)
|
|
|
|
|
if brk_change == 1:
|
|
|
|
|
print('cansend can2 121#%s%s010000000000' % (brk_open_low, brk_open_high))
|
|
|
|
|
os.system('cansend can2 121#%s%s010000000000' % (brk_open_low, brk_open_high))
|
|
|
|
|
brk_change = 0
|
|
|
|
|
time.sleep(0.01)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 设置P档
|
|
|
|
|
def gear_P():
|
|
|
|
|
os.system('cansend can0 161#7200000500000000') # 挂p档切自动驾驶模式
|
|
|
|
|
# print('cansend can0 161#7200000500000000') # 挂p档切自动驾驶模式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 设置N档
|
|
|
|
|
def gear_N():
|
|
|
|
|
os.system('cansend can0 161#7200000000000000') # 挂N档切自动驾驶模式
|
|
|
|
|
# print('cansend can0 161#7200000000000000') # 挂N
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 保持D档
|
|
|
|
|
def gear_keep_D():
|
|
|
|
|
acc_conversion(int(auto_acc))
|
|
|
|
|
os.system('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式
|
|
|
|
|
# print('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 保持R档
|
|
|
|
|
def gear_keep_R():
|
|
|
|
|
acc_conversion(int(auto_acc))
|
|
|
|
|
os.system('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式
|
|
|
|
|
# print('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 切换D档
|
|
|
|
|
def gear_D():
|
|
|
|
|
os.system('cansend can2 121#1027010000000000') # 踩刹车
|
|
|
|
|
acc_conversion(int(auto_acc))
|
|
|
|
|
os.system('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式
|
|
|
|
|
os.system('cansend can2 121#0000010000000000') # 松刹车
|
|
|
|
|
# print('cansend can0 161#72%s000400000000' % auto_acc_all) # 挂D档切自动驾驶模式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gear_R():
|
|
|
|
|
os.system('cansend can2 121#1027010000000000') # 踩刹车
|
|
|
|
|
acc_conversion(int(auto_acc))
|
|
|
|
|
os.system('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式
|
|
|
|
|
os.system('cansend can2 121#0000010000000000') # 松刹车
|
|
|
|
|
# print('cansend can0 161#72%s000700000000' % auto_acc_all) # 挂R档切自动驾驶模式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
while True:
|
|
|
|
|
if stop == 0:
|
|
|
|
|
print('当前档位:',gear_act)
|
|
|
|
|
if gear_cmd == 1:
|
|
|
|
|
if gear_act == 1:
|
|
|
|
|
print('当前已是P档')
|
|
|
|
|
else:
|
|
|
|
|
print('切换P档')
|
|
|
|
|
gear_P()
|
|
|
|
|
elif gear_cmd == 3:
|
|
|
|
|
if gear_act == 3:
|
|
|
|
|
print('当前已是N档')
|
|
|
|
|
else:
|
|
|
|
|
print('切换N档')
|
|
|
|
|
gear_N()
|
|
|
|
|
elif gear_cmd == 4:
|
|
|
|
|
if gear_act == 4:
|
|
|
|
|
print('当前已是D档')
|
|
|
|
|
gear_keep_D()
|
|
|
|
|
else:
|
|
|
|
|
print('切换D档')
|
|
|
|
|
gear_D()
|
|
|
|
|
# os.system('cansend can2 121#0000010000000000') #测试用
|
|
|
|
|
elif gear_cmd == 2:
|
|
|
|
|
if gear_act == 2:
|
|
|
|
|
print('当前已是R档')
|
|
|
|
|
gear_keep_R()
|
|
|
|
|
else:
|
|
|
|
|
print('切换R档')
|
|
|
|
|
gear_R()
|
|
|
|
|
else:
|
|
|
|
|
print('未设定档位')
|
|
|
|
|
else:
|
|
|
|
|
print('紧急停止')
|
|
|
|
|
os.system('cansend can2 121#1027010000000000')
|
|
|
|
|
break
|
|
|
|
|
time.sleep(0.01)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 刹车初始值 100%
|
|
|
|
|
brk_open_low = '10'
|
|
|
|
|
brk_open_high = '27'
|
|
|
|
|
# 初始加速踏板行程0
|
|
|
|
|
auto_acc16 = '00'
|
|
|
|
|
|
|
|
|
|
gear_act = 0 # 实际挡位
|
|
|
|
|
gear_cmd = 4 # 需求档位 1:P 2:R 3:N 4:D
|
|
|
|
|
auto_acc = 0 # 加速踏板行程0~100%
|
|
|
|
|
brk_open = 100 # 刹车深度0-100%
|
|
|
|
|
brk_change = 0 # 默认为0 变更为1
|
|
|
|
|
stop = 0 # 紧急停止 默认为0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# if __name__ == '__main__':
|
|
|
|
|
# # t1 = threading.Thread(target=option)
|
|
|
|
|
# t2 = threading.Thread(target=brk_open_option)
|
|
|
|
|
# t3 = threading.Thread(target=can4b8)
|
|
|
|
|
# t4 = threading.Thread(target=main)
|
|
|
|
|
# t5 = threading.Thread(target=update)
|
|
|
|
|
# # option()
|
|
|
|
|
# # t1.start()
|
|
|
|
|
# t2.start()
|
|
|
|
|
# t3.start()
|
|
|
|
|
# t4.start()
|
|
|
|
|
# t5.start()
|