Skip to content
Snippets Groups Projects
Commit f1d0185e authored by Charles Javerliat's avatar Charles Javerliat
Browse files

Change error handling to use exceptions

parent 938b1f7a
No related branches found
No related tags found
No related merge requests found
...@@ -4,76 +4,83 @@ ESC_FLAG = 0x7d ...@@ -4,76 +4,83 @@ ESC_FLAG = 0x7d
SOF_FLAG = 0x7e SOF_FLAG = 0x7e
EOF_FLAG = 0x7f EOF_FLAG = 0x7f
ERR_MISSING_SOF_DELIMITER = 1
ERR_MISSING_EOF_DELIMITER = 2
ERR_CORRUPTED_DATA = 3
ERR_TOO_LARGE = 4 def frame_data(data: bytearray):
def frame_data(data: bytearray, dst: bytearray):
"""Frame data in the dst buffer """Frame data in the dst buffer
Frame data in the dst buffer under the pattern |SOF_FLAG|dataSize|data|crc16|EOF_FLAG|. Frame data in the dst buffer under the pattern |SOF_FLAG|dataSize|data|crc16|EOF_FLAG|.
:param data: Byte buffer containing the data to frame. :param data: Byte buffer containing the data to frame.
:param dst: Destination byte buffer in which the framed data will be stored. :return: Byte array containing the framed data
""" """
dst.clear() framed_data = bytearray()
if len(data) > 65535: if len(data) > 65535:
print("Data too large, 65535 maximum.") raise Exception("Data is too large, maximum length supported is 65535 bytes.")
return ERR_TOO_LARGE
crc = crc16(data) crc = crc16(data)
dst.append(SOF_FLAG) framed_data.append(SOF_FLAG)
dst.append(len(data) >> 8 & 0xFF) framed_data.append(len(data) >> 8 & 0xFF)
dst.append(len(data) & 0xFF) framed_data.append(len(data) & 0xFF)
# Pack the data, escape any flag in the data (SOF, EOF, ESC)
for d in data: for d in data:
if d == EOF_FLAG or d == SOF_FLAG or d == ESC_FLAG: if d == EOF_FLAG or d == SOF_FLAG or d == ESC_FLAG:
dst.append(ESC_FLAG) framed_data.append(ESC_FLAG)
dst.append(d ^ 0x20) framed_data.append(d ^ 0x20)
else: else:
dst.append(d) framed_data.append(d)
dst.append(crc >> 8 & 0xFF) framed_data.append(crc >> 8 & 0xFF)
dst.append(crc & 0xFF) framed_data.append(crc & 0xFF)
dst.append(EOF_FLAG) framed_data.append(EOF_FLAG)
return 0 return framed_data
def unframe_data(framed_data: bytearray, dst: bytearray): def unframe_data(framed_data: bytearray):
"""Unframe data from the framedData buffer """Unframe data from the framedData buffer
Unframe the data from the frameData buffer, expecting data under the pattern |SOF_FLAG|dataSize|data|crc16|EOF_FLAG|. Unframe the data from the frameData buffer, expecting data under the pattern |SOF_FLAG|dataSize|data|crc16|EOF_FLAG|.
:param framed_data: Framed data byte buffer (should follow the pattern |SOF_FLAG|dataSize|data|crc16|EOF_FLAG|). :param framed_data: Framed data byte buffer (should follow the pattern |SOF_FLAG|dataSize|data|crc16|EOF_FLAG|).
:param dst: Destination byte buffer in which the unframed data will be stored. :param dst: Destination byte buffer in which the unframed data will be stored.
:return: Result code, either ERR_MISSING_SOF_DELIMITER, ERR_MISSING_EOF_DELIMITER or ERR_CORRUPTED_DATA . :return: Byte array containing the unframed data.
""" """
dst.clear() unframed_data = bytearray()
if len(framed_data) == 0 or framed_data[0] != SOF_FLAG: # The frame must be at least 6 bytes long:
return ERR_MISSING_SOF_DELIMITER # SOF (1byte) + Size (2bytes) + Data (0bytes at least) + CRC (2bytes) + EOF (1byte)
if len(framed_data) < 6:
raise Exception(
"Wrong frame size. Should be at least 6 bytes. Check the frame format to know more about this issue.")
if framed_data[0] != SOF_FLAG:
raise Exception("Missing start of frame delimiter (SOF_FLAG).")
size = int.from_bytes(framed_data[1:3], byteorder="big") size = int.from_bytes(framed_data[1:3], byteorder="big")
if len(framed_data) < 6 + size:
raise Exception(
"Missing data: expecting {} but got {} bytes of data instead.".format(size, len(framed_data) - 6))
# Unpack the data, un-escape any escaped character
for i in range(3, 3 + size): for i in range(3, 3 + size):
if framed_data[i] == ESC_FLAG: if framed_data[i] == ESC_FLAG:
dst.append(framed_data[i + 1] ^ 0x20) unframed_data.append(framed_data[i + 1] ^ 0x20)
i += 1 i += 1
else: else:
dst.append(framed_data[i]) unframed_data.append(framed_data[i])
crc = framed_data[3 + size:3 + size + 2] crc = int.from_bytes(framed_data[3 + size:3 + size + 2], byteorder="big")
new_crc = crc16(unframed_data)
if crc != crc16(dst): # Recompute the cyclic redundancy check
return ERR_CORRUPTED_DATA if crc != new_crc:
raise Exception("Corrupted data. The cyclic redundancy check doesn't match with frame data.")
if framed_data[3 + size + 3] != EOF_FLAG: if framed_data[3 + size + 2] != EOF_FLAG:
return ERR_MISSING_EOF_DELIMITER raise Exception("Missing end of frame delimiter (EOF_FLAG).")
return 0 return unframed_data
...@@ -5,10 +5,8 @@ from sfp.sfp import frame_data, unframe_data ...@@ -5,10 +5,8 @@ from sfp.sfp import frame_data, unframe_data
arr = bytearray() arr = bytearray()
arr.extend([0x12, 0x34, 0x56]) arr.extend([0x12, 0x34, 0x56])
framed_data = bytearray() framed_data = frame_data(arr)
frame_data(arr, framed_data)
print(binascii.hexlify(framed_data)) print(binascii.hexlify(framed_data))
unframed_data = bytearray() unframed_data = unframe_data(framed_data)
unframe_data(framed_data, unframed_data)
print(binascii.hexlify(unframed_data)) print(binascii.hexlify(unframed_data))
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment