diff --git a/docs/reference/can.rst b/docs/reference/can.rst index 817163f78..84d6b1de1 100644 --- a/docs/reference/can.rst +++ b/docs/reference/can.rst @@ -290,6 +290,22 @@ byte length. | Status) | | | | +------------+--------------+----------------+-------------------------------------------------------------------------+ +Exceptions +~~~~~~~~~~ + +Exception Hierarchy +""""""""""""""""""" +.. currentmodule:: mil_usb_to_can.sub9 + +.. exception_hierarchy:: + + - :exc:`ChecksumException` + +Exception List +""""""""""""""""""" +.. autoclass:: mil_usb_to_can.sub9.ChecksumException + :members: + CANDeviceHandle ~~~~~~~~~~~~~~~ .. attributetable:: mil_usb_to_can.sub9.CANDeviceHandle diff --git a/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/__init__.py b/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/__init__.py index 2501f364e..fcfe4bd5c 100644 --- a/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/__init__.py +++ b/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/__init__.py @@ -1,2 +1,2 @@ from .device import CANDeviceHandle, SimulatedCANDeviceHandle -from .packet import AckPacket, NackPacket, Packet +from .packet import AckPacket, ChecksumException, NackPacket, Packet diff --git a/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/packet.py b/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/packet.py index 8b123bfc9..500aa0706 100644 --- a/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/packet.py +++ b/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/packet.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from enum import Enum from functools import lru_cache -from typing import ClassVar, get_type_hints +from typing import ClassVar, TypeVar, get_type_hints SYNC_CHAR_1 = 0x37 SYNC_CHAR_2 = 0x01 @@ -12,6 +12,9 @@ _packet_registry: dict[int, dict[int, type[Packet]]] = {} +PacketSelf = TypeVar("PacketSelf", bound="Packet") + + def hexify(data: bytes) -> str: return ":".join(f"{c:02x}" for c in data) @@ -22,12 +25,22 @@ def get_cache_hints(cls): class ChecksumException(OSError): + """ + An invalid checksum appeared. + """ + def __init__( self, packet: type[Packet], received: tuple[int, int], expected: tuple[int, int], ): + """ + Attributes: + packet (Type[:class:`~.Packet`]): The packet with the invalid checksum. + received (Tuple[int, int]): The received Fletcher's checksum. + expected (Tuple[int, int]): The expected Fletcher's checksum. + """ super().__init__( f"Invalid checksum in packet of type {packet.__qualname__}: received {received}, expected {expected}", ) @@ -129,12 +142,18 @@ def __bytes__(self): return data + struct.pack(" Packet: + def from_bytes(cls: type[PacketSelf], packed: bytes) -> PacketSelf: """ Constructs a packet from a packed packet in a :class:`bytes` object. If a packet is found with the corresponding message and subclass ID, - then an instance of that packet class will be returned, else :class:`Packet` - will be returned. + then an instance (or subclass) of that packet class will be returned. + + Raises: + ChecksumException: The checksum is invalid. + LookupError: No packet with the specified class and subclass IDs exist. + + Returns: + An instance of the appropriate packet subclass. """ msg_id = packed[2] subclass_id = packed[3] @@ -150,7 +169,12 @@ def from_bytes(cls, packed: bytes) -> Packet: cls._calculate_checksum(packed[2:-2]), ) unpacked = struct.unpack(subclass.payload_format, payload) - return subclass(*unpacked) + packet = subclass(*unpacked) + if not isinstance(packet, cls): + raise RuntimeError( + f"Attempted to resolve packet of type {cls.__qualname__}, but found {packet.__class__.__qualname__} for bytes: {hexify(payload)}", + ) + return packet raise LookupError( f"Attempted to reconstruct packet with msg_id 0x{msg_id:02x} and subclass_id 0x{subclass_id:02x}, but no packet with IDs was found.", ) diff --git a/mil_common/drivers/mil_usb_to_can/test/test_packets_sub9.py b/mil_common/drivers/mil_usb_to_can/test/test_packets_sub9.py index ba20cd34b..7d1509800 100755 --- a/mil_common/drivers/mil_usb_to_can/test/test_packets_sub9.py +++ b/mil_common/drivers/mil_usb_to_can/test/test_packets_sub9.py @@ -15,6 +15,11 @@ class TestPacket(Packet, msg_id=0x47, subclass_id=0x44, payload_format="?Hd"): example_float: float +@dataclass +class TestPacketTwo(Packet, msg_id=0x47, subclass_id=0x45, payload_format=""): + pass + + class BasicApplicationPacketTest(unittest.IsolatedAsyncioTestCase): """ Tests basic application packet functionality. @@ -43,6 +48,12 @@ def test_format(self): TestPacket.from_bytes(TestPacket.__bytes__(packet)), packet, ) + self.assertEqual( + Packet.from_bytes(TestPacket.__bytes__(packet)), + packet, + ) + with self.assertRaises(RuntimeError): + TestPacketTwo.from_bytes(bytes(packet)) def test_comparisons(self): packet = TestPacket(False, 42, 3.14)