From 598d7c30abec647bd913304a298b428a641f8630 Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Tue, 20 Feb 2024 12:37:07 +0100 Subject: [PATCH] chore: make fmt --- src/cursed_hr/cursed_hr.py | 143 ++------- src/gallia/cli.py | 16 +- src/gallia/command/base.py | 8 +- src/gallia/command/uds.py | 24 +- src/gallia/commands/discover/doip.py | 75 ++--- src/gallia/commands/discover/find_xcp.py | 16 +- src/gallia/commands/discover/uds/isotp.py | 8 +- src/gallia/commands/fuzz/uds/pdu.py | 20 +- src/gallia/commands/primitive/uds/dtc.py | 11 +- src/gallia/commands/primitive/uds/iocbi.py | 22 +- src/gallia/commands/primitive/uds/rmba.py | 4 +- src/gallia/commands/primitive/uds/rtcl.py | 12 +- src/gallia/commands/primitive/uds/wmba.py | 4 +- src/gallia/commands/primitive/uds/xcp.py | 4 +- src/gallia/commands/scan/uds/identifiers.py | 18 +- src/gallia/commands/scan/uds/memory.py | 4 +- src/gallia/commands/scan/uds/reset.py | 12 +- src/gallia/commands/scan/uds/sa_dump_seeds.py | 12 +- src/gallia/commands/scan/uds/services.py | 30 +- src/gallia/commands/scan/uds/sessions.py | 43 +-- src/gallia/db/handler.py | 36 +-- src/gallia/dumpcap.py | 16 +- src/gallia/log.py | 8 +- src/gallia/services/uds/core/client.py | 97 ++---- src/gallia/services/uds/core/exception.py | 40 +-- src/gallia/services/uds/core/service.py | 302 +++++------------- src/gallia/services/uds/core/utils.py | 15 +- src/gallia/services/uds/ecu.py | 58 +--- src/gallia/services/uds/helpers.py | 8 +- src/gallia/services/uds/server.py | 147 +++------ src/gallia/services/xcp/__init__.py | 8 +- src/gallia/services/xcp/types.py | 25 +- src/gallia/transports/can.py | 12 +- src/gallia/transports/doip.py | 57 +--- src/gallia/transports/isotp.py | 8 +- src/gallia/transports/tcp.py | 4 +- src/gallia/transports/unix.py | 8 +- src/opennetzteil/cli.py | 4 +- src/opennetzteil/devices/rs/hmc804.py | 4 +- tests/test_target_uris.py | 3 +- 40 files changed, 338 insertions(+), 1008 deletions(-) diff --git a/src/cursed_hr/cursed_hr.py b/src/cursed_hr/cursed_hr.py index d04522a04..5459b850a 100644 --- a/src/cursed_hr/cursed_hr.py +++ b/src/cursed_hr/cursed_hr.py @@ -199,9 +199,7 @@ def __init__( self.file = self.uncompressed_file() try: - with mmap.mmap( - self.file.fileno(), 0, access=mmap.ACCESS_READ - ) as mm_file: + with mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ) as mm_file: self.parse_structure(mm_file) self.entries = EntryCache(mm_file, self.entry_positions) self.handle_io() @@ -244,9 +242,7 @@ def define_colors(self) -> dict[int, int]: curses.init_pair(InterpretationColor.DEFAULT, 8, -1) curses.init_pair(InterpretationColor.UDS_REQUEST, curses.COLOR_CYAN, -1) - curses.init_pair( - InterpretationColor.UDS_POSITIVE_RESPONSE, curses.COLOR_GREEN, -1 - ) + curses.init_pair(InterpretationColor.UDS_POSITIVE_RESPONSE, curses.COLOR_GREEN, -1) curses.init_pair(InterpretationColor.UDS_NEGATIVE_RESPONSE, 166, -1) return {prio: color[0] for prio, color in prio_colors.items()} @@ -265,9 +261,7 @@ def uncompressed_file(self) -> BinaryIO: try: if self.in_file.suffix in [".zst", ".gz"]: self.window.erase() - self.window.addstr( - f"Loading contents from {self.in_file}: Decompressing file ..." - ) + self.window.addstr(f"Loading contents from {self.in_file}: Decompressing file ...") self.window.refresh() file = tempfile.TemporaryFile() @@ -314,9 +308,7 @@ def parse_structure(self, file: BinaryIO | mmap.mmap) -> None: file.seek(0) self.window.erase() - self.window.addstr( - f"Loading contents from {self.in_file}: Parsing structure ({0}%)" - ) + self.window.addstr(f"Loading contents from {self.in_file}: Parsing structure ({0}%)") self.window.refresh() prev_progress = 0 @@ -365,9 +357,7 @@ def configuration(self) -> Configuration: def new_configuration(self) -> None: self.configuration_index += 1 - self.configuration_history = self.configuration_history[ - : self.configuration_index - ] + self.configuration_history = self.configuration_history[: self.configuration_index] self.configuration_history.append(deepcopy(self.configuration_history[-1])) def entry_zone(self, entry_id: int) -> PriorityZone: @@ -439,9 +429,7 @@ def update_zones(self, new_zone: PriorityZone) -> None: if new_zone.start >= last_zone.start: self.configuration.priority_zones.insert( -1, - PriorityZone( - last_zone.start, new_zone.start - 1, last_zone.priority - ), + PriorityZone(last_zone.start, new_zone.start - 1, last_zone.priority), ) last_zone.start = new_zone.end + 1 @@ -461,9 +449,7 @@ def format_text(self, text: str, entry: PenlogEntry) -> FormattedText: :return: The formatted text. """ if entry.tags is not None and "JSON" in entry.tags: - return FormattedText( - text, curses.color_pair(self.color_ids[PenlogPriority.ERROR]) - ) + return FormattedText(text, curses.color_pair(self.color_ids[PenlogPriority.ERROR])) text_format = curses.color_pair(self.color_ids[entry.priority]) @@ -612,13 +598,9 @@ def interpret_entry(self, entry: PenlogEntry) -> None: entry.interpretation = repr(response) if isinstance(response, NegativeResponse): - entry.interpretation_color = ( - InterpretationColor.UDS_NEGATIVE_RESPONSE - ) + entry.interpretation_color = InterpretationColor.UDS_NEGATIVE_RESPONSE else: - entry.interpretation_color = ( - InterpretationColor.UDS_POSITIVE_RESPONSE - ) + entry.interpretation_color = InterpretationColor.UDS_POSITIVE_RESPONSE else: entry.interpretation = repr(UDSRequest.parse_dynamic(data)) entry.interpretation_color = InterpretationColor.UDS_REQUEST @@ -641,9 +623,7 @@ def check_filter(self, entry_id: int) -> bool: return True - def calculate_display_entries( - self, start_entry: int, entry_line: int - ) -> list[DisplayEntry]: + def calculate_display_entries(self, start_entry: int, entry_line: int) -> list[DisplayEntry]: """ Returns a list of display entries starting from the given start entry, which qualify to be displayed by having a sufficient priority as well as passing any other filter. @@ -675,11 +655,7 @@ def calculate_display_entries( fallback_entry = [ DisplayEntry( - [ - self.default_text( - "No entries found! You can select a new priority on this line" - ) - ], + [self.default_text("No entries found! You can select a new priority on this line")], 0, 0, ) @@ -750,9 +726,7 @@ def next_sufficient_entry(self, entry_id: int) -> int | None: pointer = self.priority_pointer(entry_id, prio) assert pointer is not None zone, pointer = self.next_sufficient_pointer(None, pointer, entry_id) - return ( - self.level_pointers[zone.priority][pointer] if pointer is not None else None - ) + return self.level_pointers[zone.priority][pointer] if pointer is not None else None def previous_sufficient_entry(self, entry_id: int) -> int | None: """ @@ -768,9 +742,7 @@ def previous_sufficient_entry(self, entry_id: int) -> int | None: pointer = self.priority_pointer(entry_id, prio) assert pointer is not None zone, pointer = self.previous_sufficient_pointer(None, pointer, entry_id) - return ( - self.level_pointers[zone.priority][pointer] if pointer is not None else None - ) + return self.level_pointers[zone.priority][pointer] if pointer is not None else None def next_sufficient_pointer( self, prio_zone: PriorityZone | None, pointer: int, entry_id: int @@ -787,17 +759,11 @@ def next_sufficient_pointer( :param entry_id: The index of the entry, to which the given pointer points, in self.entries. :return: The next priority pointer's index along the corresponding priority zone. """ - zone, ptr = self.next_sufficient_pointer_without_filters( - prio_zone, pointer, entry_id - ) + zone, ptr = self.next_sufficient_pointer_without_filters(prio_zone, pointer, entry_id) - while ptr is not None and not self.check_filter( - self.level_pointers[zone.priority][ptr] - ): + while ptr is not None and not self.check_filter(self.level_pointers[zone.priority][ptr]): entry_id = self.level_pointers[zone.priority][ptr] - zone, ptr = self.next_sufficient_pointer_without_filters( - prio_zone, ptr, entry_id - ) + zone, ptr = self.next_sufficient_pointer_without_filters(prio_zone, ptr, entry_id) return zone, ptr @@ -816,17 +782,11 @@ def previous_sufficient_pointer( :param entry_id: The index of the entry, to which the given pointer points, in self.entries. :return: The previous priority pointer's index along the corresponding priority zone. """ - zone, ptr = self.previous_sufficient_pointer_without_filters( - prio_zone, pointer, entry_id - ) + zone, ptr = self.previous_sufficient_pointer_without_filters(prio_zone, pointer, entry_id) - while ptr is not None and not self.check_filter( - self.level_pointers[zone.priority][ptr] - ): + while ptr is not None and not self.check_filter(self.level_pointers[zone.priority][ptr]): entry_id = self.level_pointers[zone.priority][ptr] - zone, ptr = self.previous_sufficient_pointer_without_filters( - prio_zone, ptr, entry_id - ) + zone, ptr = self.previous_sufficient_pointer_without_filters(prio_zone, ptr, entry_id) return zone, ptr @@ -908,9 +868,7 @@ def previous_sufficient_pointer_without_filters( return self.configuration.priority_zones[0], None - def priority_pointer( - self, entry_id: int, prio: PenlogPriority, mode: int = 0 - ) -> int | None: + def priority_pointer(self, entry_id: int, prio: PenlogPriority, mode: int = 0) -> int | None: """ Returns the index of the level pointer of the entry with the given id, or depending on the mode the previous or next entry, in the level pointers with the given priority. @@ -990,9 +948,7 @@ def handle_io(self) -> None: filter_history = [self.configuration.filter] display_entries = self.calculate_display_entries(0, 0) - self.display( - display_entries, self.status(display_entries, (len(display_entries) - 1, 0)) - ) + self.display(display_entries, self.status(display_entries, (len(display_entries) - 1, 0))) prefix_length = 0 cursor = (0, prefix_length) self.window.move(*cursor) @@ -1006,9 +962,7 @@ def update_selected_zones(prio: PenlogPriority) -> None: if start_entry is None: return - stop_display_entry = display_entries[ - min(cursor[0], len(display_entries) - 1) - ] + stop_display_entry = display_entries[min(cursor[0], len(display_entries) - 1)] stop_entry = stop_display_entry.penlog_entry_number if start_entry == stop_entry: @@ -1024,9 +978,7 @@ def update_selected_zones(prio: PenlogPriority) -> None: stop_entry = stop_entry_ self.update_zones( - PriorityZone( - min(start_entry, stop_entry), max(start_entry, stop_entry), prio - ) + PriorityZone(min(start_entry, stop_entry), max(start_entry, stop_entry), prio) ) start_entry = None @@ -1048,9 +1000,7 @@ def page_up() -> None: if display_entries[0].entry_line_number == 0: if entry_start > 0: - entry_start = max( - 0, display_entries[0].penlog_entry_number - 1 - ) + entry_start = max(0, display_entries[0].penlog_entry_number - 1) line_start = -1 else: line_start = display_entries[0].entry_line_number - 1 @@ -1058,9 +1008,7 @@ def page_up() -> None: if old_entry_start == entry_start and old_line_start == line_start: break - display_entries = self.calculate_display_entries( - entry_start, line_start - ) + display_entries = self.calculate_display_entries(entry_start, line_start) def line_up() -> None: nonlocal entry_start @@ -1106,9 +1054,7 @@ def line_up() -> None: entry_start = len(self.entries) - 1 line_start = -1 - display_entries = self.calculate_display_entries( - entry_start, line_start - ) + display_entries = self.calculate_display_entries(entry_start, line_start) page_up() cursor = (len(display_entries) - 1, cursor[1]) case "KEY_LEFT": @@ -1152,9 +1098,7 @@ def line_up() -> None: update_selected_zones(prio) else: self.new_configuration() - self.configuration.priority_zones = [ - PriorityZone(0, None, prio) - ] + self.configuration.priority_zones = [PriorityZone(0, None, prio)] break case "u": @@ -1169,10 +1113,7 @@ def line_up() -> None: self.configuration.interpret = not self.configuration.interpret case "f": # fh is short for filter_history to reduce long unreadable lines - fh_tmp = [ - "; ".join(filter_commands) - for filter_commands in filter_history - ] + fh_tmp = ["; ".join(filter_commands) for filter_commands in filter_history] fh_tmp.append("") fh_index = len(fh_tmp) - 1 @@ -1241,9 +1182,7 @@ def line_up() -> None: display_entries, [FormattedText(fh_tmp[fh_index], input_format)], ) - self.window.move( - max_lines, min(filter_cursor, len(fh_tmp[fh_index])) - ) + self.window.move(max_lines, min(filter_cursor, len(fh_tmp[fh_index]))) case "x": self.use_prefix = not self.use_prefix case "?": @@ -1260,9 +1199,7 @@ def line_up() -> None: if display_help: display_entries = self.help_message(line_start) else: - display_entries = self.calculate_display_entries( - entry_start, line_start - ) + display_entries = self.calculate_display_entries(entry_start, line_start) previous_entry_start = 0 previous_line_start = 0 @@ -1279,9 +1216,7 @@ def line_up() -> None: if display_help: display_entries = self.help_message(line_start) else: - display_entries = self.calculate_display_entries( - entry_start, line_start - ) + display_entries = self.calculate_display_entries(entry_start, line_start) if start_entry is not None: stop_entry = display_entries[ @@ -1346,21 +1281,15 @@ def add_entries(message: str, prefix: str = "") -> None: add_entries("Log level keys (ordered from highest to lowest):") for level in self.priority_keys: - add_entries( - f"{PenlogPriority(self.priority_keys[level]).name}", f" {level}: " - ) + add_entries(f"{PenlogPriority(self.priority_keys[level]).name}", f" {level}: ") add_entries("") add_entries("Filtering information:") add_entries( "Beware that filters are arbitrary Python statements executed with eval on each line!" ) - add_entries( - "This can lead to poor performance on large files as well as side effects!" - ) - add_entries( - "Existing filters can be traversed using the up and down arrow keys" - ) + add_entries("This can lead to poor performance on large files as well as side effects!") + add_entries("Existing filters can be traversed using the up and down arrow keys") add_entries("The following attributes of each line are exposed as a variable:") for attr in self.entries[0].__dict__: @@ -1436,9 +1365,7 @@ def parse_filter(text: str) -> list[str]: tags=[], ) - commands = [ - command.strip() for command in text.split(";") if len(command.strip()) > 0 - ] + commands = [command.strip() for command in text.split(";") if len(command.strip()) > 0] with warnings.catch_warnings(): warnings.simplefilter("ignore", SyntaxWarning) diff --git a/src/gallia/cli.py b/src/gallia/cli.py index 21c93c174..5c38b009f 100644 --- a/src/gallia/cli.py +++ b/src/gallia/cli.py @@ -170,9 +170,7 @@ def build_cli( continue if cls.SUBGROUP is not None: - subparsers = parsers["siblings"][cls.GROUP]["siblings"][cls.SUBGROUP][ - "subparsers" - ] + subparsers = parsers["siblings"][cls.GROUP]["siblings"][cls.SUBGROUP]["subparsers"] else: subparsers = parsers["siblings"][cls.GROUP]["subparsers"] @@ -224,9 +222,7 @@ def _get_cli_defaults(parser: argparse.ArgumentParser, out: dict[str, Any]) -> N else: continue - keys = ( - f"{parser.prog} {opts_str.removeprefix('--').replace('-', '_')}".split() - ) + keys = f"{parser.prog} {opts_str.removeprefix('--').replace('-', '_')}".split() value = action.default d = out @@ -318,9 +314,7 @@ def _print_plugin(description: str, eps: list[EntryPoint]) -> None: def cmd_show_plugins() -> None: - _print_plugin( - "initialization callbacks (gallia_cli_init)", load_cli_init_plugin_eps() - ) + _print_plugin("initialization callbacks (gallia_cli_init)", load_cli_init_plugin_eps()) _print_plugin("commands (gallia_commands)", load_command_plugin_eps()) _print_plugin("transports (gallia_transports)", load_transport_plugin_eps()) _print_plugin("ecus (gallia_ecus)", load_ecu_plugin_eps()) @@ -419,9 +413,7 @@ def main() -> None: setup_logging( level=get_log_level(args), - no_volatile_info=args.no_volatile_info - if hasattr(args, "no_volatile_info") - else True, + no_volatile_info=args.no_volatile_info if hasattr(args, "no_volatile_info") else True, ) sys.exit(args.cls_object.entry_point(args)) diff --git a/src/gallia/command/base.py b/src/gallia/command/base.py index d531aadc5..5ac019be5 100644 --- a/src/gallia/command/base.py +++ b/src/gallia/command/base.py @@ -286,9 +286,7 @@ async def _db_finish_run_meta(self) -> None: self.artifacts_dir, ) except Exception as e: - logger.warning( - f"Could not write the run meta to the database: {e!r}" - ) + logger.warning(f"Could not write the run meta to the database: {e!r}") try: await self.db_handler.disconnect() @@ -416,9 +414,7 @@ def entry_point(self, args: Namespace) -> int: if isinstance(e, t): # TODO: Map the exitcode to superclass of builtin exceptions. exit_code = exitcode.IOERR - logger.critical( - f"Caught expected exception, stack trace on debug level: {e!r}" - ) + logger.critical(f"Caught expected exception, stack trace on debug level: {e!r}") logger.debug(e, exc_info=True) break else: diff --git a/src/gallia/command/uds.py b/src/gallia/command/uds.py index fbbeedd1b..dfe9d4e14 100644 --- a/src/gallia/command/uds.py +++ b/src/gallia/command/uds.py @@ -76,9 +76,7 @@ def configure_class_parser(self) -> None: ) group.add_argument( "--tester-present-interval", - default=self.config.get_value( - "gallia.protocols.uds.tester_present_interval", 0.5 - ), + default=self.config.get_value("gallia.protocols.uds.tester_present_interval", 0.5), type=float, metavar="SECONDS", help="Modify the interval of the cyclic tester present packets", @@ -97,9 +95,7 @@ def configure_class_parser(self) -> None: ) group.add_argument( "--compare-properties", - default=self.config.get_value( - "gallia.protocols.uds.compare_properties", True - ), + default=self.config.get_value("gallia.protocols.uds.compare_properties", True), action=BooleanOptionalAction, help="Compare properties before and after the scan", ) @@ -167,14 +163,10 @@ async def setup(self, args: Namespace) -> None: if self.db_handler is not None: try: - await self.db_handler.insert_scan_run_properties_pre( - await self.ecu.properties() - ) + await self.db_handler.insert_scan_run_properties_pre(await self.ecu.properties()) self._apply_implicit_logging_setting() except Exception as e: - logger.warning( - f"Could not write the properties_pre to the database: {e!r}" - ) + logger.warning(f"Could not write the properties_pre to the database: {e!r}") async def teardown(self, args: Namespace) -> None: if args.properties is True and not self.ecu.transport.is_closed: @@ -192,9 +184,7 @@ async def teardown(self, args: Namespace) -> None: if self.db_handler is not None: try: - await self.db_handler.complete_scan_run( - await self.ecu.properties(False) - ) + await self.db_handler.complete_scan_run(await self.ecu.properties(False)) except Exception as e: logger.warning(f"Could not write the scan run to the database: {e!r}") @@ -225,6 +215,4 @@ async def setup(self, args: Namespace) -> None: try: await self.db_handler.insert_discovery_run(args.target.url.scheme) except Exception as e: - logger.warning( - f"Could not write the discovery run to the database: {e!r}" - ) + logger.warning(f"Could not write the discovery run to the database: {e!r}") diff --git a/src/gallia/commands/discover/doip.py b/src/gallia/commands/discover/doip.py index 533a6813c..6779b3c58 100644 --- a/src/gallia/commands/discover/doip.py +++ b/src/gallia/commands/discover/doip.py @@ -92,18 +92,12 @@ async def main2(self, args: Namespace) -> int: try: await self.db_handler.insert_discovery_run("doip") except Exception as e: - logger.warning( - f"Could not write the discovery run to the database: {e!r}" - ) + logger.warning(f"Could not write the discovery run to the database: {e!r}") # Discover Hostname and Port tgt_hostname: str tgt_port: int - if ( - target is not None - and target.hostname is not None - and target.port is not None - ): + if target is not None and target.hostname is not None and target.port is not None: logger.notice("[📋] Skipping host/port discovery because given by --target") tgt_hostname = target.hostname tgt_port = target.port @@ -113,9 +107,7 @@ async def main2(self, args: Namespace) -> int: hosts = await self.run_udp_discovery() if len(hosts) != 1: - logger.error( - "[🍃] Can only continue with a single DoIP host! Give me a --target!" - ) + logger.error("[🍃] Can only continue with a single DoIP host! Give me a --target!") return 11 tgt_hostname, tgt_port = hosts[0] @@ -124,9 +116,7 @@ async def main2(self, args: Namespace) -> int: rat_success: list[int] = [] rat_wrong_source: list[int] = [] if target is not None and "activation_type" in parse_qs(target.query): - logger.notice( - "[📋] Skipping RoutingActivationType discovery because given by --target" - ) + logger.notice("[📋] Skipping RoutingActivationType discovery because given by --target") rat_success = [int(parse_qs(target.query)["activation_type"][0], 0)] else: logger.notice("[🔍] Enumerating all RoutingActivationTypes") @@ -150,9 +140,7 @@ async def main2(self, args: Namespace) -> int: # Discovering correct source address for suitable RoutingActivationRequests if target is not None and "src_addr" in parse_qs(target.query): - logger.notice( - "[📋] Skipping SourceAddress discovery because given by --target" - ) + logger.notice("[📋] Skipping SourceAddress discovery because given by --target") targets = [ f"doip://{tgt_hostname}:{tgt_port}?activation_type={rat:#x}&src_addr={parse_qs(target.query)['src_addr'][0]}" for rat in rat_success @@ -228,14 +216,9 @@ async def enumerate_routing_activation_types( f"[🤯] Holy moly, it actually worked for activation_type {routing_activation_type:#x} and src_addr {src_addr:#x}!!!" ) except DoIPRoutingActivationDeniedError as e: - logger.info( - f"[🌟] splendid, {routing_activation_type:#x} yields {e.rac_code.name}" - ) + logger.info(f"[🌟] splendid, {routing_activation_type:#x} yields {e.rac_code.name}") - if ( - e.rac_code - != RoutingActivationResponseCodes.UnsupportedActivationType - ): + if e.rac_code != RoutingActivationResponseCodes.UnsupportedActivationType: rat_not_unsupported.append(routing_activation_type) if e.rac_code == RoutingActivationResponseCodes.UnknownSourceAddress: @@ -264,9 +247,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 responsive_targets = [] search_space = range(start, stop + 1) - conn = await self.create_DoIP_conn( - tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE - ) + conn = await self.create_DoIP_conn(tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE) for target_addr in search_space: logger.debug(f"[🚧] Attempting connection to {target_addr:#x}") @@ -280,9 +261,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 # If we reach this, the request was not denied due to unknown TargetAddress known_targets.append(current_target) - logger.notice( - f"[🥇] HEUREKA: target address {target_addr:#x} is valid! " - ) + logger.notice(f"[🥇] HEUREKA: target address {target_addr:#x} is valid! ") async with aiofiles.open( self.artifacts_dir.joinpath("3_valid_targets.txt"), "a" ) as f: @@ -293,8 +272,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 while True: pot_broadcast, data = await asyncio.wait_for( self.read_diag_request_custom(conn), - TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout - / 1000 + TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000 if timeout is None else timeout, ) @@ -312,9 +290,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 ) resp = TesterPresentResponse.parse_static(data) - logger.notice( - f"[🥳] It cannot get nicer: {target_addr:#x} responded: {resp}" - ) + logger.notice(f"[🥳] It cannot get nicer: {target_addr:#x} responded: {resp}") responsive_targets.append(current_target) async with aiofiles.open( self.artifacts_dir.joinpath("4_responsive_targets.txt"), "a" @@ -324,10 +300,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 await self.db_handler.insert_discovery_result(current_target) except DoIPNegativeAckError as e: - if ( - e.nack_code - == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress - ): + if e.nack_code == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress: logger.info(f"[🫥] {target_addr:#x} is an unknown target address") continue elif e.nack_code == DiagnosticMessageNegativeAckCodes.TargetUnreachable: @@ -349,9 +322,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 continue except asyncio.TimeoutError: # This triggers when DoIP ACK but no UDS reply - logger.info( - f"[🙊] Presumably no active ECU on target address {target_addr:#x}" - ) + logger.info(f"[🙊] Presumably no active ECU on target address {target_addr:#x}") async with aiofiles.open( self.artifacts_dir.joinpath("5_unresponsive_targets.txt"), "a" ) as f: @@ -360,9 +331,7 @@ async def enumerate_target_addresses( # noqa: PLR0913 except (ConnectionError, ConnectionResetError) as e: # Whenever this triggers, but sometimes connections are closed not by us - logger.warn( - f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e}" - ) + logger.warn(f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e}") async with aiofiles.open( self.artifacts_dir.joinpath("7_targets_with_errors.txt"), "a" ) as f: @@ -425,9 +394,7 @@ async def create_DoIP_conn( # noqa: PLR0913 continue return conn - async def read_diag_request_custom( - self, conn: DoIPConnection - ) -> tuple[int | None, bytes]: + async def read_diag_request_custom(self, conn: DoIPConnection) -> tuple[int | None, bytes]: while True: hdr, payload = await conn.read_frame() if not isinstance(payload, DiagnosticMessage): @@ -468,9 +435,7 @@ async def enumerate_source_addresses( try: await conn.write_routing_activation_request(routing_activation_type) except DoIPRoutingActivationDeniedError as e: - logger.info( - f"[🌟] splendid, {source_address:#x} yields {e.rac_code.name}" - ) + logger.info(f"[🌟] splendid, {source_address:#x} yields {e.rac_code.name}") if e.rac_code != RoutingActivationResponseCodes.UnknownSourceAddress: denied_sourceAddresses.append(source_address) @@ -522,9 +487,7 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]: all_ips.append(ip) for ip in all_ips: - logger.info( - f"[💌] Sending DoIP VehicleIdentificationRequest to {ip.broadcast}" - ) + logger.info(f"[💌] Sending DoIP VehicleIdentificationRequest to {ip.broadcast}") sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.settimeout(2) @@ -551,9 +514,7 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]: for item in found: url = f"doip://{item[0]}:{item[1]}" logger.notice(url) - async with aiofiles.open( - self.artifacts_dir.joinpath("0_valid_hosts.txt"), "a" - ) as f: + async with aiofiles.open(self.artifacts_dir.joinpath("0_valid_hosts.txt"), "a") as f: await f.write(f"{url}\n") return found diff --git a/src/gallia/commands/discover/find_xcp.py b/src/gallia/commands/discover/find_xcp.py index e1d4e61ed..abe1836ee 100644 --- a/src/gallia/commands/discover/find_xcp.py +++ b/src/gallia/commands/discover/find_xcp.py @@ -30,9 +30,7 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: self.socket: socket.socket def configure_parser(self) -> None: - subparsers = self.parser.add_subparsers( - dest="mode", required=True, help="Transport mode" - ) + subparsers = self.parser.add_subparsers(dest="mode", required=True, help="Transport mode") sp = subparsers.add_parser("can") sp.add_argument( @@ -42,9 +40,7 @@ def configure_parser(self) -> None: required=True, help="CAN interface used for XCP communication", ) - sp.add_argument( - "--can-fd", action="store_true", default=False, help="use can FD" - ) + sp.add_argument("--can-fd", action="store_true", default=False, help="use can FD") sp.add_argument( "--extended", action="store_true", @@ -256,9 +252,7 @@ def test_eth_broadcast(self, args: Namespace) -> None: logger.info(f"xcp interface ip for multicast group: {addr}") self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.socket.setsockopt( - socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(addr) - ) + self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(addr)) self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32) self.socket.settimeout(2) @@ -276,6 +270,4 @@ def test_eth_broadcast(self, args: Namespace) -> None: except socket.timeout: logger.info("Timeout") - logger.result( - f"Finished; Found {len(endpoints)} XCP endpoints via multicast group" - ) + logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via multicast group") diff --git a/src/gallia/commands/discover/uds/isotp.py b/src/gallia/commands/discover/uds/isotp.py index dacc3a0b4..7b6f08ae4 100644 --- a/src/gallia/commands/discover/uds/isotp.py +++ b/src/gallia/commands/discover/uds/isotp.py @@ -180,9 +180,7 @@ async def main(self, args: Namespace) -> None: try: addr, payload = await transport.recvfrom(timeout=0.1) if addr == ID: - logger.info( - f"The same CAN ID {can_id_repr(ID)} answered. Skipping…" - ) + logger.info(f"The same CAN ID {can_id_repr(ID)} answered. Skipping…") continue except asyncio.TimeoutError: continue @@ -214,9 +212,7 @@ async def main(self, args: Namespace) -> None: ) target_args = {} target_args["is_fd"] = str(transport.config.is_fd).lower() - target_args["is_extended"] = str( - transport.config.is_extended - ).lower() + target_args["is_extended"] = str(transport.config.is_extended).lower() if args.extended_addr: target_args["ext_address"] = hex(ID) diff --git a/src/gallia/commands/fuzz/uds/pdu.py b/src/gallia/commands/fuzz/uds/pdu.py index 2585cdf36..115f8a6b4 100644 --- a/src/gallia/commands/fuzz/uds/pdu.py +++ b/src/gallia/commands/fuzz/uds/pdu.py @@ -103,15 +103,11 @@ async def observe_can_messages(self, can_ids: list[int], args: Namespace) -> Non can_id, msg = await transport.recvfrom(timeout=1) if can_id in can_msgs: if msg != can_msgs[can_id]: - logger.result( - f"Message for {can_id:03x} changed to {msg.hex()}" - ) + logger.result(f"Message for {can_id:03x} changed to {msg.hex()}") can_msgs[can_id] = msg else: can_msgs[can_id] = msg - logger.result( - f"Observed new message from {can_id:03x}: {msg.hex()}" - ) + logger.result(f"Observed new message from {can_id:03x}: {msg.hex()}") except asyncio.TimeoutError: continue @@ -120,9 +116,7 @@ async def observe_can_messages(self, can_ids: list[int], args: Namespace) -> Non async def main(self, args: Namespace) -> None: if args.observe_can_ids: - recv_task = asyncio.create_task( - self.observe_can_messages(args.observe_can_ids, args) - ) + recv_task = asyncio.create_task(self.observe_can_messages(args.observe_can_ids, args)) logger.info(f"testing sessions {args.sessions}") @@ -135,9 +129,7 @@ async def main(self, args: Namespace) -> None: logger.notice(f"Switching to session 0x{session:02x}") resp: UDSResponse = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - logger.warning( - f"Switching to session 0x{session:02x} failed: {resp}" - ) + logger.warning(f"Switching to session 0x{session:02x} failed: {resp}") continue logger.result(f"Starting scan in session: 0x{session:02x}") @@ -177,9 +169,7 @@ async def main(self, args: Namespace) -> None: illegal_resp += 1 # Temporary patch: Exception handler is deleted when it goes productive except ConnectionError: - logger.warning( - "isotp flow control frame missing. Reconnecting…" - ) + logger.warning("isotp flow control frame missing. Reconnecting…") flow_control_miss += 1 await self.ecu.reconnect() diff --git a/src/gallia/commands/primitive/uds/dtc.py b/src/gallia/commands/primitive/uds/dtc.py index 7d755d7ba..a510f5a97 100644 --- a/src/gallia/commands/primitive/uds/dtc.py +++ b/src/gallia/commands/primitive/uds/dtc.py @@ -76,8 +76,7 @@ def configure_parser(self) -> None: ) control_parser = sub_parser.add_parser( "control", - help="Stop or resume the setting of DTCs using the " - "ControlDTCSetting service", + help="Stop or resume the setting of DTCs using the " "ControlDTCSetting service", ) control_group = control_parser.add_mutually_exclusive_group(required=True) control_group.add_argument( @@ -92,9 +91,7 @@ def configure_parser(self) -> None: ) async def fetch_error_codes(self, mask: int, split: bool = True) -> dict[int, int]: - ecu_response = await self.ecu.read_dtc_information_report_dtc_by_status_mask( - mask - ) + ecu_response = await self.ecu.read_dtc_information_report_dtc_by_status_mask(mask) dtcs = {} if isinstance(ecu_response, NegativeResponse): @@ -113,9 +110,7 @@ async def fetch_error_codes(self, mask: int, split: bool = True) -> dict[int, in logger.info(f"Trying to fetch with mask {g_repr(sub_mask)}") dtcs.update(await self.fetch_error_codes(sub_mask, False)) else: - logger.critical( - f"Could not fetch error codes: {ecu_response}; exiting…" - ) + logger.critical(f"Could not fetch error codes: {ecu_response}; exiting…") sys.exit(1) else: dtcs = ecu_response.dtc_and_status_record diff --git a/src/gallia/commands/primitive/uds/iocbi.py b/src/gallia/commands/primitive/uds/iocbi.py index a277741a5..3e47beeac 100644 --- a/src/gallia/commands/primitive/uds/iocbi.py +++ b/src/gallia/commands/primitive/uds/iocbi.py @@ -71,9 +71,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - logger.critical( - f"Could not change to session: {g_repr(args.session)}: {e!r}" - ) + logger.critical(f"Could not change to session: {g_repr(args.session)}: {e!r}") sys.exit(1) did = args.data_identifier @@ -81,26 +79,20 @@ async def main(self, args: Namespace) -> None: uses_control_parameter = True if args.control_parameter == "return-control-to-ecu": - resp = ( - await self.ecu.input_output_control_by_identifier_return_control_to_ecu( - did, control_enable_mask_record - ) + resp = await self.ecu.input_output_control_by_identifier_return_control_to_ecu( + did, control_enable_mask_record ) elif args.control_parameter == "reset-to-default": resp = await self.ecu.input_output_control_by_identifier_reset_to_default( did, control_enable_mask_record ) elif args.control_parameter == "freeze-current-state": - resp = ( - await self.ecu.input_output_control_by_identifier_freeze_current_state( - did, control_enable_mask_record - ) + resp = await self.ecu.input_output_control_by_identifier_freeze_current_state( + did, control_enable_mask_record ) elif args.control_parameter == "short-term-adjustment": - resp = ( - await self.ecu.input_output_control_by_identifier_short_term_adjustment( - did, args.new_state, control_enable_mask_record - ) + resp = await self.ecu.input_output_control_by_identifier_short_term_adjustment( + did, args.new_state, control_enable_mask_record ) elif args.control_parameter == "without-control-parameter": resp = await self.ecu.input_output_control_by_identifier( diff --git a/src/gallia/commands/primitive/uds/rmba.py b/src/gallia/commands/primitive/uds/rmba.py index 880b996c1..37ddd3b08 100644 --- a/src/gallia/commands/primitive/uds/rmba.py +++ b/src/gallia/commands/primitive/uds/rmba.py @@ -45,9 +45,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - logger.critical( - f"Could not change to session: {g_repr(args.session)}: {e!r}" - ) + logger.critical(f"Could not change to session: {g_repr(args.session)}: {e!r}") sys.exit(1) resp = await self.ecu.read_memory_by_address(args.address, args.length) diff --git a/src/gallia/commands/primitive/uds/rtcl.py b/src/gallia/commands/primitive/uds/rtcl.py index e38bcf0ee..90097e4cb 100644 --- a/src/gallia/commands/primitive/uds/rtcl.py +++ b/src/gallia/commands/primitive/uds/rtcl.py @@ -94,9 +94,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - logger.critical( - f"Could not change to session: {g_repr(args.session)}: {e!r}" - ) + logger.critical(f"Could not change to session: {g_repr(args.session)}: {e!r}") sys.exit(1) if args.start is False and args.stop is False and args.results is False: @@ -120,9 +118,7 @@ async def main(self, args: Namespace) -> None: delay = args.stop_delay if delay > 0: - logger.info( - f"Delaying the request for stopping the routine by {delay} seconds" - ) + logger.info(f"Delaying the request for stopping the routine by {delay} seconds") await asyncio.sleep(delay) resp = await self.ecu.routine_control_stop_routine( @@ -140,9 +136,7 @@ async def main(self, args: Namespace) -> None: delay = args.results_delay if delay > 0: - logger.info( - f"Delaying the request for the routine results by {delay} seconds" - ) + logger.info(f"Delaying the request for the routine results by {delay} seconds") await asyncio.sleep(delay) resp = await self.ecu.routine_control_request_routine_results( diff --git a/src/gallia/commands/primitive/uds/wmba.py b/src/gallia/commands/primitive/uds/wmba.py index b0c9c4487..e851f15f7 100644 --- a/src/gallia/commands/primitive/uds/wmba.py +++ b/src/gallia/commands/primitive/uds/wmba.py @@ -51,9 +51,7 @@ async def main(self, args: Namespace) -> None: try: await self.ecu.check_and_set_session(args.session) except Exception as e: - logger.critical( - f"Could not change to session: {g_repr(args.session)}: {e!r}" - ) + logger.critical(f"Could not change to session: {g_repr(args.session)}: {e!r}") sys.exit(1) if args.data is not None: diff --git a/src/gallia/commands/primitive/uds/xcp.py b/src/gallia/commands/primitive/uds/xcp.py index 559891b48..eaee13c6d 100644 --- a/src/gallia/commands/primitive/uds/xcp.py +++ b/src/gallia/commands/primitive/uds/xcp.py @@ -34,9 +34,7 @@ async def setup(self, args: Namespace) -> None: if isinstance(transport, RawCANTransport): if args.can_master is None or args.can_slave is None: - self.parser.error( - "For CAN interfaces, master and slave address are required!" - ) + self.parser.error("For CAN interfaces, master and slave address are required!") self.service = CANXCPSerivce(transport, args.can_master, args.can_slave) elif isinstance(transport, ISOTPTransport): diff --git a/src/gallia/commands/scan/uds/identifiers.py b/src/gallia/commands/scan/uds/identifiers.py index 26565b6ff..451063a2f 100644 --- a/src/gallia/commands/scan/uds/identifiers.py +++ b/src/gallia/commands/scan/uds/identifiers.py @@ -110,9 +110,7 @@ async def main(self, args: Namespace) -> None: else: sessions: list[int] = [ - s - for s in args.sessions - if s not in args.skip or args.skip[s] is not None + s for s in args.sessions if s not in args.skip or args.skip[s] is not None ] logger.info(f"testing sessions {g_repr(sessions)}") @@ -123,9 +121,7 @@ async def main(self, args: Namespace) -> None: logger.notice(f"Switching to session {g_repr(session)}") resp: UDSResponse = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - logger.warning( - f"Switching to session {g_repr(session)} failed: {resp}" - ) + logger.warning(f"Switching to session {g_repr(session)} failed: {resp}") continue logger.result(f"Starting scan in session: {g_repr(session)}") @@ -159,18 +155,12 @@ async def perform_scan(self, args: Namespace, session: None | int = None) -> Non ) args.end = 0xFF - for DID, sub_function in product( - range(args.start, args.end + 1), sub_functions - ): + for DID, sub_function in product(range(args.start, args.end + 1), sub_functions): if session in args.skip and DID in args.skip[session]: logger.info(f"{g_repr(DID)}: skipped") continue - if ( - session is not None - and args.check_session - and DID % args.check_session == 0 - ): + if session is not None and args.check_session and DID % args.check_session == 0: # Check session and try to recover from wrong session (max 3 times), else skip session if not await self.ecu.check_and_set_session(session): logger.error( diff --git a/src/gallia/commands/scan/uds/memory.py b/src/gallia/commands/scan/uds/memory.py index 24c1e4aa8..376a9f8b7 100644 --- a/src/gallia/commands/scan/uds/memory.py +++ b/src/gallia/commands/scan/uds/memory.py @@ -97,9 +97,7 @@ async def scan_memory_address(self, args: Namespace, addr_offset: int = 0) -> No sys.exit(1) try: - resp = await self.ecu.send_raw( - pdu, config=UDSRequestConfig(tags=["ANALYZE"]) - ) + resp = await self.ecu.send_raw(pdu, config=UDSRequestConfig(tags=["ANALYZE"])) except asyncio.TimeoutError: logger.result(f"Address {g_repr(addr)}: timeout") continue diff --git a/src/gallia/commands/scan/uds/reset.py b/src/gallia/commands/scan/uds/reset.py index 1c2f18947..5b5c5c701 100644 --- a/src/gallia/commands/scan/uds/reset.py +++ b/src/gallia/commands/scan/uds/reset.py @@ -73,9 +73,7 @@ async def main(self, args: Namespace) -> None: logger.notice(f"Switching to session {g_repr(session)}") resp: UDSResponse = await self.ecu.set_session(session) if isinstance(resp, NegativeResponse): - logger.warning( - f"Switching to session {g_repr(session)} failed: {resp}" - ) + logger.warning(f"Switching to session {g_repr(session)} failed: {resp}") continue logger.result(f"Scanning in session: {g_repr(session)}") @@ -134,9 +132,7 @@ async def perform_scan(self, args: Namespace, session: None | int = None) -> Non except asyncio.TimeoutError: l_timeout.append(sub_func) if not args.power_cycle: - logger.error( - f"ECU did not respond after reset level {g_repr(sub_func)}; exit" - ) + logger.error(f"ECU did not respond after reset level {g_repr(sub_func)}; exit") sys.exit(1) logger.warning( @@ -163,9 +159,7 @@ async def perform_scan(self, args: Namespace, session: None | int = None) -> Non f"should be {g_repr(session)}" ) except UnexpectedNegativeResponse as e: - logger.warning( - f"Could not read current session: {e.RESPONSE_CODE.name}" - ) + logger.warning(f"Could not read current session: {e.RESPONSE_CODE.name}") if session is not None: logger.info(f"Setting session {g_repr(session)}") diff --git a/src/gallia/commands/scan/uds/sa_dump_seeds.py b/src/gallia/commands/scan/uds/sa_dump_seeds.py index ae0b1e1ef..60ab88be9 100644 --- a/src/gallia/commands/scan/uds/sa_dump_seeds.py +++ b/src/gallia/commands/scan/uds/sa_dump_seeds.py @@ -102,9 +102,7 @@ async def send_key(self, level: int, key: bytes) -> bool: if isinstance(resp, NegativeResponse): logger.debug(f"Key was rejected: {resp}") return False - logger.result( - f'Unlocked SA level {g_repr(level)} with key "{key.hex()}"! resp: {resp}' - ) + logger.result(f'Unlocked SA level {g_repr(level)} with key "{key.hex()}"! resp: {resp}') return True def log_size(self, path: Path, time_delta: float) -> None: @@ -118,9 +116,7 @@ def log_size(self, path: Path, time_delta: float) -> None: if size > 1024: size = size / 1024 size_unit = "MiB" - logger.notice( - f"Dumping seeds with {rate:.2f}{rate_unit}/h: {size:.2f}{size_unit}" - ) + logger.notice(f"Dumping seeds with {rate:.2f}{rate_unit}/h: {size:.2f}{size_unit}") async def main(self, args: Namespace) -> None: session = args.session @@ -155,9 +151,7 @@ async def main(self, args: Namespace) -> None: if args.check_session or reset: if not await self.ecu.check_and_set_session(args.session): - logger.error( - f"ECU persistently lost session {g_repr(args.session)}" - ) + logger.error(f"ECU persistently lost session {g_repr(args.session)}") sys.exit(1) reset = False diff --git a/src/gallia/commands/scan/uds/services.py b/src/gallia/commands/scan/uds/services.py index da5f73c15..a3dd05dca 100644 --- a/src/gallia/commands/scan/uds/services.py +++ b/src/gallia/commands/scan/uds/services.py @@ -28,9 +28,7 @@ class ServicesScanner(UDSScanner): COMMAND = "services" SHORT_HELP = "service scan on an ECU" - EPILOG = ( - "https://fraunhofer-aisec.github.io/gallia/uds/scan_modes.html#service-scan" - ) + EPILOG = "https://fraunhofer-aisec.github.io/gallia/uds/scan_modes.html#service-scan" def configure_parser(self) -> None: self.parser.add_argument( @@ -85,11 +83,7 @@ async def main(self, args: Namespace) -> None: if args.sessions is None: found[0] = await self.perform_scan(args) else: - sessions = [ - s - for s in args.sessions - if s not in args.skip or args.skip[s] is not None - ] + sessions = [s for s in args.sessions if s not in args.skip or args.skip[s] is not None] logger.info(f"testing sessions {g_repr(sessions)}") # TODO: Unified shortened output necessary here @@ -126,15 +120,11 @@ async def main(self, args: Namespace) -> None: for sid, data in value.items(): self.result.append((key, sid)) try: - logger.result( - f" [{g_repr(sid)}] {UDSIsoServices(sid).name}: {data}" - ) + logger.result(f" [{g_repr(sid)}] {UDSIsoServices(sid).name}: {data}") except Exception: logger.result(f" [{g_repr(sid)}] vendor specific sid: {data}") - async def perform_scan( - self, args: Namespace, session: None | int = None - ) -> dict[int, Any]: + async def perform_scan(self, args: Namespace, session: None | int = None) -> dict[int, Any]: result: dict[int, Any] = {} # Starts at 0x00, see first loop iteration. @@ -158,16 +148,12 @@ async def perform_scan( for length_payload in [1, 2, 3, 5]: pdu = bytes([sid]) + bytes(length_payload) try: - resp = await self.ecu.send_raw( - pdu, config=UDSRequestConfig(tags=["ANALYZE"]) - ) + resp = await self.ecu.send_raw(pdu, config=UDSRequestConfig(tags=["ANALYZE"])) except asyncio.TimeoutError: logger.info(f"{g_repr(sid)}: timeout") continue except MalformedResponse as e: - logger.warning( - f"{g_repr(sid)}: {e!r} occurred, this needs to be investigated!" - ) + logger.warning(f"{g_repr(sid)}: {e!r} occurred, this needs to be investigated!") continue if isinstance(resp, NegativeResponse) and resp.response_code in [ @@ -182,9 +168,7 @@ async def perform_scan( ]: continue - logger.result( - f"{g_repr(sid)}: available in session {g_repr(session)}: {resp}" - ) + logger.result(f"{g_repr(sid)}: available in session {g_repr(session)}: {resp}") result[sid] = resp break diff --git a/src/gallia/commands/scan/uds/sessions.py b/src/gallia/commands/scan/uds/sessions.py index 3803c7e47..7e60af0cd 100644 --- a/src/gallia/commands/scan/uds/sessions.py +++ b/src/gallia/commands/scan/uds/sessions.py @@ -78,9 +78,7 @@ async def set_session_with_hooks_handling( isinstance(resp, NegativeResponse) and resp.response_code == UDSErrorCodes.conditionsNotCorrect ): - logger.notice( - f"Received conditionsNotCorrect for session {g_repr(session)}" - ) + logger.notice(f"Received conditionsNotCorrect for session {g_repr(session)}") if not use_hooks: logger.warning( f"Session {g_repr(session)} is potentially available but could not be entered. " @@ -94,9 +92,7 @@ async def set_session_with_hooks_handling( ) if not isinstance(resp_, NegativeResponse): - logger.notice( - f"Successfully changed to session {g_repr(session)} with hooks" - ) + logger.notice(f"Successfully changed to session {g_repr(session)} with hooks") resp = resp_ else: logger.notice( @@ -185,9 +181,7 @@ async def main(self, args: Namespace) -> None: logger.error(f"Could not change to default session: {e!r}") sys.exit(1) - logger.debug( - f"Sleeping for {args.sleep}s after changing to DefaultSession" - ) + logger.debug(f"Sleeping for {args.sleep}s after changing to DefaultSession") await asyncio.sleep(args.sleep) logger.debug("Recovering the current session stack") @@ -196,19 +190,14 @@ async def main(self, args: Namespace) -> None: try: logger.debug(f"Attempting to change to session {session:#04x}") - resp = await self.set_session_with_hooks_handling( - session, args.with_hooks - ) + resp = await self.set_session_with_hooks_handling(session, args.with_hooks) # do not ignore NCR subFunctionNotSupportedInActiveSession in this case if ( isinstance(resp, NegativeResponse) - and resp.response_code - == UDSErrorCodes.subFunctionNotSupported + and resp.response_code == UDSErrorCodes.subFunctionNotSupported ): - logger.info( - f"Could not change to session {g_repr(session)}: {resp}" - ) + logger.info(f"Could not change to session {g_repr(session)}: {resp}") continue logger.notice( @@ -234,9 +223,7 @@ async def main(self, args: Namespace) -> None: ) except asyncio.TimeoutError: - logger.warning( - f"Could not change to session {g_repr(session)}: Timeout" - ) + logger.warning(f"Could not change to session {g_repr(session)}: Timeout") continue except Exception as e: logger.warning(f"Mamma mia: {repr(e)}") @@ -253,17 +240,11 @@ async def main(self, args: Namespace) -> None: logger.result(f"* Session {g_repr(session)} ") if self.db_handler is not None: - await self.db_handler.insert_session_transition( - session, res["stack"] - ) + await self.db_handler.insert_session_transition(session, res["stack"]) - logger.result( - f"\tvia stack: {'->'.join([f'{g_repr(i)}' for i in res['stack']])}" - ) + logger.result(f"\tvia stack: {'->'.join([f'{g_repr(i)}' for i in res['stack']])}") - logger.result( - "The following sessions were identified but could not be activated:" - ) + logger.result("The following sessions were identified but could not be activated:") previous_session = 0 for res in sorted(negative_results, key=lambda x: x["session"]): @@ -278,9 +259,7 @@ async def main(self, args: Namespace) -> None: logger.result(f"* Session {g_repr(session)} ") if self.db_handler is not None: - await self.db_handler.insert_session_transition( - session, res["stack"] - ) + await self.db_handler.insert_session_transition(session, res["stack"]) logger.result( f"\tvia stack: {'->'.join([f'{g_repr(i)}' for i in res['stack']])} " diff --git a/src/gallia/db/handler.py b/src/gallia/db/handler.py index 2853f8496..4ea1d8906 100644 --- a/src/gallia/db/handler.py +++ b/src/gallia/db/handler.py @@ -228,9 +228,7 @@ async def insert_run_meta( # noqa: PLR0913 await self.connection.commit() - async def complete_run_meta( - self, end_time: datetime, exit_code: int, path: Path - ) -> None: + async def complete_run_meta(self, end_time: datetime, exit_code: int, path: Path) -> None: assert self.connection is not None, "Not connected to the database" assert self.meta is not None, "Run meta not yet created" @@ -245,9 +243,7 @@ async def insert_scan_run(self, target: str) -> None: assert self.connection is not None, "Not connected to the database" assert self.meta is not None, "Run meta not yet created" - await self.connection.execute( - "INSERT OR IGNORE INTO address(url) VALUES(?)", (target,) - ) + await self.connection.execute("INSERT OR IGNORE INTO address(url) VALUES(?)", (target,)) query = ( "INSERT INTO scan_run(address, meta) VALUES " @@ -259,16 +255,12 @@ async def insert_scan_run(self, target: str) -> None: self.target = target await self.connection.commit() - async def insert_scan_run_properties_pre( - self, properties_pre: dict[str, Any] - ) -> None: + async def insert_scan_run_properties_pre(self, properties_pre: dict[str, Any]) -> None: assert self.connection is not None, "Not connected to the database" assert self.scan_run is not None, "Scan run not yet created" query = "UPDATE scan_run SET properties_pre = ? WHERE id = ?" - await self.connection.execute( - query, (json.dumps(properties_pre), self.scan_run) - ) + await self.connection.execute(query, (json.dumps(properties_pre), self.scan_run)) await self.connection.commit() async def complete_scan_run(self, properties_post: dict[str, Any]) -> None: @@ -276,9 +268,7 @@ async def complete_scan_run(self, properties_post: dict[str, Any]) -> None: assert self.scan_run is not None, "Scan run not yet created" query = "UPDATE scan_run SET properties_post = ? WHERE id = ?" - await self.connection.execute( - query, (json.dumps(properties_post), self.scan_run) - ) + await self.connection.execute(query, (json.dumps(properties_post), self.scan_run)) await self.connection.commit() async def insert_discovery_run(self, protocol: str) -> None: @@ -294,9 +284,7 @@ async def insert_discovery_result(self, target: str) -> None: assert self.connection is not None, "Not connected to the database" assert self.discovery_run is not None, "Discovery run not yet created" - await self.connection.execute( - "INSERT OR IGNORE INTO address(url) VALUES(?)", (target,) - ) + await self.connection.execute("INSERT OR IGNORE INTO address(url) VALUES(?)", (target,)) query = "INSERT INTO discovery_result(address, run) VALUES ((SELECT id FROM address WHERE url = ?), ?)" @@ -376,12 +364,8 @@ async def insert_scan_result( # noqa: PLR0913 send_time.tzname(), json.dumps(request_attributes), bytes_repr(response.pdu) if response is not None else None, - receive_time.timestamp() - if response is not None and receive_time is not None - else None, - receive_time.tzname() - if response is not None and receive_time is not None - else None, + receive_time.timestamp() if response is not None and receive_time is not None else None, + receive_time.tzname() if response is not None and receive_time is not None else None, json.dumps(response_attributes) if response is not None else None, repr(exception) if exception is not None else None, log_mode.name, @@ -406,9 +390,7 @@ async def execute() -> None: self.tasks.append(asyncio.create_task(execute())) - async def insert_session_transition( - self, destination: int, steps: list[int] - ) -> None: + async def insert_session_transition(self, destination: int, steps: list[int]) -> None: assert self.connection is not None, "Not connected to the database" query = "INSERT INTO session_transition VALUES(?, ?, ?)" diff --git a/src/gallia/dumpcap.py b/src/gallia/dumpcap.py index cb2f6b912..b27c5438a 100644 --- a/src/gallia/dumpcap.py +++ b/src/gallia/dumpcap.py @@ -58,16 +58,8 @@ async def start( match target.scheme: case ISOTPTransport.SCHEME | RawCANTransport.SCHEME: outfile = artifacts_dir.joinpath(f"candump-{ts}.pcap.gz") - src_addr = ( - auto_int(target.qs["src_addr"][0]) - if "src_addr" in target.qs - else None - ) - dst_addr = ( - auto_int(target.qs["dst_addr"][0]) - if "dst_addr" in target.qs - else None - ) + src_addr = auto_int(target.qs["src_addr"][0]) if "src_addr" in target.qs else None + dst_addr = auto_int(target.qs["dst_addr"][0]) if "dst_addr" in target.qs else None cmd = cls._can_cmd( target.netloc, src_addr, @@ -143,9 +135,7 @@ def _swap_bytes_16(x: int) -> int: return cast(int, struct.unpack(">H", struct.pack(" list[str] | None: + def _can_cmd(iface: str, src_addr: int | None, dst_addr: int | None) -> list[str] | None: args = ["dumpcap", "-q", "-i", iface, "-w", "-"] # Debug this with `dumpcap -d` or `tshark -x` to inspect the captured buffer. filter_ = "" diff --git a/src/gallia/log.py b/src/gallia/log.py index 57d902500..52a01868a 100644 --- a/src/gallia/log.py +++ b/src/gallia/log.py @@ -408,9 +408,7 @@ def _format_record( # noqa: PLR0913 if volatile_info and levelno <= Loglevel.INFO: terminal_width, _ = shutil.get_terminal_size() - msg = msg[ - : terminal_width + extra_len - 1 - ] # Adapt length to invisible ANSI colors + msg = msg[: terminal_width + extra_len - 1] # Adapt length to invisible ANSI colors msg += _Color.RESET.value msg += "\r" else: @@ -756,9 +754,7 @@ def format( assert exc_traceback stacktrace = "\n" - stacktrace += "".join( - traceback.format_exception(exc_type, exc_value, exc_traceback) - ) + stacktrace += "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) return _format_record( dt=datetime.datetime.fromtimestamp(record.created), diff --git a/src/gallia/services/uds/core/client.py b/src/gallia/services/uds/core/client.py index a6fff0179..9bbe7d85f 100644 --- a/src/gallia/services/uds/core/client.py +++ b/src/gallia/services/uds/core/client.py @@ -52,9 +52,7 @@ async def reconnect(self, timeout: int | None = None) -> None: async with self.mutex: self.transport = await self.transport.reconnect(timeout) - async def _read( - self, timeout: float | None = None, tags: list[str] | None = None - ) -> bytes: + async def _read(self, timeout: float | None = None, tags: list[str] | None = None) -> bytes: if timeout is None and self.timeout: timeout = self.timeout return await self.transport.read(timeout, tags) @@ -77,14 +75,10 @@ async def request_unsafe( # Avoid pasting this very line in every error branch. if i > 0: - logger.info( - f"Requesting UDS PDU failed; retrying: {i} from {max_retry}…" - ) + logger.info(f"Requesting UDS PDU failed; retrying: {i} from {max_retry}…") try: logger.debug(request.pdu.hex(), extra={"tags": ["write", "uds"] + tags}) - raw_resp = await self.transport.request_unsafe( - request.pdu, timeout, config.tags - ) + raw_resp = await self.transport.request_unsafe(request.pdu, timeout, config.tags) if raw_resp == b"": raise BrokenPipeError("connection to target lost") except asyncio.TimeoutError as e: @@ -112,8 +106,7 @@ async def request_unsafe( max_n_timeout = max(timeout if timeout else 0, 20) / waiting_time while ( isinstance(resp, service.NegativeResponse) - and resp.response_code - == UDSErrorCodes.requestCorrectlyReceivedResponsePending + and resp.response_code == UDSErrorCodes.requestCorrectlyReceivedResponsePending ): logger.info( f"Received ResponsePending: {n_pending}/{MAX_N_PENDING}; " @@ -138,9 +131,7 @@ async def request_unsafe( n_timeout = 0 # Only raise errors for consecutive timeouts n_pending += 1 if n_pending >= MAX_N_PENDING: - raise RuntimeError( - "ECU appears to be stuck in ResponsePending loop" - ) + raise RuntimeError("ECU appears to be stuck in ResponsePending loop") else: # We reach this code here once all response pending # and similar busy stuff is resolved. @@ -192,9 +183,7 @@ async def diagnostic_session_control( :return: The response of the server. """ return await self.request( - service.DiagnosticSessionControlRequest( - diagnostic_session_type, suppress_response - ), + service.DiagnosticSessionControlRequest(diagnostic_session_type, suppress_response), config, ) @@ -213,9 +202,7 @@ async def ecu_reset( :param config: Passed on to request_pdu(). :return: The response of the server. """ - return await self.request( - service.ECUResetRequest(reset_type, suppress_response), config - ) + return await self.request(service.ECUResetRequest(reset_type, suppress_response), config) async def security_access_request_seed( self, @@ -261,9 +248,7 @@ async def security_access_send_key( :return: The response of the server. """ return await self.request( - service.SendKeyRequest( - security_access_type, security_key, suppress_response - ), + service.SendKeyRequest(security_access_type, security_key, suppress_response), config, ) @@ -302,9 +287,7 @@ async def tester_present( :param config: Passed on to request_pdu(). :return: The response of the server. """ - return await self.request( - service.TesterPresentRequest(suppress_response), config - ) + return await self.request(service.TesterPresentRequest(suppress_response), config) async def control_dtc_setting( self, @@ -349,9 +332,7 @@ async def read_data_by_identifier( :param config: Passed on to request_pdu(). :return: The response of the server. """ - return await self.request( - service.ReadDataByIdentifierRequest(data_identifiers), config - ) + return await self.request(service.ReadDataByIdentifierRequest(data_identifiers), config) async def read_memory_by_address( self, @@ -443,9 +424,7 @@ async def clear_diagnostic_information( :param config: Passed on to request_pdu(). :return: The response of the server. """ - return await self.request( - service.ClearDiagnosticInformationRequest(group_of_dtc), config - ) + return await self.request(service.ClearDiagnosticInformationRequest(group_of_dtc), config) async def read_dtc_information_report_number_of_dtc_by_status_mask( self, @@ -464,9 +443,7 @@ async def read_dtc_information_report_number_of_dtc_by_status_mask( :return: The response of the server. """ return await self.request( - service.ReportNumberOfDTCByStatusMaskRequest( - dtc_status_mask, suppress_response - ), + service.ReportNumberOfDTCByStatusMaskRequest(dtc_status_mask, suppress_response), config, ) @@ -509,9 +486,7 @@ async def read_dtc_information_report_mirror_memory_dtc_by_status_mask( :return: The response of the server. """ return await self.request( - service.ReportMirrorMemoryDTCByStatusMaskRequest( - dtc_status_mask, suppress_response - ), + service.ReportMirrorMemoryDTCByStatusMaskRequest(dtc_status_mask, suppress_response), config, ) @@ -520,10 +495,7 @@ async def read_dtc_information_report_number_of_mirror_memory_dtc_by_status_mask dtc_status_mask: int, suppress_response: bool = False, config: UDSRequestConfig | None = None, - ) -> ( - service.NegativeResponse - | service.ReportNumberOfMirrorMemoryDTCByStatusMaskResponse - ): + ) -> service.NegativeResponse | service.ReportNumberOfMirrorMemoryDTCByStatusMaskResponse: """Read the number of DTCs with the specified state from the UDS server's mirror memory. This is an implementation of the UDS request for the reportNumberOfMirrorMemoryDTCByStatusMask sub-function of @@ -548,8 +520,7 @@ async def read_dtc_information_report_number_of_emissions_related_obd_dtc_by_sta suppress_response: bool = False, config: UDSRequestConfig | None = None, ) -> ( - service.NegativeResponse - | service.ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse + service.NegativeResponse | service.ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse ): """Read the number of emission related DTCs with the specified state from the UDS server. This is an implementation of the UDS request for the @@ -574,10 +545,7 @@ async def read_dtc_information_report_emissions_related_obd_dtc_by_status_mask( dtc_status_mask: int, suppress_response: bool = False, config: UDSRequestConfig | None = None, - ) -> ( - service.NegativeResponse - | service.ReportEmissionsRelatedOBDDTCByStatusMaskResponse - ): + ) -> service.NegativeResponse | service.ReportEmissionsRelatedOBDDTCByStatusMaskResponse: """Read the number of emission related DTCs with the specified state from the UDS server. This is an implementation of the UDS request for the reportNumberOfEmissionsRelatedOBDDTCByStatusMask @@ -622,9 +590,7 @@ async def input_output_control_by_identifier( :param config: Passed on to request_pdu(). :return: The response of the server. """ - pdu = struct.pack( - "!BH", UDSIsoServices.InputOutputControlByIdentifier, data_identifier - ) + pdu = struct.pack("!BH", UDSIsoServices.InputOutputControlByIdentifier, data_identifier) pdu += control_option_record + control_enable_mask_record return await self.request( service.InputOutputControlByIdentifierRequest( @@ -653,9 +619,7 @@ async def input_output_control_by_identifier_return_control_to_ecu( :return: The response of the server. """ return await self.request( - service.ReturnControlToECURequest( - data_identifier, control_enable_mask_record - ), + service.ReturnControlToECURequest(data_identifier, control_enable_mask_record), config, ) @@ -702,9 +666,7 @@ async def input_output_control_by_identifier_freeze_current_state( :return: The response of the server. """ return await self.request( - service.FreezeCurrentStateRequest( - data_identifier, control_enable_mask_record - ), + service.FreezeCurrentStateRequest(data_identifier, control_enable_mask_record), config, ) @@ -730,9 +692,7 @@ async def input_output_control_by_identifier_short_term_adjustment( :return: The response of the server. """ return await self.request( - service.ShortTermAdjustmentRequest( - data_identifier, control_enable_mask_record - ), + service.ShortTermAdjustmentRequest(data_identifier, control_enable_mask_record), config, ) @@ -898,9 +858,7 @@ async def transfer_data( :return: The response of the server. """ return await self.request( - service.TransferDataRequest( - block_sequence_counter, transfer_request_parameter_record - ), + service.TransferDataRequest(block_sequence_counter, transfer_request_parameter_record), config, ) @@ -1050,10 +1008,7 @@ async def request( self, request: service.ReportNumberOfMirrorMemoryDTCByStatusMaskRequest, config: UDSRequestConfig | None = None, - ) -> ( - service.NegativeResponse - | service.ReportNumberOfMirrorMemoryDTCByStatusMaskResponse - ): + ) -> service.NegativeResponse | service.ReportNumberOfMirrorMemoryDTCByStatusMaskResponse: ... @overload @@ -1062,8 +1017,7 @@ async def request( request: service.ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskRequest, config: UDSRequestConfig | None = None, ) -> ( - service.NegativeResponse - | service.ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse + service.NegativeResponse | service.ReportNumberOfEmissionsRelatedOBDDTCByStatusMaskResponse ): ... @@ -1072,10 +1026,7 @@ async def request( self, request: service.ReportEmissionsRelatedOBDDTCByStatusMaskRequest, config: UDSRequestConfig | None = None, - ) -> ( - service.NegativeResponse - | service.ReportEmissionsRelatedOBDDTCByStatusMaskResponse - ): + ) -> service.NegativeResponse | service.ReportEmissionsRelatedOBDDTCByStatusMaskResponse: ... @overload diff --git a/src/gallia/services/uds/core/exception.py b/src/gallia/services/uds/core/exception.py index f09715b60..60ff6a5d0 100644 --- a/src/gallia/services/uds/core/exception.py +++ b/src/gallia/services/uds/core/exception.py @@ -44,9 +44,7 @@ def _message_core(self) -> str: class ResponseException(UDSException): - def __init__( - self, request: UDSRequest, response: UDSResponse, message: str | None = None - ): + def __init__(self, request: UDSRequest, response: UDSResponse, message: str | None = None): self.response = response super().__init__(request, message) @@ -74,9 +72,7 @@ class UnexpectedResponse(ResponseException): class UnexpectedNegativeResponse(UnexpectedResponse, ABC): RESPONSE_CODE: UDSErrorCodes - _CONCRETE_EXCEPTIONS: dict[ - UDSErrorCodes | None, type[UnexpectedNegativeResponse] - ] = {} + _CONCRETE_EXCEPTIONS: dict[UDSErrorCodes | None, type[UnexpectedNegativeResponse]] = {} def __init_subclass__(cls, /, response_code: UDSErrorCodes, **kwargs: Any) -> None: super().__init_subclass__(**kwargs) @@ -113,9 +109,7 @@ def parse_dynamic( # f'\n UnexpectedNegativeResponse, response_code={str(ec)}):\n pass\n\n') -class GeneralReject( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.generalReject -): +class GeneralReject(UnexpectedNegativeResponse, response_code=UDSErrorCodes.generalReject): pass @@ -138,15 +132,11 @@ class IncorrectMessageLengthOrInvalidFormat( pass -class ResponseTooLong( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.responseTooLong -): +class ResponseTooLong(UnexpectedNegativeResponse, response_code=UDSErrorCodes.responseTooLong): pass -class BusyRepeatRequest( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.busyRepeatRequest -): +class BusyRepeatRequest(UnexpectedNegativeResponse, response_code=UDSErrorCodes.busyRepeatRequest): pass @@ -169,9 +159,7 @@ class NoResponseFromSubnetComponent( pass -class RequestOutOfRange( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.requestOutOfRange -): +class RequestOutOfRange(UnexpectedNegativeResponse, response_code=UDSErrorCodes.requestOutOfRange): pass @@ -369,9 +357,7 @@ class RpmTooLow(UnexpectedNegativeResponse, response_code=UDSErrorCodes.rpmTooLo pass -class EngineIsRunning( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.engineIsRunning -): +class EngineIsRunning(UnexpectedNegativeResponse, response_code=UDSErrorCodes.engineIsRunning): pass @@ -393,9 +379,7 @@ class TemperatureTooHigh( pass -class TemperatureTooLow( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.temperatureTooLow -): +class TemperatureTooLow(UnexpectedNegativeResponse, response_code=UDSErrorCodes.temperatureTooLow): pass @@ -454,15 +438,11 @@ class TorqueConverterClutchLocked( pass -class VoltageTooHigh( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.voltageTooHigh -): +class VoltageTooHigh(UnexpectedNegativeResponse, response_code=UDSErrorCodes.voltageTooHigh): pass -class VoltageTooLow( - UnexpectedNegativeResponse, response_code=UDSErrorCodes.voltageTooLow -): +class VoltageTooLow(UnexpectedNegativeResponse, response_code=UDSErrorCodes.voltageTooLow): pass diff --git a/src/gallia/services/uds/core/service.py b/src/gallia/services/uds/core/service.py index 3be655c85..70dd6c02e 100644 --- a/src/gallia/services/uds/core/service.py +++ b/src/gallia/services/uds/core/service.py @@ -93,9 +93,7 @@ def _check_pdu(cls, pdu: bytes) -> None: check_length(pdu, cls._MINIMAL_LENGTH, cls._MAXIMAL_LENGTH) if cls.SERVICE_ID is not None and pdu[0] != cls.SERVICE_ID: - raise ValueError( - f"Service ID mismatch: {hex(pdu[0])} != {hex(cls.SERVICE_ID)}" - ) + raise ValueError(f"Service ID mismatch: {hex(pdu[0])} != {hex(cls.SERVICE_ID)}") @property def service_id(self) -> int: @@ -113,9 +111,7 @@ def __repr__(self) -> str: if not attr.startswith("_"): relevant_attributes[attr] = any_repr(value) - attributes = ", ".join( - f"{attr}={value}" for attr, value in relevant_attributes.items() - ) + attributes = ", ".join(f"{attr}={value}" for attr, value in relevant_attributes.items()) return f"{title}({attributes})" @staticmethod @@ -225,9 +221,7 @@ def parse_dynamic(pdu: bytes) -> UDSResponse: try: response_service = UDSService._SERVICES[UDSIsoServices(pdu[0] - 0x40)] except Exception: - logger.trace( - " - Falling back to raw response because the service is unknown" - ) + logger.trace(" - Falling back to raw response because the service is unknown") return RawPositiveResponse(pdu) logger.trace(f" - Inferred service {response_service.__name__}") @@ -236,9 +230,7 @@ def parse_dynamic(pdu: bytes) -> UDSResponse: response_type = response_service.Response elif issubclass(response_service, SpecializedSubFunctionService): if len(pdu) < 2: - raise ValueError( - "Message of subfunction service contains no subfunction" - ) + raise ValueError("Message of subfunction service contains no subfunction") logger.trace(" - Trying to infer subfunction") try: @@ -251,9 +243,7 @@ def parse_dynamic(pdu: bytes) -> UDSResponse: assert (response_type_ := response_sub_function.Response) is not None response_type = response_type_ else: - logger.trace( - " - Falling back to raw response because the response cannot be parsed" - ) + logger.trace(" - Falling back to raw response because the response cannot be parsed") return RawPositiveResponse(pdu) logger.trace(f" - Trying {response_type.__name__}") @@ -263,9 +253,7 @@ def parse_dynamic(pdu: bytes) -> UDSResponse: T_RawResponse = TypeVar("T_RawResponse", bound="RawResponse") -class RawResponse( - UDSResponse, ABC, service_id=None, minimal_length=1, maximal_length=None -): +class RawResponse(UDSResponse, ABC, service_id=None, minimal_length=1, maximal_length=None): def __init__(self, pdu: bytes) -> None: super().__init__() @@ -322,9 +310,7 @@ def __init__(self, request_service_id: int, response_code: UDSErrorCodes) -> Non @property def pdu(self) -> bytes: - return pack( - "!BBB", self.SERVICE_ID, self.request_service_id, self.response_code - ) + return pack("!BBB", self.SERVICE_ID, self.request_service_id, self.response_code) @classmethod def _from_pdu(cls, pdu: bytes) -> NegativeResponse: @@ -356,9 +342,7 @@ def __repr__(self) -> str: T_PositiveResponse = TypeVar("T_PositiveResponse", bound="PositiveResponse") -class PositiveResponse( - UDSResponse, ABC, service_id=None, minimal_length=0, maximal_length=None -): +class PositiveResponse(UDSResponse, ABC, service_id=None, minimal_length=0, maximal_length=None): @property def data(self) -> bytes: return self.pdu[1:] @@ -371,9 +355,7 @@ def __repr__(self) -> str: if not attr.startswith("_") and attr not in ["trigger_request"]: relevant_attributes[attr] = any_repr(value) - attributes = ", ".join( - f"{attr}={value}" for attr, value in relevant_attributes.items() - ) + attributes = ", ".join(f"{attr}={value}" for attr, value in relevant_attributes.items()) return f"{title}({attributes})" @classmethod @@ -402,9 +384,7 @@ def _response_type(cls, pdu: bytes) -> type[PositiveResponse] | None: def _request_type(cls, pdu: bytes) -> type[UDSRequest] | None: return cls.Request - def __init_subclass__( - cls, /, service_id: UDSIsoServices | None, **kwargs: Any - ) -> None: + def __init_subclass__(cls, /, service_id: UDSIsoServices | None, **kwargs: Any) -> None: super().__init_subclass__(**kwargs) cls.SERVICE_ID = service_id @@ -428,9 +408,7 @@ def _sub_function_type(cls, pdu: bytes) -> type[SubFunction]: sub_function_id = pdu[1] % 0x80 sub_functions = [ - x - for x in cls.__dict__.values() - if inspect.isclass(x) and issubclass(x, SubFunction) + x for x in cls.__dict__.values() if inspect.isclass(x) and issubclass(x, SubFunction) ] for sub_function in sub_functions: @@ -514,8 +492,7 @@ def _check_pdu(cls, pdu: bytes) -> None: if pdu[1] != cls.SUB_FUNCTION_ID: raise ValueError( - f"Sub-function ID mismatch: {hex(pdu[1])} != " - f"{hex(cls.SUB_FUNCTION_ID)}" + f"Sub-function ID mismatch: {hex(pdu[1])} != " f"{hex(cls.SUB_FUNCTION_ID)}" ) @property @@ -547,8 +524,7 @@ def _check_pdu(cls, pdu: bytes) -> None: if pdu[1] % 0x80 != cls.SUB_FUNCTION_ID: raise ValueError( - f"Sub-function ID mismatch: {hex(pdu[1])} != " - f"{hex(cls.SUB_FUNCTION_ID)}" + f"Sub-function ID mismatch: {hex(pdu[1])} != " f"{hex(cls.SUB_FUNCTION_ID)}" ) @property @@ -648,13 +624,9 @@ def pdu(self) -> bytes: def _from_pdu(cls, pdu: bytes) -> DiagnosticSessionControlResponse: diagnostic_session_type = from_bytes(pdu[1:2]) session_parameter_record = pdu[2:] - return DiagnosticSessionControlResponse( - diagnostic_session_type, session_parameter_record - ) + return DiagnosticSessionControlResponse(diagnostic_session_type, session_parameter_record) - def __init__( - self, diagnostic_session_type: int, session_parameter_record: bytes = b"" - ) -> None: + def __init__(self, diagnostic_session_type: int, session_parameter_record: bytes = b"") -> None: self.diagnostic_session_type = diagnostic_session_type self.session_parameter_record = session_parameter_record @@ -678,9 +650,7 @@ class DiagnosticSessionControlRequest( minimal_length=2, maximal_length=2, ): - def __init__( - self, diagnostic_session_type: int, suppress_response: bool = False - ) -> None: + def __init__(self, diagnostic_session_type: int, suppress_response: bool = False) -> None: """Sets the diagnostic session which is specified by a specific diagnosticSessionType sub-function. This is an implementation of the UDS request for service DiagnosticSessionControl (0x10). @@ -695,9 +665,7 @@ def __init__( @property def pdu(self) -> bytes: - return pack( - "!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit - ) + return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod def _from_pdu(cls, pdu: bytes) -> DiagnosticSessionControlRequest: @@ -708,9 +676,7 @@ def sub_function(self) -> int: return self.diagnostic_session_type -class DiagnosticSessionControl( - UDSService, service_id=UDSIsoServices.DiagnosticSessionControl -): +class DiagnosticSessionControl(UDSService, service_id=UDSIsoServices.DiagnosticSessionControl): Response = DiagnosticSessionControlResponse Request = DiagnosticSessionControlRequest @@ -740,9 +706,7 @@ def pdu(self) -> bytes: if self.power_down_time is None: return pack("!BB", self.RESPONSE_SERVICE_ID, self.reset_type) - return pack( - "!BBB", self.RESPONSE_SERVICE_ID, self.reset_type, self.power_down_time - ) + return pack("!BBB", self.RESPONSE_SERVICE_ID, self.reset_type, self.power_down_time) @classmethod def _from_pdu(cls, pdu: bytes) -> ECUResetResponse: @@ -752,10 +716,7 @@ def _from_pdu(cls, pdu: bytes) -> ECUResetResponse: return ECUResetResponse(reset_type, power_down_time) def matches(self, request: UDSRequest) -> bool: - return ( - isinstance(request, ECUResetRequest) - and request.reset_type == self.reset_type - ) + return isinstance(request, ECUResetRequest) and request.reset_type == self.reset_type @property def sub_function(self) -> int: @@ -783,9 +744,7 @@ def __init__(self, reset_type: int, suppress_response: bool = False) -> None: @property def pdu(self) -> bytes: - return pack( - "!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit - ) + return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod def _from_pdu(cls, pdu: bytes) -> ECUResetRequest: @@ -820,10 +779,7 @@ def __init__(self, security_access_type: int, security_seed: bytes = b"") -> Non @property def pdu(self) -> bytes: - return ( - pack("!BB", self.RESPONSE_SERVICE_ID, self.security_access_type) - + self.security_seed - ) + return pack("!BB", self.RESPONSE_SERVICE_ID, self.security_access_type) + self.security_seed @classmethod def _from_pdu(cls, pdu: bytes) -> SecurityAccessResponse: @@ -850,9 +806,7 @@ class _SecurityAccessRequest( minimal_length=2, maximal_length=None, ): - def __init__( - self, security_access_type: int, suppress_response: bool = False - ) -> None: + def __init__(self, security_access_type: int, suppress_response: bool = False) -> None: self.security_access_type = security_access_type super().__init__(suppress_response) @@ -956,9 +910,7 @@ def _from_pdu(cls, pdu: bytes) -> SendKeyRequest: return SendKeyRequest(security_access_type, security_key, suppress_response) -class SecurityAccess( - SpecializedSubFunctionService, service_id=UDSIsoServices.SecurityAccess -): +class SecurityAccess(SpecializedSubFunctionService, service_id=UDSIsoServices.SecurityAccess): class RequestSeed(SubFunction, sub_function_id=None): Response = SecurityAccessResponse Request = RequestSeedRequest @@ -970,11 +922,7 @@ class SendKey(SubFunction, sub_function_id=None): @classmethod def _sub_function_type(cls, pdu: bytes) -> type[SubFunction]: sub_function_id = pdu[1] - return ( - SecurityAccess.RequestSeed - if sub_function_id % 2 == 1 - else SecurityAccess.SendKey - ) + return SecurityAccess.RequestSeed if sub_function_id % 2 == 1 else SecurityAccess.SendKey # ************************* @@ -1054,9 +1002,7 @@ def _from_pdu(cls, pdu: bytes) -> CommunicationControlRequest: control_type, suppress_response = sub_function_split(pdu[1]) communication_type = pdu[2] - return CommunicationControlRequest( - control_type, communication_type, suppress_response - ) + return CommunicationControlRequest(control_type, communication_type, suppress_response) @property def sub_function(self) -> int: @@ -1112,9 +1058,7 @@ def __init__(self, suppress_response: bool = False) -> None: @property def pdu(self) -> bytes: - return pack( - "!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit - ) + return pack("!BB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod def _from_pdu(cls, pdu: bytes) -> TesterPresentRequest: @@ -1298,9 +1242,7 @@ def data_identifier(self, data_identifier: int) -> None: def pdu(self) -> bytes: pdu = pack("!B", self.RESPONSE_SERVICE_ID) - for data_identifier, data_record in zip( - self.data_identifiers, self.data_records - ): + for data_identifier, data_record in zip(self.data_identifiers, self.data_records): pdu = pdu + to_bytes(data_identifier, 2) + data_record return pdu @@ -1330,9 +1272,7 @@ def matches(self, request: UDSRequest) -> bool: return all( req_id == resp_id - for req_id, resp_id in zip( - request.data_identifiers, self.data_identifiers - ) + for req_id, resp_id in zip(request.data_identifiers, self.data_identifiers) ) return request.data_identifiers[0] == self.data_identifiers[0] @@ -1484,14 +1424,10 @@ def pdu(self) -> bytes: @classmethod def _from_pdu(cls, pdu: bytes) -> ReadMemoryByAddressRequest: address_and_length_format_identifier = pdu[1] - address_length, size_length = address_and_size_length( - address_and_length_format_identifier - ) + address_length, size_length = address_and_size_length(address_and_length_format_identifier) if len(pdu) != 2 + address_length + size_length: - raise ValueError( - "The addressAndLengthIdentifier is incompatible with the PDU size" - ) + raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size") return ReadMemoryByAddressRequest( from_bytes(pdu[2 : 2 + address_length]), @@ -1587,9 +1523,7 @@ def _from_pdu(cls, pdu: bytes) -> WriteDataByIdentifierRequest: return WriteDataByIdentifierRequest(data_identifier, data_record) -class WriteDataByIdentifier( - UDSService, service_id=UDSIsoServices.WriteDataByIdentifier -): +class WriteDataByIdentifier(UDSService, service_id=UDSIsoServices.WriteDataByIdentifier): Response = WriteDataByIdentifierResponse Request = WriteDataByIdentifierRequest @@ -1613,18 +1547,14 @@ def pdu(self) -> bytes: self.address_and_length_format_identifier, ) - pdu = pack( - "!BB", self.RESPONSE_SERVICE_ID, self.address_and_length_format_identifier - ) + pdu = pack("!BB", self.RESPONSE_SERVICE_ID, self.address_and_length_format_identifier) pdu += address_bytes + size_bytes return pdu @classmethod def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressResponse: address_and_length_format_identifier = pdu[1] - addr_len, size_len = address_and_size_length( - address_and_length_format_identifier - ) + addr_len, size_len = address_and_size_length(address_and_length_format_identifier) if len(pdu) < 2 + addr_len + size_len: raise ValueError( @@ -1726,14 +1656,10 @@ def pdu(self) -> bytes: @classmethod def _from_pdu(cls, pdu: bytes) -> WriteMemoryByAddressRequest: address_and_length_format_identifier = pdu[1] - address_length, size_length = address_and_size_length( - address_and_length_format_identifier - ) + address_length, size_length = address_and_size_length(address_and_length_format_identifier) if len(pdu) < 2 + address_length + size_length: - raise ValueError( - "The addressAndLengthIdentifier is incompatible with the PDU size" - ) + raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size") return WriteMemoryByAddressRequest( from_bytes(pdu[2 : 2 + address_length]), @@ -1800,9 +1726,7 @@ def _from_pdu(cls, pdu: bytes) -> ClearDiagnosticInformationRequest: return ClearDiagnosticInformationRequest(group_of_dtc) -class ClearDiagnosticInformation( - UDSService, service_id=UDSIsoServices.ClearDiagnosticInformation -): +class ClearDiagnosticInformation(UDSService, service_id=UDSIsoServices.ClearDiagnosticInformation): Response = ClearDiagnosticInformationResponse Request = ClearDiagnosticInformationRequest @@ -1821,10 +1745,7 @@ class _ReadDTCResponse( maximal_length=None, ): def matches(self, request: UDSRequest) -> bool: - return ( - isinstance(request, _ReadDTCRequest) - and self.sub_function == request.sub_function - ) + return isinstance(request, _ReadDTCRequest) and self.sub_function == request.sub_function class _ReadDTCRequest( @@ -1839,9 +1760,7 @@ class _ReadDTCRequest( pass -T_ReadDTCType0Response = TypeVar( - "T_ReadDTCType0Response", bound="_ReadDTCType0Response" -) +T_ReadDTCType0Response = TypeVar("T_ReadDTCType0Response", bound="_ReadDTCType0Response") class _ReadDTCType0Response( @@ -1879,18 +1798,14 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu( - cls: type[T_ReadDTCType0Response], pdu: bytes - ) -> T_ReadDTCType0Response: + def _from_pdu(cls: type[T_ReadDTCType0Response], pdu: bytes) -> T_ReadDTCType0Response: dtc_status_availability_mask = pdu[2] dtc_format_identifier = DTCFormatIdentifier(pdu[3]) dtc_count = from_bytes(pdu[4:]) return cls(dtc_status_availability_mask, dtc_format_identifier, dtc_count) -T_ReadDTCType1Response = TypeVar( - "T_ReadDTCType1Response", bound="_ReadDTCType1Response" -) +T_ReadDTCType1Response = TypeVar("T_ReadDTCType1Response", bound="_ReadDTCType1Response") class _ReadDTCType1Response( @@ -1915,9 +1830,7 @@ def __init__( raise ValueError("Not a valid dtc_and_status_record") self.dtc_and_status_record = { - from_bytes(dtc_and_status_record[i : i + 3]): dtc_and_status_record[ - i + 3 - ] + from_bytes(dtc_and_status_record[i : i + 3]): dtc_and_status_record[i + 3] for i in range(0, len(dtc_and_status_record), 4) } else: @@ -1950,9 +1863,7 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu( - cls: type[T_ReadDTCType1Response], pdu: bytes - ) -> T_ReadDTCType1Response: + def _from_pdu(cls: type[T_ReadDTCType1Response], pdu: bytes) -> T_ReadDTCType1Response: dtc_status_availability_mask = pdu[2] dtc_and_status_record = pdu[3:] return cls(dtc_status_availability_mask, dtc_and_status_record) @@ -1987,9 +1898,7 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu( - cls: type[T_ReadDTCType0Request], pdu: bytes - ) -> T_ReadDTCType0Request: + def _from_pdu(cls: type[T_ReadDTCType0Request], pdu: bytes) -> T_ReadDTCType0Request: dtc_status_mask = pdu[2] return cls(dtc_status_mask, cls.suppress_response_set(pdu)) @@ -2011,14 +1920,10 @@ def __init__(self, suppress_response: bool = False) -> None: @property def pdu(self) -> bytes: - return pack( - "!BBB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit - ) + return pack("!BBB", self.SERVICE_ID, self.sub_function_with_suppress_response_bit) @classmethod - def _from_pdu( - cls: type[T_ReadDTCType6Request], pdu: bytes - ) -> T_ReadDTCType6Request: + def _from_pdu(cls: type[T_ReadDTCType6Request], pdu: bytes) -> T_ReadDTCType6Request: return cls(cls.suppress_response_set(pdu)) @@ -2488,17 +2393,14 @@ def __init__(self, data_identifier: int, control_status_record: bytes) -> None: @property def pdu(self) -> bytes: return ( - pack("!BH", self.RESPONSE_SERVICE_ID, self.data_identifier) - + self.control_status_record + pack("!BH", self.RESPONSE_SERVICE_ID, self.data_identifier) + self.control_status_record ) @classmethod def _from_pdu(cls, pdu: bytes) -> InputOutputControlByIdentifierResponse: data_identifier = from_bytes(pdu[1:3]) control_status_record = pdu[3:] - return InputOutputControlByIdentifierResponse( - data_identifier, control_status_record - ) + return InputOutputControlByIdentifierResponse(data_identifier, control_status_record) def matches(self, request: UDSRequest) -> bool: return ( @@ -2583,9 +2485,7 @@ def __init__(self, data_identifier: int, control_states: bytes = b"") -> None: ) def matches(self, request: UDSRequest) -> bool: - return super().matches(request) and isinstance( - request, ReturnControlToECURequest - ) + return super().matches(request) and isinstance(request, ReturnControlToECURequest) class ReturnControlToECURequest( @@ -2595,9 +2495,7 @@ class ReturnControlToECURequest( minimal_length=4, maximal_length=None, ): - def __init__( - self, data_identifier: int, control_enable_mask_record: bytes = b"" - ) -> None: + def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"") -> None: """Gives the control over input / output parameters back to the ECU. This is a convenience wrapper of the generic request for the case where an inputOutputControlParameter is used and is set to returnControlToECU. In that case no @@ -2645,9 +2543,7 @@ class ResetToDefaultRequest( minimal_length=4, maximal_length=None, ): - def __init__( - self, data_identifier: int, control_enable_mask_record: bytes = b"" - ) -> None: + def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"") -> None: """Sets the input / output parameters to the default value(s). This is a convenience wrapper of the generic request for the case where an inputOutputControlParameter is used and is set to resetToDefault. @@ -2685,9 +2581,7 @@ def __init__(self, data_identifier: int, control_states: bytes = b"") -> None: ) def matches(self, request: UDSRequest) -> bool: - return super().matches(request) and isinstance( - request, FreezeCurrentStateResponse - ) + return super().matches(request) and isinstance(request, FreezeCurrentStateResponse) class FreezeCurrentStateRequest( @@ -2697,9 +2591,7 @@ class FreezeCurrentStateRequest( minimal_length=4, maximal_length=None, ): - def __init__( - self, data_identifier: int, control_enable_mask_record: bytes = b"" - ) -> None: + def __init__(self, data_identifier: int, control_enable_mask_record: bytes = b"") -> None: """Freezes the input / output parameters at their current state. This is a convenience wrapper of the generic request for the case where an inputOutputControlParameter is used and is set to freezeCurrentState. @@ -2764,9 +2656,7 @@ def __init__( control_option_record = ( bytes([InputOutputControlParameter.shortTermAdjustment]) + control_states ) - super().__init__( - data_identifier, control_option_record, control_enable_mask_record - ) + super().__init__(data_identifier, control_option_record, control_enable_mask_record) class InputOutputControlByIdentifier( @@ -2797,9 +2687,7 @@ class ShortTermAdjustment: # ******************* -T_RoutineControlResponse = TypeVar( - "T_RoutineControlResponse", bound="RoutineControlResponse" -) +T_RoutineControlResponse = TypeVar("T_RoutineControlResponse", bound="RoutineControlResponse") class RoutineControlResponse( @@ -2823,17 +2711,13 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu( - cls: type[T_RoutineControlResponse], pdu: bytes - ) -> T_RoutineControlResponse: + def _from_pdu(cls: type[T_RoutineControlResponse], pdu: bytes) -> T_RoutineControlResponse: routine_identifier = from_bytes(pdu[2:4]) routine_status_record = pdu[4:] return cls(routine_identifier, routine_status_record) - def __init__( - self, routine_identifier: int, routine_status_record: bytes = b"" - ) -> None: + def __init__(self, routine_identifier: int, routine_status_record: bytes = b"") -> None: super().__init__() self.routine_control_type = self.sub_function @@ -2848,9 +2732,7 @@ def matches(self, request: UDSRequest) -> bool: ) -T_RoutineControlRequest = TypeVar( - "T_RoutineControlRequest", bound="RoutineControlRequest" -) +T_RoutineControlRequest = TypeVar("T_RoutineControlRequest", bound="RoutineControlRequest") class RoutineControlRequest( @@ -2888,9 +2770,7 @@ def pdu(self) -> bytes: ) @classmethod - def _from_pdu( - cls: type[T_RoutineControlRequest], pdu: bytes - ) -> T_RoutineControlRequest: + def _from_pdu(cls: type[T_RoutineControlRequest], pdu: bytes) -> T_RoutineControlRequest: routine_identifier = from_bytes(pdu[2:4]) routine_control_option_record = pdu[4:] @@ -2934,9 +2814,7 @@ def __init__( :param suppress_response: If set to True, the server is advised to not send back a positive response. """ - super().__init__( - routine_identifier, routine_control_option_record, suppress_response - ) + super().__init__(routine_identifier, routine_control_option_record, suppress_response) class StopRoutineResponse( @@ -2972,9 +2850,7 @@ def __init__( :param suppress_response: If set to True, the server is advised to not send back a positive response. """ - super().__init__( - routine_identifier, routine_control_option_record, suppress_response - ) + super().__init__(routine_identifier, routine_control_option_record, suppress_response) class RequestRoutineResultsResponse( @@ -3010,17 +2886,11 @@ def __init__( :param suppress_response: If set to True, the server is advised to not send back a positive response. """ - super().__init__( - routine_identifier, routine_control_option_record, suppress_response - ) + super().__init__(routine_identifier, routine_control_option_record, suppress_response) -class RoutineControl( - SpecializedSubFunctionService, service_id=UDSIsoServices.RoutineControl -): - class StartRoutine( - SubFunction, sub_function_id=RoutineControlSubFuncs.startRoutine - ): +class RoutineControl(SpecializedSubFunctionService, service_id=UDSIsoServices.RoutineControl): + class StartRoutine(SubFunction, sub_function_id=RoutineControlSubFuncs.startRoutine): Request = StartRoutineRequest Response = StartRoutineResponse @@ -3056,27 +2926,17 @@ def __init__( super().__init__() if length_format_identifier is not None: - if ( - not 0 <= length_format_identifier <= 0xF0 - or length_format_identifier % 2**4 > 0 - ): + if not 0 <= length_format_identifier <= 0xF0 or length_format_identifier % 2**4 > 0: raise ValueError( - f"Invalid value for lengthFormatIdentifier: " - f"{length_format_identifier}" + f"Invalid value for lengthFormatIdentifier: " f"{length_format_identifier}" ) - uds_memory_parameters( - 0, max_number_of_block_length, length_format_identifier + 1 - ) + uds_memory_parameters(0, max_number_of_block_length, length_format_identifier + 1) else: - length_format_identifier, _, _ = uds_memory_parameters( - 0, max_number_of_block_length - ) + length_format_identifier, _, _ = uds_memory_parameters(0, max_number_of_block_length) self.max_number_of_block_length = max_number_of_block_length - self.length_format_identifier = length_format_identifier - ( - length_format_identifier % 2**4 - ) + self.length_format_identifier = length_format_identifier - (length_format_identifier % 2**4) @property def pdu(self) -> bytes: @@ -3152,9 +3012,7 @@ def pdu(self) -> bytes: self.address_and_length_format_identifier, ) - pdu = struct.pack( - "!BBB", self.SERVICE_ID, data_format_identifier, addr_and_len_format_id - ) + pdu = struct.pack("!BBB", self.SERVICE_ID, data_format_identifier, addr_and_len_format_id) pdu += address_bytes + size_bytes return pdu @@ -3164,14 +3022,10 @@ def _from_pdu( ) -> T_RequestUpOrDownloadRequest: data_format_identifier = pdu[1] address_and_length_format_identifier = pdu[2] - address_length, size_length = address_and_size_length( - address_and_length_format_identifier - ) + address_length, size_length = address_and_size_length(address_and_length_format_identifier) if len(pdu) != 3 + address_length + size_length: - raise ValueError( - "The addressAndLengthIdentifier is incompatible with the PDU size" - ) + raise ValueError("The addressAndLengthIdentifier is incompatible with the PDU size") return cls( from_bytes(pdu[3 : 3 + address_length]), @@ -3315,9 +3169,7 @@ def __init__( def _from_pdu(cls, pdu: bytes) -> TransferDataResponse: block_sequence_counter = pdu[1] transfer_response_parameter_record = pdu[2:] - return TransferDataResponse( - block_sequence_counter, transfer_response_parameter_record - ) + return TransferDataResponse(block_sequence_counter, transfer_response_parameter_record) @property def pdu(self) -> bytes: @@ -3363,9 +3215,7 @@ def __init__( def _from_pdu(cls, pdu: bytes) -> TransferDataRequest: block_sequence_counter = pdu[1] transfer_request_parameter_record = pdu[2:] - return TransferDataRequest( - block_sequence_counter, transfer_request_parameter_record - ) + return TransferDataRequest(block_sequence_counter, transfer_request_parameter_record) @property def pdu(self) -> bytes: @@ -3399,9 +3249,7 @@ def __init__(self, transfer_response_parameter_record: bytes = b"") -> None: @property def pdu(self) -> bytes: assert self.RESPONSE_SERVICE_ID is not None - return ( - bytes([self.RESPONSE_SERVICE_ID]) + self.transfer_response_parameter_record - ) + return bytes([self.RESPONSE_SERVICE_ID]) + self.transfer_response_parameter_record @classmethod def _from_pdu(cls, pdu: bytes) -> RequestTransferExitResponse: diff --git a/src/gallia/services/uds/core/utils.py b/src/gallia/services/uds/core/utils.py index 7ed529876..81e48db5a 100644 --- a/src/gallia/services/uds/core/utils.py +++ b/src/gallia/services/uds/core/utils.py @@ -36,9 +36,7 @@ def check_sub_function(sub_function: int) -> None: raise ValueError(f"Not a valid subFunction: {int_repr(sub_function)}") -def check_length( - pdu: bytes, minimal_length: int = 0, maximal_length: int | None = None -) -> None: +def check_length(pdu: bytes, minimal_length: int = 0, maximal_length: int | None = None) -> None: if len(pdu) < 1: raise ValueError("The PDU is empty") @@ -208,17 +206,12 @@ def address_and_size_length(address_and_length_fmt: int) -> tuple[int, int]: """ if not 0x00 <= address_and_length_fmt <= 0xFF: raise ValueError( - "The addressAndLengthFormatIdentifier must not be negative " - "nor exceed 0xff" + "The addressAndLengthFormatIdentifier must not be negative " "nor exceed 0xff" ) if address_and_length_fmt & 0xF0 == 0: - raise ValueError( - "The addressAndLengthFormatIdentifier's first nibble must not be 0" - ) + raise ValueError("The addressAndLengthFormatIdentifier's first nibble must not be 0") if address_and_length_fmt & 0x0F == 0: - raise ValueError( - "The addressAndLengthFormatIdentifier's second nibble must not be 0" - ) + raise ValueError("The addressAndLengthFormatIdentifier's second nibble must not be 0") addr_length = address_and_length_fmt & 0x0F size_length = (address_and_length_fmt & 0xF0) >> 4 diff --git a/src/gallia/services/uds/ecu.py b/src/gallia/services/uds/ecu.py index 7ea9b672e..b5f2f0b80 100644 --- a/src/gallia/services/uds/ecu.py +++ b/src/gallia/services/uds/ecu.py @@ -103,9 +103,7 @@ async def read_session(self, config: UDSRequestConfig | None = None) -> int: raise as_exception(resp) return from_bytes(resp.data_record) - async def set_session_pre( - self, level: int, config: UDSRequestConfig | None = None - ) -> bool: + async def set_session_pre(self, level: int, config: UDSRequestConfig | None = None) -> bool: """set_session_pre() is called before the diagnostic session control pdu is written on the wire. Implement this if there are special preconditions for a particular session, such as disabling error @@ -120,9 +118,7 @@ async def set_session_pre( """ return True - async def set_session_post( - self, level: int, config: UDSRequestConfig | None = None - ) -> bool: + async def set_session_post(self, level: int, config: UDSRequestConfig | None = None) -> bool: """set_session_post() is called after the diagnostic session control pdu was written on the wire. Implement this if there are special cleanup routines or sleeping until a certain moment is required. @@ -150,9 +146,7 @@ async def check_and_set_session( logger.debug(f"Checking current session, expecting {g_repr(expected_session)}") try: - current_session = await self.read_session( - config=UDSRequestConfig(max_retry=retries) - ) + current_session = await self.read_session(config=UDSRequestConfig(max_retry=retries)) except UnexpectedNegativeResponse as e: if suggests_identifier_not_supported(e.RESPONSE_CODE): logger.info( @@ -179,9 +173,7 @@ async def check_and_set_session( resp = await self.set_session(expected_session) if isinstance(resp, service.NegativeResponse): - logger.warning( - f"Switching to session {g_repr(expected_session)} failed: {resp}" - ) + logger.warning(f"Switching to session {g_repr(expected_session)} failed: {resp}") try: current_session = await self.read_session( @@ -198,9 +190,7 @@ async def check_and_set_session( return True raise e except asyncio.TimeoutError: - logger.warning( - "Reading current session timed out, skipping check_session" - ) + logger.warning("Reading current session timed out, skipping check_session") return True logger.warning( @@ -220,9 +210,7 @@ async def callback() -> None: self.state.reset() return True - async def leave_session( - self, level: int, config: UDSRequestConfig | None = None - ) -> bool: + async def leave_session(self, level: int, config: UDSRequestConfig | None = None) -> bool: """leave_session() is a hook which can be called explicitly by a scanner when a session is to be disabled. Use this hook if resetting the ECU is required, e.g. when disabling the programming session. @@ -259,14 +247,8 @@ async def set_session( resp = await self.diagnostic_session_control(level, config=config) - if ( - isinstance(resp, service.NegativeResponse) - and self.db_handler is not None - and use_db - ): - logger.debug( - "Could not switch to session. Trying with database transitions ..." - ) + if isinstance(resp, service.NegativeResponse) and self.db_handler is not None and use_db: + logger.debug("Could not switch to session. Trying with database transitions ...") if self.db_handler is not None: steps = await self.db_handler.get_session_transition(level) @@ -288,9 +270,7 @@ async def read_dtc( self, config: UDSRequestConfig | None = None ) -> service.NegativeResponse | service.ReportDTCByStatusMaskResponse: """Read all dtc records from the ecu.""" - return await self.read_dtc_information_report_dtc_by_status_mask( - 0xFF, config=config - ) + return await self.read_dtc_information_report_dtc_by_status_mask(0xFF, config=config) async def clear_dtc( self, config: UDSRequestConfig | None = None @@ -323,8 +303,7 @@ async def transmit_data( counter += 1 payload = data[i : i + payload_size] logger.debug( - f"Transferring block {g_repr(counter)} " - f"with payload size {g_repr(len(payload))}" + f"Transferring block {g_repr(counter)} " f"with payload size {g_repr(len(payload))}" ) resp: service.UDSResponse = await self.transfer_data( counter & 0xFF, payload, config=config @@ -383,9 +362,7 @@ async def _tester_present_worker(self, interval: float) -> None: logger.info("connection lost; tester present waiting…") except Exception as e: logger.warning(f"Tester present worker got {e!r}") - logger.debug( - "Tester present worker was cancelled but received no asyncio.CancelledError" - ) + logger.debug("Tester present worker was cancelled but received no asyncio.CancelledError") async def start_cyclic_tester_present(self, interval: float) -> None: logger.debug("Starting tester present worker") @@ -401,9 +378,7 @@ async def start_cyclic_tester_present(self, interval: float) -> None: async def stop_cyclic_tester_present(self) -> None: logger.debug("Stopping tester present worker") if self.tester_present_task is None: - logger.warning( - "BUG: stop_cyclic_tester_present() called but no task running" - ) + logger.warning("BUG: stop_cyclic_tester_present() called but no task running") return self.tester_present_task.cancel() @@ -421,8 +396,7 @@ async def update_state( if ( isinstance(response, service.ReadDataByIdentifierResponse) - and response.data_identifier - == DataIdentifier.ActiveDiagnosticSessionDataIdentifier + and response.data_identifier == DataIdentifier.ActiveDiagnosticSessionDataIdentifier ): new_session = int.from_bytes(response.data_record, "big") @@ -485,11 +459,7 @@ async def _request( if self.implicit_logging and self.db_handler is not None: mode = LogMode.implicit - if ( - config is not None - and config.tags is not None - and "ANALYZE" in config.tags - ): + if config is not None and config.tags is not None and "ANALYZE" in config.tags: mode = LogMode.emphasized await self.db_handler.insert_scan_result( diff --git a/src/gallia/services/uds/helpers.py b/src/gallia/services/uds/helpers.py index 592b1e3c3..2df64b7b5 100644 --- a/src/gallia/services/uds/helpers.py +++ b/src/gallia/services/uds/helpers.py @@ -16,9 +16,7 @@ def raise_for_error(response: service.UDSResponse, message: str | None = None) - if response.trigger_request is None: raise ValueError("The response has not been assigned a trigger request") - raise UnexpectedNegativeResponse.parse_dynamic( - response.trigger_request, response, message - ) + raise UnexpectedNegativeResponse.parse_dynamic(response.trigger_request, response, message) def as_exception( @@ -27,9 +25,7 @@ def as_exception( if response.trigger_request is None: raise ValueError("The response has not been assigned a trigger request") - return UnexpectedNegativeResponse.parse_dynamic( - response.trigger_request, response, message - ) + return UnexpectedNegativeResponse.parse_dynamic(response.trigger_request, response, message) def raise_for_mismatch( diff --git a/src/gallia/services/uds/server.py b/src/gallia/services/uds/server.py index b5d8973f8..eecbb3033 100644 --- a/src/gallia/services/uds/server.py +++ b/src/gallia/services/uds/server.py @@ -56,9 +56,7 @@ def supported_services( def default_response_if_service_not_supported( self, request: service.UDSRequest ) -> service.NegativeResponse | None: - assert ( - self.state.session in self.supported_services - ), "Virtual ECU in unsupported session" + assert self.state.session in self.supported_services, "Virtual ECU in unsupported session" if request.service_id not in self.supported_services[self.state.session]: if any(request.service_id in s for s in self.supported_services.values()): @@ -102,9 +100,7 @@ def default_response_if_missing_sub_function( def default_response_if_sub_function_not_supported( self, request: service.UDSRequest ) -> service.NegativeResponse | None: - assert ( - self.state.session in self.supported_services - ), "Virtual ECU in unsupported session" + assert self.state.session in self.supported_services, "Virtual ECU in unsupported session" # The standards explicitly excludes RoutineControl for this check because the availability of a sub function # depends on the routineIdentifier @@ -116,10 +112,7 @@ def default_response_if_sub_function_not_supported( supported_in_other_session = False for session in self.supported_services: - if ( - UDSIsoServices(request.service_id) - not in self.supported_services[session] - ): + if UDSIsoServices(request.service_id) not in self.supported_services[session]: continue supported_sub_functions = self.supported_services[session][ @@ -159,13 +152,11 @@ def default_response_if_incorrect_format( return None - def default_response_if_session_change( - self, request: service.UDSRequest - ) -> None | (service.NegativeResponse | service.DiagnosticSessionControlResponse): + def default_response_if_session_change(self, request: service.UDSRequest) -> None | ( + service.NegativeResponse | service.DiagnosticSessionControlResponse + ): if isinstance(request, service.DiagnosticSessionControlRequest): - return service.DiagnosticSessionControlResponse( - request.diagnostic_session_type - ) + return service.DiagnosticSessionControlResponse(request.diagnostic_session_type) return None @@ -173,10 +164,7 @@ def default_response_if_session_read( self, request: service.UDSRequest ) -> service.ReadDataByIdentifierResponse | None: if isinstance(request, service.ReadDataByIdentifierRequest): - if ( - request.data_identifier - == DataIdentifier.ActiveDiagnosticSessionDataIdentifier - ): + if request.data_identifier == DataIdentifier.ActiveDiagnosticSessionDataIdentifier: return service.ReadDataByIdentifierResponse( request.data_identifier, to_bytes(self.state.session, 1) ) @@ -191,9 +179,7 @@ def default_response_if_tester_present( return None - def default_response_if_none( - self, request: service.UDSRequest - ) -> service.NegativeResponse: + def default_response_if_none(self, request: service.UDSRequest) -> service.NegativeResponse: return service.NegativeResponse(request.service_id, UDSErrorCodes.generalReject) def default_response_if_suppress( @@ -237,38 +223,32 @@ async def respond_without_state_change( if ( self.use_default_response_if_service_not_supported - and (response := self.default_response_if_service_not_supported(request)) - is not None + and (response := self.default_response_if_service_not_supported(request)) is not None ): return response if ( self.use_default_response_if_missing_sub_function - and (response := self.default_response_if_missing_sub_function(request)) - is not None + and (response := self.default_response_if_missing_sub_function(request)) is not None ): return response if ( self.use_default_response_if_sub_function_not_supported - and ( - response := self.default_response_if_sub_function_not_supported(request) - ) + and (response := self.default_response_if_sub_function_not_supported(request)) is not None ): return response if ( self.use_default_response_if_incorrect_format - and (response := self.default_response_if_incorrect_format(request)) - is not None + and (response := self.default_response_if_incorrect_format(request)) is not None ): return response if ( self.use_default_response_if_session_change - and (response := self.default_response_if_session_change(request)) - is not None + and (response := self.default_response_if_session_change(request)) is not None ): return response @@ -280,8 +260,7 @@ async def respond_without_state_change( if ( self.use_default_response_if_tester_present - and (response := self.default_response_if_tester_present(request)) - is not None + and (response := self.default_response_if_tester_present(request)) is not None ): return response @@ -373,8 +352,7 @@ def __init__(self, seed: int): self.services: dict[int, dict[UDSIsoServices, list[int] | None]] = {} self.mandatory_services = [UDSIsoServices.DiagnosticSessionControl] self.optional_services = list( - set(UDSIsoServices) - - set(self.mandatory_services + [UDSIsoServices.NegativeResponse]) + set(UDSIsoServices) - set(self.mandatory_services + [UDSIsoServices.NegativeResponse]) ) self.p_service = 0.2 @@ -426,9 +404,7 @@ def randomize(self) -> None: for session in level_sessions: transitions = [ - session - for session in combined_sessions - if rng.random() < p_transition + session for session in combined_sessions if rng.random() < p_transition ] session_transitions[session].update(transitions) next_level_sessions.update(transitions) @@ -471,9 +447,7 @@ def randomize(self) -> None: supported_sub_functions = sorted(session_specific_transitions) elif supported_service == UDSIsoServices.SecurityAccess: supported_sub_functions_tmp = [ - sf - for sf in range(1, 0x7E, 2) - if rng.random() < self.p_sub_function / 2 + sf for sf in range(1, 0x7E, 2) if rng.random() < self.p_sub_function / 2 ] supported_sub_functions = [] @@ -481,19 +455,13 @@ def randomize(self) -> None: supported_sub_functions.append(sf) supported_sub_functions.append(sf + 1) elif supported_service == UDSIsoServices.RoutineControl: - supported_sub_functions = [ - sf.value for sf in RoutineControlSubFuncs - ] + supported_sub_functions = [sf.value for sf in RoutineControlSubFuncs] # Currently only this sub function is supported so it doesn't make sense to gamble a lot here elif supported_service == UDSIsoServices.ReadDTCInformation: - supported_sub_functions = [ - ReadDTCInformationSubFuncs.reportDTCByStatusMask - ] + supported_sub_functions = [ReadDTCInformationSubFuncs.reportDTCByStatusMask] else: supported_sub_functions = [ - sf - for sf in range(1, 0x80, 1) - if rng.random() < self.p_sub_function + sf for sf in range(1, 0x80, 1) if rng.random() < self.p_sub_function ] self.services[session][supported_service] = supported_sub_functions @@ -506,10 +474,7 @@ def supported_services( def stateful_rng(self, *args: Any) -> RNG: return RNG( - str(self.seed) - + "|" - + str(self.state.session) - + "|".join(str(arg) for arg in args) + str(self.seed) + "|" + str(self.state.session) + "|".join(str(arg) for arg in args) ) async def respond_after_default( @@ -544,9 +509,7 @@ async def update_state( if not isinstance(response, service.TesterPresentResponse): self.state.last_sa_response = ( - response - if isinstance(response, service.SecurityAccessResponse) - else None + response if isinstance(response, service.SecurityAccessResponse) else None ) def ecu_reset(self, request: service.ECUResetRequest) -> service.UDSResponse: @@ -557,9 +520,7 @@ def ecu_reset(self, request: service.ECUResetRequest) -> service.UDSResponse: return service.ECUResetResponse(request.reset_type) - def security_access( - self, request: service._SecurityAccessRequest - ) -> service.UDSResponse: + def security_access(self, request: service._SecurityAccessRequest) -> service.UDSResponse: if isinstance(request, service.RequestSeedRequest): return service.SecurityAccessResponse( request.security_access_type, RNG().random_payload() @@ -582,21 +543,15 @@ def security_access( if request.security_key == expected_key: return service.SecurityAccessResponse(request.security_access_type) - return service.NegativeResponse( - request.service_id, UDSErrorCodes.invalidKey - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.invalidKey) raise AssertionError() - def routine_control( - self, request: service.RoutineControlRequest - ) -> service.UDSResponse: + def routine_control(self, request: service.RoutineControlRequest) -> service.UDSResponse: rng = self.stateful_rng(request.service_id, request.routine_identifier) if not rng.random_bool(self.p_identifier): - return service.NegativeResponse( - request.service_id, UDSErrorCodes.requestOutOfRange - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.requestOutOfRange) rng.add_seeds(request.sub_function) @@ -620,9 +575,7 @@ def read_data_by_identifier( rng = self.stateful_rng(request.pdu) if not rng.random_bool(self.p_identifier): - return service.NegativeResponse( - request.service_id, UDSErrorCodes.requestOutOfRange - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.requestOutOfRange) return service.ReadDataByIdentifierResponse( request.data_identifier, rng.random_payload(min_len=1) @@ -634,9 +587,7 @@ def write_data_by_identifier( rng = self.stateful_rng(request.service_id, request.data_identifier) if not rng.random_bool(self.p_identifier): - return service.NegativeResponse( - request.service_id, UDSErrorCodes.requestOutOfRange - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.requestOutOfRange) rng = self.stateful_rng(request.pdu) @@ -653,9 +604,7 @@ def input_output_control_by_identifier( rng = self.stateful_rng(request.service_id, request.data_identifier) if not rng.random_bool(self.p_identifier): - return service.NegativeResponse( - request.service_id, UDSErrorCodes.requestOutOfRange - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.requestOutOfRange) rng = self.stateful_rng(request.pdu) @@ -664,9 +613,7 @@ def input_output_control_by_identifier( request.service_id, UDSErrorCodes.incorrectMessageLengthOrInvalidFormat ) - return request.RESPONSE_TYPE( - request.data_identifier, rng.random_payload(min_len=1) - ) # type: ignore + return request.RESPONSE_TYPE(request.data_identifier, rng.random_payload(min_len=1)) # type: ignore def clear_diagnostic_information( self, request: service.ClearDiagnosticInformationRequest @@ -674,9 +621,7 @@ def clear_diagnostic_information( rng = self.stateful_rng(request.service_id, request.group_of_dtc) if not rng.random_bool(self.p_dtc_status_mask): - return service.NegativeResponse( - request.service_id, UDSErrorCodes.requestOutOfRange - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.requestOutOfRange) return service.ClearDiagnosticInformationResponse() @@ -702,15 +647,11 @@ def read_dtc_information(self, request: service.UDSRequest) -> service.UDSRespon dtc_status_availability_mask, dtc_and_status_record ) - return service.NegativeResponse( - request.service_id, UDSErrorCodes.subFunctionNotSupported - ) + return service.NegativeResponse(request.service_id, UDSErrorCodes.subFunctionNotSupported) class DBUDSServer(UDSServer): - def __init__( - self, db_path: Path, ecu: str | None, properties: dict[str, Any] | None - ): + def __init__(self, db_path: Path, ecu: str | None, properties: dict[str, Any] | None): super().__init__() self.db_path = db_path @@ -772,9 +713,7 @@ async def respond_after_default( query += f"json_extract(r.state, '$.{key}') IS NULL AND " else: query += f"json_extract(r.state, '$.{key}') = ? AND " - parameters.append( - value if isinstance(value, int | float) else json.dumps(value) - ) + parameters.append(value if isinstance(value, int | float) else json.dumps(value)) if self.properties is not None: for key, value in self.properties.items(): @@ -791,9 +730,7 @@ async def respond_after_default( final_query = f"{query} AND r.id > ? ORDER BY r.id LIMIT 1 " parameters += [bytes_repr(request.pdu, False, None), self.last_response] - cursor: aiosqlite.Cursor = await self.connection.execute( - final_query, parameters - ) + cursor: aiosqlite.Cursor = await self.connection.execute(final_query, parameters) result = await cursor.fetchone() if result is None: @@ -861,18 +798,14 @@ async def handle_client( tcp_request = line.decode("ascii").strip() uds_request_raw = unhexlify(tcp_request) - uds_response_raw, response_time = await self.handle_request( - uds_request_raw - ) + uds_response_raw, response_time = await self.handle_request(uds_request_raw) response_times.append(response_time) if uds_response_raw is not None: writer.write(hexlify(uds_response_raw) + b"\n") await writer.drain() except Exception as e: - logger.error( - f"Unexpected exception when handling client communication: {e!r}" - ) + logger.error(f"Unexpected exception when handling client communication: {e!r}") traceback.print_exc() break @@ -902,9 +835,7 @@ async def run(self) -> None: if uds_response_raw is not None: await transport.write(uds_response_raw) except Exception as e: - logger.error( - f"Unexpected exception when handling client communication: {e!r}" - ) + logger.error(f"Unexpected exception when handling client communication: {e!r}") traceback.print_exc() break diff --git a/src/gallia/services/xcp/__init__.py b/src/gallia/services/xcp/__init__.py index 717f51584..e131ff5d1 100644 --- a/src/gallia/services/xcp/__init__.py +++ b/src/gallia/services/xcp/__init__.py @@ -26,9 +26,7 @@ async def request(self, data: bytes, timeout: float | None = None) -> bytes: header = types.Response.parse(resp) logger.info(header) if int(header.type) != 255: - raise ValueError( - f"Unknown response type: {header.type}, maybe no XCP packet?" - ) + raise ValueError(f"Unknown response type: {header.type}, maybe no XCP packet?") # strip header byte return resp[1:] @@ -106,8 +104,6 @@ async def request(self, data: bytes, timeout: float | None = None) -> bytes: header = types.Response.parse(resp) logger.info(header) if int(header.type) != 255: - raise ValueError( - f"Unknown response type: {header.type}, maybe no XCP packet?" - ) + raise ValueError(f"Unknown response type: {header.type}, maybe no XCP packet?") # strip header byte return resp[1:] diff --git a/src/gallia/services/xcp/types.py b/src/gallia/services/xcp/types.py index 0a360d9f5..3839b9b36 100644 --- a/src/gallia/services/xcp/types.py +++ b/src/gallia/services/xcp/types.py @@ -326,9 +326,7 @@ class Event(enum.IntEnum): "calpag": 1, } -AddressGranularity = Enum( - BitsInteger(2), BYTE=0b00, WORD=0b01, DWORD=0b10, RESERVED=0b11 -) +AddressGranularity = Enum(BitsInteger(2), BYTE=0b00, WORD=0b01, DWORD=0b10, RESERVED=0b11) ByteOrder = Enum(BitsInteger(1), INTEL=0, MOTOROLA=1) @@ -348,9 +346,7 @@ class Event(enum.IntEnum): "byteOrder" / ByteOrder, ) -ConnectResponsePartial = Struct( - "resource" / ResourceType, "commModeBasic" / CommModeBasic -) +ConnectResponsePartial = Struct("resource" / ResourceType, "commModeBasic" / CommModeBasic) ConnectResponse = Struct( "resource" / ResourceType, @@ -409,9 +405,7 @@ class Event(enum.IntEnum): "identification" / If(this.mode == 1, Int8ul[this.length]), ) -GetSeedResponse = Struct( - "length" / Int8ul, "seed" / If(this.length > 0, Int8ul[this.length]) -) +GetSeedResponse = Struct("length" / Int8ul, "seed" / If(this.length > 0, Int8ul[this.length])) SetRequestMode = BitStruct( Padding(4), @@ -579,14 +573,12 @@ class Event(enum.IntEnum): "daqPackedMode" / DaqPackedMode, "dpmTimestampMode" / If( - (this.daqPackedMode == "ELEMENT_GROUPED") - | (this.daqPackedMode == "EVENT_GROUPED"), + (this.daqPackedMode == "ELEMENT_GROUPED") | (this.daqPackedMode == "EVENT_GROUPED"), Int8ul, ), "dpmSampleCount" / If( - (this.daqPackedMode == "ELEMENT_GROUPED") - | (this.daqPackedMode == "EVENT_GROUPED"), + (this.daqPackedMode == "ELEMENT_GROUPED") | (this.daqPackedMode == "EVENT_GROUPED"), Int16u, ), ) @@ -741,9 +733,7 @@ class Event(enum.IntEnum): "absoluteMode" / Flag, ) -GetPgmProcessorInfoResponse = Struct( - "pgmProperties" / PgmProperties, "maxSector" / Int8ul -) +GetPgmProcessorInfoResponse = Struct("pgmProperties" / PgmProperties, "maxSector" / Int8ul) GetSectorInfoResponseMode01 = Struct( "clearSequenceNumber" / Int8ul, @@ -840,8 +830,7 @@ class Event(enum.IntEnum): If(this._.width == 2, Padding(1)), If(this._.width == 4, Padding(3)), If(this._.width == 8, Padding(7)), - "data" - / GreedyRange(Switch(this._.width, {1: Int8ul, 2: Int16u, 4: Int32u, 8: Int64u})), + "data" / GreedyRange(Switch(this._.width, {1: Int8ul, 2: Int16u, 4: Int32u, 8: Int64u})), ) DbgGetTriDescTblTrad = Struct( diff --git a/src/gallia/transports/can.py b/src/gallia/transports/can.py index b36ab1952..6e6e39ecc 100644 --- a/src/gallia/transports/can.py +++ b/src/gallia/transports/can.py @@ -193,20 +193,14 @@ async def recvfrom( self, timeout: float | None = None, tags: list[str] | None = None ) -> tuple[int, bytes]: loop = asyncio.get_running_loop() - can_frame = await asyncio.wait_for( - loop.sock_recv(self._sock, self.BUFSIZE), timeout - ) + can_frame = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout) msg = CANMessage.unpack(can_frame) t = tags + ["read"] if tags is not None else ["read"] if msg.is_extended_id: - logger.trace( - f"{msg.arbitration_id:08x}#{msg.data.hex()}", extra={"tags": t} - ) + logger.trace(f"{msg.arbitration_id:08x}#{msg.data.hex()}", extra={"tags": t}) else: - logger.trace( - f"{msg.arbitration_id:03x}#{msg.data.hex()}", extra={"tags": t} - ) + logger.trace(f"{msg.arbitration_id:03x}#{msg.data.hex()}", extra={"tags": t}) return msg.arbitration_id, msg.data async def close(self) -> None: diff --git a/src/gallia/transports/doip.py b/src/gallia/transports/doip.py index 6f1e088f3..9b6e74504 100644 --- a/src/gallia/transports/doip.py +++ b/src/gallia/transports/doip.py @@ -55,9 +55,7 @@ class DoIPRoutingActivationDeniedError(ConnectionAbortedError): def __init__(self, rac_code: int): self.rac_code = RoutingActivationResponseCodes(rac_code) - super().__init__( - f"DoIP routing activation denied: {self.rac_code.name} ({rac_code})" - ) + super().__init__(f"DoIP routing activation denied: {self.rac_code.name} ({rac_code})") @unique @@ -103,9 +101,7 @@ class DoIPNegativeAckError(BrokenPipeError): def __init__(self, negative_ack_code: int): self.nack_code = DiagnosticMessageNegativeAckCodes(negative_ack_code) - super().__init__( - f"DoIP negative ACK received: {self.nack_code.name} ({negative_ack_code})" - ) + super().__init__(f"DoIP negative ACK received: {self.nack_code.name} ({negative_ack_code})") @unique @@ -177,9 +173,7 @@ class RoutingActivationRequest: # OEMReserved uint32 def pack(self) -> bytes: - return struct.pack( - "!HBI", self.SourceAddress, self.ActivationType, self.Reserved - ) + return struct.pack("!HBI", self.SourceAddress, self.ActivationType, self.Reserved) @dataclass @@ -361,9 +355,7 @@ async def _read_frame(self) -> DoIPFrame: case PayloadTypes.AliveCheckRequest: payload = AliveCheckRequest() case _: - raise BrokenPipeError( - f"unexpected DoIP message: {hdr} {payload_buf.hex()}" - ) + raise BrokenPipeError(f"unexpected DoIP message: {hdr} {payload_buf.hex()}") return hdr, payload async def _read_worker(self) -> None: @@ -399,15 +391,10 @@ async def read_diag_request_raw(self) -> DoIPDiagFrame: while True: hdr, payload = await self.read_frame() if not isinstance(payload, DiagnosticMessage): - logger.warning( - f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}" - ) + logger.warning(f"expected DoIP DiagnosticMessage, instead got: {hdr} {payload}") unexpected_packets.append((hdr, payload)) continue - if ( - payload.SourceAddress != self.target_addr - or payload.TargetAddress != self.src_addr - ): + if payload.SourceAddress != self.target_addr or payload.TargetAddress != self.src_addr: logger.warning( f"DoIP-DiagnosticMessage: unexpected addresses (src:dst); expected {self.src_addr:#04x}:" + f"{self.target_addr:#04x} but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}" @@ -429,19 +416,14 @@ async def _read_ack(self, prev_data: bytes) -> None: unexpected_packets: list[tuple[Any, Any]] = [] while True: hdr, payload = await self.read_frame_unsafe() - if not isinstance( - payload, DiagnosticMessagePositiveAcknowledgement - ) and not isinstance(payload, DiagnosticMessageNegativeAcknowledgement): - logger.warning( - f"expected DoIP positive/negative ACK, instead got: {hdr} {payload}" - ) + if not isinstance(payload, DiagnosticMessagePositiveAcknowledgement) and not isinstance( + payload, DiagnosticMessageNegativeAcknowledgement + ): + logger.warning(f"expected DoIP positive/negative ACK, instead got: {hdr} {payload}") unexpected_packets.append((hdr, payload)) continue - if ( - payload.SourceAddress != self.target_addr - or payload.TargetAddress != self.src_addr - ): + if payload.SourceAddress != self.target_addr or payload.TargetAddress != self.src_addr: logger.warning( f"DoIP-ACK: unexpected addresses (src:dst); expected {self.src_addr:#04x}:{self.target_addr:#04x} " + f"but got: {payload.SourceAddress:#04x}:{payload.TargetAddress:#04x}" @@ -482,13 +464,8 @@ async def _read_routing_activation_response(self) -> None: for item in unexpected_packets: await self._read_queue.put(item) - if ( - payload.RoutingActivationResponseCode - != RoutingActivationResponseCodes.Success - ): - raise DoIPRoutingActivationDeniedError( - payload.RoutingActivationResponseCode - ) + if payload.RoutingActivationResponseCode != RoutingActivationResponseCodes.Success: + raise DoIPRoutingActivationDeniedError(payload.RoutingActivationResponseCode) return async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> None: @@ -518,9 +495,7 @@ async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> N ) except asyncio.TimeoutError as e: await self.close() - raise BrokenPipeError( - "Timeout while waiting for DoIP ACK message" - ) from e + raise BrokenPipeError("Timeout while waiting for DoIP ACK message") from e async def write_diag_request(self, data: bytes) -> None: hdr = GenericHeader( @@ -614,9 +589,7 @@ async def _connect( src_addr, target_addr, ) - await conn.write_routing_activation_request( - RoutingActivationRequestTypes(activation_type) - ) + await conn.write_routing_activation_request(RoutingActivationRequestTypes(activation_type)) return conn @classmethod diff --git a/src/gallia/transports/isotp.py b/src/gallia/transports/isotp.py index df402b99a..103c7fe6e 100644 --- a/src/gallia/transports/isotp.py +++ b/src/gallia/transports/isotp.py @@ -187,14 +187,10 @@ async def write( await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout) return len(data) - async def read( - self, timeout: float | None = None, tags: list[str] | None = None - ) -> bytes: + async def read(self, timeout: float | None = None, tags: list[str] | None = None) -> bytes: loop = asyncio.get_running_loop() try: - data = await asyncio.wait_for( - loop.sock_recv(self._sock, self.BUFSIZE), timeout - ) + data = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout) except OSError as e: if e.errno == errno.ECOMM: raise BrokenPipeError(f"isotp flow control frame missing: {e}") from e diff --git a/src/gallia/transports/tcp.py b/src/gallia/transports/tcp.py index b934bb429..743979ecc 100644 --- a/src/gallia/transports/tcp.py +++ b/src/gallia/transports/tcp.py @@ -24,9 +24,7 @@ def __init__( self.writer = writer @classmethod - async def connect( - cls, target: str | TargetURI, timeout: float | None = None - ) -> TCPTransport: + async def connect(cls, target: str | TargetURI, timeout: float | None = None) -> TCPTransport: t = target if isinstance(target, TargetURI) else TargetURI(target) cls.check_scheme(t) diff --git a/src/gallia/transports/unix.py b/src/gallia/transports/unix.py index ed0c81849..dd87ab553 100644 --- a/src/gallia/transports/unix.py +++ b/src/gallia/transports/unix.py @@ -24,15 +24,11 @@ def __init__( self.writer = writer @classmethod - async def connect( - cls, target: str | TargetURI, timeout: float | None = None - ) -> UnixTransport: + async def connect(cls, target: str | TargetURI, timeout: float | None = None) -> UnixTransport: t = target if isinstance(target, TargetURI) else TargetURI(target) cls.check_scheme(t) - reader, writer = await asyncio.wait_for( - asyncio.open_unix_connection(t.path), timeout - ) + reader, writer = await asyncio.wait_for(asyncio.open_unix_connection(t.path), timeout) return cls(t, reader, writer) diff --git a/src/opennetzteil/cli.py b/src/opennetzteil/cli.py index a2b6df66e..6a6524ef0 100644 --- a/src/opennetzteil/cli.py +++ b/src/opennetzteil/cli.py @@ -62,9 +62,7 @@ async def main(self, args: Namespace) -> None: client = await netzteil.connect(args.target, timeout=1.0) break else: - self.parser.error( - f"powersupply {args.power_supply.product_id} is not supported" - ) + self.parser.error(f"powersupply {args.power_supply.product_id} is not supported") match args.subcommand: case "get": diff --git a/src/opennetzteil/devices/rs/hmc804.py b/src/opennetzteil/devices/rs/hmc804.py index b36031315..f7458c3fa 100644 --- a/src/opennetzteil/devices/rs/hmc804.py +++ b/src/opennetzteil/devices/rs/hmc804.py @@ -33,9 +33,7 @@ async def _send_line(self, writer: asyncio.StreamWriter, data: str) -> None: await asyncio.wait_for(writer.drain(), self.timeout) async def _recv_line(self, reader: asyncio.StreamReader) -> str: - return ( - (await asyncio.wait_for(reader.readline(), self.timeout)).decode().strip() - ) + return (await asyncio.wait_for(reader.readline(), self.timeout)).decode().strip() async def _close_conn(self, writer: asyncio.StreamWriter) -> None: writer.close() diff --git a/tests/test_target_uris.py b/tests/test_target_uris.py index 6a814abb4..f0eda04f7 100644 --- a/tests/test_target_uris.py +++ b/tests/test_target_uris.py @@ -27,8 +27,7 @@ invalid_uris = [ "doip://127.0.0.1:13400?src_addr=1", - "doip://127.0.0.1:13400?target_addr=1" - "doip://127.0.0.1:13400?src_addr=0x01&target_addr=hans", + "doip://127.0.0.1:13400?target_addr=1" "doip://127.0.0.1:13400?src_addr=0x01&target_addr=hans", "doip://127.0.0.1:13400?src_addr=hans&target_addr=0x01", "isotp://can0?src_addr=1", "isotp://can0?dst_addr=1",