Skip to content

Commit

Permalink
chore: make fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
rumpelsepp committed Feb 20, 2024
1 parent 8ab5671 commit 598d7c3
Show file tree
Hide file tree
Showing 40 changed files with 338 additions and 1,008 deletions.
143 changes: 35 additions & 108 deletions src/cursed_hr/cursed_hr.py

Large diffs are not rendered by default.

16 changes: 4 additions & 12 deletions src/gallia/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ def build_cli(
continue

if cls.SUBGROUP is not None:
subparsers = parsers["siblings"][cls.GROUP]["siblings"][cls.SUBGROUP][
"subparsers"
]
subparsers = parsers["siblings"][cls.GROUP]["siblings"][cls.SUBGROUP]["subparsers"]
else:
subparsers = parsers["siblings"][cls.GROUP]["subparsers"]

Expand Down Expand Up @@ -224,9 +222,7 @@ def _get_cli_defaults(parser: argparse.ArgumentParser, out: dict[str, Any]) -> N
else:
continue

keys = (
f"{parser.prog} {opts_str.removeprefix('--').replace('-', '_')}".split()
)
keys = f"{parser.prog} {opts_str.removeprefix('--').replace('-', '_')}".split()
value = action.default

d = out
Expand Down Expand Up @@ -318,9 +314,7 @@ def _print_plugin(description: str, eps: list[EntryPoint]) -> None:


def cmd_show_plugins() -> None:
_print_plugin(
"initialization callbacks (gallia_cli_init)", load_cli_init_plugin_eps()
)
_print_plugin("initialization callbacks (gallia_cli_init)", load_cli_init_plugin_eps())
_print_plugin("commands (gallia_commands)", load_command_plugin_eps())
_print_plugin("transports (gallia_transports)", load_transport_plugin_eps())
_print_plugin("ecus (gallia_ecus)", load_ecu_plugin_eps())
Expand Down Expand Up @@ -419,9 +413,7 @@ def main() -> None:

setup_logging(
level=get_log_level(args),
no_volatile_info=args.no_volatile_info
if hasattr(args, "no_volatile_info")
else True,
no_volatile_info=args.no_volatile_info if hasattr(args, "no_volatile_info") else True,
)

sys.exit(args.cls_object.entry_point(args))
Expand Down
8 changes: 2 additions & 6 deletions src/gallia/command/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,7 @@ async def _db_finish_run_meta(self) -> None:
self.artifacts_dir,
)
except Exception as e:
logger.warning(
f"Could not write the run meta to the database: {e!r}"
)
logger.warning(f"Could not write the run meta to the database: {e!r}")

try:
await self.db_handler.disconnect()
Expand Down Expand Up @@ -416,9 +414,7 @@ def entry_point(self, args: Namespace) -> int:
if isinstance(e, t):
# TODO: Map the exitcode to superclass of builtin exceptions.
exit_code = exitcode.IOERR
logger.critical(
f"Caught expected exception, stack trace on debug level: {e!r}"
)
logger.critical(f"Caught expected exception, stack trace on debug level: {e!r}")
logger.debug(e, exc_info=True)
break
else:
Expand Down
24 changes: 6 additions & 18 deletions src/gallia/command/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ def configure_class_parser(self) -> None:
)
group.add_argument(
"--tester-present-interval",
default=self.config.get_value(
"gallia.protocols.uds.tester_present_interval", 0.5
),
default=self.config.get_value("gallia.protocols.uds.tester_present_interval", 0.5),
type=float,
metavar="SECONDS",
help="Modify the interval of the cyclic tester present packets",
Expand All @@ -97,9 +95,7 @@ def configure_class_parser(self) -> None:
)
group.add_argument(
"--compare-properties",
default=self.config.get_value(
"gallia.protocols.uds.compare_properties", True
),
default=self.config.get_value("gallia.protocols.uds.compare_properties", True),
action=BooleanOptionalAction,
help="Compare properties before and after the scan",
)
Expand Down Expand Up @@ -167,14 +163,10 @@ async def setup(self, args: Namespace) -> None:

if self.db_handler is not None:
try:
await self.db_handler.insert_scan_run_properties_pre(
await self.ecu.properties()
)
await self.db_handler.insert_scan_run_properties_pre(await self.ecu.properties())
self._apply_implicit_logging_setting()
except Exception as e:
logger.warning(
f"Could not write the properties_pre to the database: {e!r}"
)
logger.warning(f"Could not write the properties_pre to the database: {e!r}")

async def teardown(self, args: Namespace) -> None:
if args.properties is True and not self.ecu.transport.is_closed:
Expand All @@ -192,9 +184,7 @@ async def teardown(self, args: Namespace) -> None:

if self.db_handler is not None:
try:
await self.db_handler.complete_scan_run(
await self.ecu.properties(False)
)
await self.db_handler.complete_scan_run(await self.ecu.properties(False))
except Exception as e:
logger.warning(f"Could not write the scan run to the database: {e!r}")

Expand Down Expand Up @@ -225,6 +215,4 @@ async def setup(self, args: Namespace) -> None:
try:
await self.db_handler.insert_discovery_run(args.target.url.scheme)
except Exception as e:
logger.warning(
f"Could not write the discovery run to the database: {e!r}"
)
logger.warning(f"Could not write the discovery run to the database: {e!r}")
75 changes: 18 additions & 57 deletions src/gallia/commands/discover/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,12 @@ async def main2(self, args: Namespace) -> int:
try:
await self.db_handler.insert_discovery_run("doip")
except Exception as e:
logger.warning(
f"Could not write the discovery run to the database: {e!r}"
)
logger.warning(f"Could not write the discovery run to the database: {e!r}")

# Discover Hostname and Port
tgt_hostname: str
tgt_port: int
if (
target is not None
and target.hostname is not None
and target.port is not None
):
if target is not None and target.hostname is not None and target.port is not None:
logger.notice("[📋] Skipping host/port discovery because given by --target")
tgt_hostname = target.hostname
tgt_port = target.port
Expand All @@ -113,9 +107,7 @@ async def main2(self, args: Namespace) -> int:
hosts = await self.run_udp_discovery()

if len(hosts) != 1:
logger.error(
"[🍃] Can only continue with a single DoIP host! Give me a --target!"
)
logger.error("[🍃] Can only continue with a single DoIP host! Give me a --target!")
return 11

tgt_hostname, tgt_port = hosts[0]
Expand All @@ -124,9 +116,7 @@ async def main2(self, args: Namespace) -> int:
rat_success: list[int] = []
rat_wrong_source: list[int] = []
if target is not None and "activation_type" in parse_qs(target.query):
logger.notice(
"[📋] Skipping RoutingActivationType discovery because given by --target"
)
logger.notice("[📋] Skipping RoutingActivationType discovery because given by --target")
rat_success = [int(parse_qs(target.query)["activation_type"][0], 0)]
else:
logger.notice("[🔍] Enumerating all RoutingActivationTypes")
Expand All @@ -150,9 +140,7 @@ async def main2(self, args: Namespace) -> int:

# Discovering correct source address for suitable RoutingActivationRequests
if target is not None and "src_addr" in parse_qs(target.query):
logger.notice(
"[📋] Skipping SourceAddress discovery because given by --target"
)
logger.notice("[📋] Skipping SourceAddress discovery because given by --target")
targets = [
f"doip://{tgt_hostname}:{tgt_port}?activation_type={rat:#x}&src_addr={parse_qs(target.query)['src_addr'][0]}"
for rat in rat_success
Expand Down Expand Up @@ -228,14 +216,9 @@ async def enumerate_routing_activation_types(
f"[🤯] Holy moly, it actually worked for activation_type {routing_activation_type:#x} and src_addr {src_addr:#x}!!!"
)
except DoIPRoutingActivationDeniedError as e:
logger.info(
f"[🌟] splendid, {routing_activation_type:#x} yields {e.rac_code.name}"
)
logger.info(f"[🌟] splendid, {routing_activation_type:#x} yields {e.rac_code.name}")

if (
e.rac_code
!= RoutingActivationResponseCodes.UnsupportedActivationType
):
if e.rac_code != RoutingActivationResponseCodes.UnsupportedActivationType:
rat_not_unsupported.append(routing_activation_type)

if e.rac_code == RoutingActivationResponseCodes.UnknownSourceAddress:
Expand Down Expand Up @@ -264,9 +247,7 @@ async def enumerate_target_addresses( # noqa: PLR0913
responsive_targets = []
search_space = range(start, stop + 1)

conn = await self.create_DoIP_conn(
tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE
)
conn = await self.create_DoIP_conn(tgt_hostname, tgt_port, correct_rat, correct_src, 0xAFFE)

for target_addr in search_space:
logger.debug(f"[🚧] Attempting connection to {target_addr:#x}")
Expand All @@ -280,9 +261,7 @@ async def enumerate_target_addresses( # noqa: PLR0913

# If we reach this, the request was not denied due to unknown TargetAddress
known_targets.append(current_target)
logger.notice(
f"[🥇] HEUREKA: target address {target_addr:#x} is valid! "
)
logger.notice(f"[🥇] HEUREKA: target address {target_addr:#x} is valid! ")
async with aiofiles.open(
self.artifacts_dir.joinpath("3_valid_targets.txt"), "a"
) as f:
Expand All @@ -293,8 +272,7 @@ async def enumerate_target_addresses( # noqa: PLR0913
while True:
pot_broadcast, data = await asyncio.wait_for(
self.read_diag_request_custom(conn),
TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout
/ 1000
TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000
if timeout is None
else timeout,
)
Expand All @@ -312,9 +290,7 @@ async def enumerate_target_addresses( # noqa: PLR0913
)

resp = TesterPresentResponse.parse_static(data)
logger.notice(
f"[🥳] It cannot get nicer: {target_addr:#x} responded: {resp}"
)
logger.notice(f"[🥳] It cannot get nicer: {target_addr:#x} responded: {resp}")
responsive_targets.append(current_target)
async with aiofiles.open(
self.artifacts_dir.joinpath("4_responsive_targets.txt"), "a"
Expand All @@ -324,10 +300,7 @@ async def enumerate_target_addresses( # noqa: PLR0913
await self.db_handler.insert_discovery_result(current_target)

except DoIPNegativeAckError as e:
if (
e.nack_code
== DiagnosticMessageNegativeAckCodes.UnknownTargetAddress
):
if e.nack_code == DiagnosticMessageNegativeAckCodes.UnknownTargetAddress:
logger.info(f"[🫥] {target_addr:#x} is an unknown target address")
continue
elif e.nack_code == DiagnosticMessageNegativeAckCodes.TargetUnreachable:
Expand All @@ -349,9 +322,7 @@ async def enumerate_target_addresses( # noqa: PLR0913
continue

except asyncio.TimeoutError: # This triggers when DoIP ACK but no UDS reply
logger.info(
f"[🙊] Presumably no active ECU on target address {target_addr:#x}"
)
logger.info(f"[🙊] Presumably no active ECU on target address {target_addr:#x}")
async with aiofiles.open(
self.artifacts_dir.joinpath("5_unresponsive_targets.txt"), "a"
) as f:
Expand All @@ -360,9 +331,7 @@ async def enumerate_target_addresses( # noqa: PLR0913

except (ConnectionError, ConnectionResetError) as e:
# Whenever this triggers, but sometimes connections are closed not by us
logger.warn(
f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e}"
)
logger.warn(f"[🫦] Sexy, but unexpected: {target_addr:#x} triggered {e}")
async with aiofiles.open(
self.artifacts_dir.joinpath("7_targets_with_errors.txt"), "a"
) as f:
Expand Down Expand Up @@ -425,9 +394,7 @@ async def create_DoIP_conn( # noqa: PLR0913
continue
return conn

async def read_diag_request_custom(
self, conn: DoIPConnection
) -> tuple[int | None, bytes]:
async def read_diag_request_custom(self, conn: DoIPConnection) -> tuple[int | None, bytes]:
while True:
hdr, payload = await conn.read_frame()
if not isinstance(payload, DiagnosticMessage):
Expand Down Expand Up @@ -468,9 +435,7 @@ async def enumerate_source_addresses(
try:
await conn.write_routing_activation_request(routing_activation_type)
except DoIPRoutingActivationDeniedError as e:
logger.info(
f"[🌟] splendid, {source_address:#x} yields {e.rac_code.name}"
)
logger.info(f"[🌟] splendid, {source_address:#x} yields {e.rac_code.name}")

if e.rac_code != RoutingActivationResponseCodes.UnknownSourceAddress:
denied_sourceAddresses.append(source_address)
Expand Down Expand Up @@ -522,9 +487,7 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]:
all_ips.append(ip)

for ip in all_ips:
logger.info(
f"[💌] Sending DoIP VehicleIdentificationRequest to {ip.broadcast}"
)
logger.info(f"[💌] Sending DoIP VehicleIdentificationRequest to {ip.broadcast}")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(2)
Expand All @@ -551,9 +514,7 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]:
for item in found:
url = f"doip://{item[0]}:{item[1]}"
logger.notice(url)
async with aiofiles.open(
self.artifacts_dir.joinpath("0_valid_hosts.txt"), "a"
) as f:
async with aiofiles.open(self.artifacts_dir.joinpath("0_valid_hosts.txt"), "a") as f:
await f.write(f"{url}\n")

return found
16 changes: 4 additions & 12 deletions src/gallia/commands/discover/find_xcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None:
self.socket: socket.socket

def configure_parser(self) -> None:
subparsers = self.parser.add_subparsers(
dest="mode", required=True, help="Transport mode"
)
subparsers = self.parser.add_subparsers(dest="mode", required=True, help="Transport mode")

sp = subparsers.add_parser("can")
sp.add_argument(
Expand All @@ -42,9 +40,7 @@ def configure_parser(self) -> None:
required=True,
help="CAN interface used for XCP communication",
)
sp.add_argument(
"--can-fd", action="store_true", default=False, help="use can FD"
)
sp.add_argument("--can-fd", action="store_true", default=False, help="use can FD")
sp.add_argument(
"--extended",
action="store_true",
Expand Down Expand Up @@ -256,9 +252,7 @@ def test_eth_broadcast(self, args: Namespace) -> None:
logger.info(f"xcp interface ip for multicast group: {addr}")

self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setsockopt(
socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(addr)
)
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(addr))
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
self.socket.settimeout(2)

Expand All @@ -276,6 +270,4 @@ def test_eth_broadcast(self, args: Namespace) -> None:
except socket.timeout:
logger.info("Timeout")

logger.result(
f"Finished; Found {len(endpoints)} XCP endpoints via multicast group"
)
logger.result(f"Finished; Found {len(endpoints)} XCP endpoints via multicast group")
8 changes: 2 additions & 6 deletions src/gallia/commands/discover/uds/isotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ async def main(self, args: Namespace) -> None:
try:
addr, payload = await transport.recvfrom(timeout=0.1)
if addr == ID:
logger.info(
f"The same CAN ID {can_id_repr(ID)} answered. Skipping…"
)
logger.info(f"The same CAN ID {can_id_repr(ID)} answered. Skipping…")
continue
except asyncio.TimeoutError:
continue
Expand Down Expand Up @@ -214,9 +212,7 @@ async def main(self, args: Namespace) -> None:
)
target_args = {}
target_args["is_fd"] = str(transport.config.is_fd).lower()
target_args["is_extended"] = str(
transport.config.is_extended
).lower()
target_args["is_extended"] = str(transport.config.is_extended).lower()

if args.extended_addr:
target_args["ext_address"] = hex(ID)
Expand Down
Loading

0 comments on commit 598d7c3

Please sign in to comment.