133 lines
3.8 KiB
Python

from serial import Serial
# from PyCRC.CRCCCITT import CRCCCITT
import time
import struct
def lo8(b): return b & 0xff
def hi8(b): return (b >> 8) & 0xff
def crc16(buf):
'''
uint16_t
crc_ccitt_update (uint16_t crc, uint8_t data)
{
data ^= lo8 (crc);
data ^= data << 4;
return ((((uint16_t)data << 8) | hi8 (crc)) ^ (uint8_t)(data >> 4)
^ ((uint16_t)data << 3));
}
'''
crc = 0xffff
for data in buf:
data ^= (crc) & 0xff
data ^= (data << 4) & 0xff
crc = (((data << 8) | (crc >> 8) & 0xff) ^ (data >> 4) ^ (data << 3)) & 0xffff
return crc
HDLC_START = b'\x7e'
HDLC_ESC = b'\x7d'
class UBus(Serial):
def _send(self, bar):
self.write(HDLC_START)
checksum = crc16(bar)
# CRCCCITT("FFFF").calculate(bar)
for b in bar + struct.pack('<H', checksum):
if bytes([b]) in [HDLC_START, HDLC_ESC]:
self.write(HDLC_ESC)
self.write(bytes([b ^ 0x20]))
else:
self.write(bytes([b]))
self.write(HDLC_START)
self.flush()
def request(self, target, msg_type, payload=bytearray(), retry=0):
for n in range(retry + 1):
try:
self._send(bytearray([target, msg_type]) + bytearray(payload))
return self.read_message()[2:]
except TimeoutError as exc:
if n == retry:
raise exc
def request_broadcast(self, msg_type, payload=bytearray()):
self._send(bytearray([0xff, msg_type]) + bytearray(payload))
responses = []
while True:
try:
yield self.read_message()
except TimeoutError:
return
buf = bytearray()
escape = False
def read_message(self):
res = None
while res is None:
res = self._read()
return res
def _read(self):
b = self.read(1)
if not b:
raise TimeoutError
if b == HDLC_ESC:
self.escape = True
return None
if b == HDLC_START:
res = None
if len(self.buf) >= 2:
if crc16(self.buf) == 0:
res = self.buf[:-2]
else:
print('INVALID CRC:', self.buf, crc16(self.buf))
self.escape = False
self.buf = bytearray()
return res
self.buf.append(ord(b) ^ 0x20 if self.escape else ord(b))
self.escape = False
if __name__ == '__main__':
# bus = UBus('/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0', 115200)
bus = UBus('/dev/serial/by-id/usb-1a86_USB2.0-Ser_-if00-port0', 115200, timeout=0.1)
# bus = UBus('/dev/serial/by-id/usb-FTDI_TTL232R_FTF48YOZ-if00-port0', 115200)
while True:
#try:
# print(bus.request(0x01,0x01))
#except Exception as exc:
# print(repr(exc))
#time.sleep(1.0)
#for m in list(bus.request_broadcast(0x00)):
# print(m)
#print(list(bus.request_broadcast(0x00))) # , bytearray([13]))))
#print(list(bus.request_broadcast(0x11, bytearray([13]))))
try:
#print(list(bus.request_broadcast(0x00))) # , bytearray([13]))))
resp = bus.request(0x04, 0x11, [9, 10,11,12])
print(resp)
colors = sum([[255, 0, 0] if b else [0, 0, 0] for b in resp], [])
print(colors)
bus.request(0x02, 0x12, colors) # [32] * 24, 3) # [0,0,0,0,0,0,0,0,0,255,0,255,255,255,0,0,255,255])
except TimeoutError as exc:
print(repr(exc))
#print('bep')
#bus._send(bytearray([0xff, 0x00]))
#time.sleep(0.1)
#while bus.inWaiting():
# frame = bus._read()
# if frame:
# print(frame)