From c9bd4e4c0df6a03daeb2968498a32f9caf248baf Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 1 Feb 2024 23:01:31 -0600 Subject: [PATCH] FW_QUERY_CONFIGS: type annotate (#31265) * annotate * fix * clean up * test * clean up * space * fix --- selfdrive/car/fw_query_definitions.py | 7 ++----- selfdrive/car/fw_versions.py | 10 +++++----- selfdrive/car/tests/test_fw_fingerprint.py | 2 +- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/selfdrive/car/fw_query_definitions.py b/selfdrive/car/fw_query_definitions.py index e069fad13bffac..8ff0bd47c44e2c 100755 --- a/selfdrive/car/fw_query_definitions.py +++ b/selfdrive/car/fw_query_definitions.py @@ -97,15 +97,12 @@ def __post_init__(self): new_request.bus += 4 self.requests.append(new_request) - def get_all_ecus(self, offline_fw_versions: OfflineFwVersions, include_ecu_type: bool = True, - include_extra_ecus: bool = True) -> set[EcuAddrSubAddr] | set[AddrType]: + def get_all_ecus(self, offline_fw_versions: OfflineFwVersions, + include_extra_ecus: bool = True) -> set[EcuAddrSubAddr]: # Add ecus in database + extra ecus brand_ecus = {ecu for ecus in offline_fw_versions.values() for ecu in ecus} if include_extra_ecus: brand_ecus |= set(self.extra_ecus) - if not include_ecu_type: - return {(addr, subaddr) for _, addr, subaddr in brand_ecus} - return brand_ecus diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 78a875d2d7cf72..44607d83570ebf 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -8,7 +8,7 @@ from cereal import car from openpilot.common.params import Params from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs -from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType +from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.fingerprints import FW_VERSIONS from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery @@ -18,7 +18,7 @@ ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] -FW_QUERY_CONFIGS = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True) +FW_QUERY_CONFIGS: dict[str, FwQueryConfig] = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True) VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True) MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e} @@ -204,7 +204,7 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]: def get_brand_ecu_matches(ecu_rx_addrs: Set[EcuAddrBusType]) -> dict[str, set[AddrType]]: """Returns dictionary of brands and matches with ECUs in their FW versions""" - brand_addrs = {brand: config.get_all_ecus(VERSIONS[brand], include_ecu_type=False) for + brand_addrs = {brand: {(addr, subaddr) for _, addr, subaddr in config.get_all_ecus(VERSIONS[brand])} for brand, config in FW_QUERY_CONFIGS.items()} brand_matches: dict[str, set[AddrType]] = {brand: set() for brand, _, _ in REQUESTS} @@ -287,8 +287,8 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, # Get versions and build capnp list to put into CarParams car_fw = [] requests = [(brand, config, r) for brand, config, r in REQUESTS if is_brand(brand, query_brand)] - for addr in tqdm(addrs, disable=not progress): - for addr_chunk in chunks(addr): + for addr_group in tqdm(addrs, disable=not progress): # split by subaddr, if any + for addr_chunk in chunks(addr_group): for brand, config, r in requests: # Skip query if no panda available if r.bus > num_pandas * 4 - 1: diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index 8b3a45da6ad756..064f2e565166a0 100755 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -150,7 +150,7 @@ def test_missing_versions_and_configs(self): # Ensure each brand has at least 1 ECU to query, and extra ECU retrieval for brand, config in FW_QUERY_CONFIGS.items(): self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0) - self.assertEqual(config.get_all_ecus({}, include_ecu_type=True), set(config.extra_ecus)) + self.assertEqual(config.get_all_ecus({}), set(config.extra_ecus)) self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0) def test_fw_request_ecu_whitelist(self):