Skip to content

Commit

Permalink
Adding Capabilities and Doors
Browse files Browse the repository at this point in the history
  • Loading branch information
tillsteinbach committed Dec 30, 2024
1 parent 2435b0a commit 9af0eda
Show file tree
Hide file tree
Showing 3 changed files with 300 additions and 7 deletions.
64 changes: 64 additions & 0 deletions src/carconnectivity_connectors/volkswagen/capability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Module for volkswagen vehicle capability class."""
from __future__ import annotations
from typing import TYPE_CHECKING

from enum import IntEnum

from carconnectivity.objects import GenericObject
from carconnectivity.attributes import StringAttribute, BooleanAttribute, DateAttribute

if TYPE_CHECKING:
from carconnectivity_connectors.volkswagen.vehicle import VolkswagenVehicle


class Capability(GenericObject):
"""
Represents a capability of a Volkswagen vehicle.
"""

def __init__(self, capability_id: str, vehicle: VolkswagenVehicle) -> None:
if vehicle is None:
raise ValueError('Cannot create capability without vehicle')
if id is None:
raise ValueError('Capability ID cannot be None')
super().__init__(object_id=capability_id, parent=vehicle)
self.capability_id = StringAttribute("id", self, capability_id)
self.expiration_date = DateAttribute("expiration_date", self)
self.user_disabling_allowed = BooleanAttribute("user_disabling_allowed", self)
self.statuses = list[Capability.Status]
self.enabled = True

def __str__(self) -> str:
return_string = f'{self.capability_id}'
if self.expiration_date.enabled:
return_string += f' expires: {self.expiration_date}'
if self.user_disabling_allowed.enabled:
return_string += f' user_disabling_allowed: {self.user_disabling_allowed}'
return return_string

class Status(IntEnum):
"""
Enum for capability status.
"""
UNKNOWN = 0
DEACTIVATED = 1001
INITIALLY_DISABLED = 1003
DISABLED_BY_USER = 1004
OFFLINE_MODE = 1005
WORKSHOP_MODE = 1006
MISSING_OPERATION = 1007
MISSING_SERVICE = 1008
PLAY_PROTECTION = 1009
POWER_BUDGET_REACHED = 1010
DEEP_SLEEP = 1011
LOCATION_DATA_DISABLED = 1013
LICENSE_INACTIVE = 2001
LICENSE_EXPIRED = 2002
MISSING_LICENSE = 2003
USER_NOT_VERIFIED = 3001
TERMS_AND_CONDITIONS_NOT_ACCEPTED = 3002
INSUFFICIENT_RIGHTS = 3003
CONSENT_MISSING = 3004
LIMITED_FEATURE = 3005
AUTH_APP_CERT_ERROR = 3006
STATUS_UNSUPPORTED = 4001
167 changes: 160 additions & 7 deletions src/carconnectivity_connectors/volkswagen/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
from datetime import datetime, timedelta
import requests

from carconnectivity.vehicle import GenericVehicle
from carconnectivity.garage import Garage
from carconnectivity.errors import AuthenticationError, TooManyRequestsError, RetrievalError
from carconnectivity.errors import AuthenticationError, TooManyRequestsError, RetrievalError, APIError
from carconnectivity.util import robust_time_parse, log_extra_keys
from carconnectivity.units import Length
from carconnectivity.vehicle import GenericVehicle
from carconnectivity.doors import Doors
from carconnectivity_connectors.base.connector import BaseConnector
from carconnectivity_connectors.volkswagen.auth.session_manager import SessionManager, SessionUser, Service
from carconnectivity_connectors.volkswagen.vehicle import VolkswagenVehicle, VolkswagenElectricVehicle, VolkswagenCombustionVehicle, VolkswagenHybridVehicle
from carconnectivity_connectors.volkswagen.capability import Capability


if TYPE_CHECKING:
Expand All @@ -22,7 +27,8 @@

from carconnectivity.carconnectivity import CarConnectivity

LOG: logging.Logger = logging.getLogger("carconnectivity-connector-skoda")
LOG: logging.Logger = logging.getLogger("carconnectivity-connector-volkswagen")
LOG_API_DEBUG: logging.Logger = logging.getLogger("carconnectivity-connector-volkswagen-api-debug")


class Connector(BaseConnector):
Expand Down Expand Up @@ -122,14 +128,17 @@ def fetch_vehicles(self) -> None:
garage: Garage = self.car_connectivity.garage
url = 'https://emea.bff.cariad.digital/vehicle/v1/vehicles'
data: Dict[str, Any] | None = self._fetch_data(url, session=self._session)
print(data)

seen_vehicle_vins: set[str] = set()
if data is not None:
if 'data' in data and data['data'] is not None:
for vehicle_dict in data['data']:
if 'vin' in vehicle_dict and vehicle_dict['vin'] is not None:
vehicle: Optional[GenericVehicle] = garage.get_vehicle(vehicle_dict['vin'])
seen_vehicle_vins.add(vehicle_dict['vin'])
vehicle: Optional[VolkswagenVehicle] = garage.get_vehicle(vehicle_dict['vin']) # pyright: ignore[reportAssignmentType]
if vehicle is None:
vehicle = GenericVehicle(vin=vehicle_dict['vin'], garage=garage)
vehicle = VolkswagenVehicle(vin=vehicle_dict['vin'], garage=garage)
garage.add_vehicle(vehicle_dict['vin'], vehicle)

if 'nickname' in vehicle_dict and vehicle_dict['nickname'] is not None:
Expand All @@ -138,9 +147,34 @@ def fetch_vehicles(self) -> None:
if 'model' in vehicle_dict and vehicle_dict['model'] is not None:
vehicle.model._set_value(vehicle_dict['model']) # pylint: disable=protected-access

if 'capabilities' in vehicle_dict and vehicle_dict['capabilities'] is not None:
found_capabilities = set()
for capability_dict in vehicle_dict['capabilities']:
if 'id' in capability_dict and capability_dict['id'] is not None:
capability_id = capability_dict['id']
found_capabilities.add(capability_id)
if capability_id in vehicle.capabilities:
capability: Capability = vehicle.capabilities[capability_id]
else:
capability = Capability(capability_id=capability_id, vehicle=vehicle)
vehicle.capabilities[capability_id] = capability
if 'expirationDate' in capability_dict and capability_dict['expirationDate'] is not None:
expiration_date: datetime = robust_time_parse(capability_dict['expirationDate'])
capability.expiration_date._set_value(expiration_date) # pylint: disable=protected-access
if 'userDisablingAllowed' in capability_dict and capability_dict['userDisablingAllowed'] is not None:
# pylint: disable-next=protected-access
capability.user_disabling_allowed._set_value(capability_dict['userDisablingAllowed'])
for capability_id in vehicle.capabilities.keys() - found_capabilities:
vehicle.capabilities[capability_id].enabled = False
vehicle.capabilities.pop(capability_id)

self.fetch_vehicle_status(vehicle)
for vin in set(garage.list_vehicle_vins()) - seen_vehicle_vins:
vehicle_to_remove = garage.get_vehicle(vin)
if vehicle_to_remove is not None and vehicle_to_remove.is_managed_by_connector(self):
garage.remove_vehicle(vin)

def fetch_vehicle_status(self, vehicle: GenericVehicle) -> None:
def fetch_vehicle_status(self, vehicle: VolkswagenVehicle) -> None:
"""
Fetches the status of a vehicle from the Volkswagen API.
Expand All @@ -151,8 +185,127 @@ def fetch_vehicle_status(self, vehicle: GenericVehicle) -> None:
None
"""
vin = vehicle.vin.value
url = f'https://emea.bff.cariad.digital/vehicle/v1/vehicles/{vin}/selectivestatus?jobs=measurements'
known_capabilities: list[str] = ['access',
'activeventilation',
'automation',
'auxiliaryheating',
'userCapabilities'
'charging',
'chargingProfiles',
'batteryChargingCare',
'climatisation',
'climatisationTimers'
'departureTimers',
'fuelStatus',
'vehicleLights',
'lvBattery',
'readiness',
'vehicleHealthInspection',
'vehicleHealthWarnings',
'oilLevel',
'measurements',
'batterySupport']
jobs: list[str] = []
for capability_id in known_capabilities:
if capability_id in vehicle.capabilities.keys() and vehicle.capabilities[capability_id].enabled:
jobs.append(capability_id)

url = f'https://emea.bff.cariad.digital/vehicle/v1/vehicles/{vin}/selectivestatus?jobs=' + ','.join(jobs)
data: Dict[str, Any] | None = self._fetch_data(url, self._session)
if data is not None:
if 'measurements' in data and data['measurements'] is not None:
if 'fuelLevelStatus' in data['measurements'] and data['measurements']['fuelLevelStatus'] is not None:
if 'value' in data['measurements']['fuelLevelStatus'] and data['measurements']['fuelLevelStatus']['value'] is not None:
fuel_level_status = data['measurements']['fuelLevelStatus']['value']
captured_at: datetime = robust_time_parse(fuel_level_status['carCapturedTimestamp'])
# Check vehicle type and if it does not match the current vehicle type, create a new vehicle object using copy constructor
if 'carType' in fuel_level_status and fuel_level_status['carType'] is not None:
try:
car_type = GenericVehicle.Type(fuel_level_status['carType'])
if car_type == GenericVehicle.Type.ELECTRIC and not isinstance(vehicle, VolkswagenElectricVehicle):
vehicle = VolkswagenElectricVehicle(origin=vehicle)
elif car_type in [GenericVehicle.Type.FUEL,
GenericVehicle.Type.GASOLINE,
GenericVehicle.Type.PETROL,
GenericVehicle.Type.DIESEL,
GenericVehicle.Type.CNG,
GenericVehicle.Type.LPG] \
and not isinstance(vehicle, VolkswagenCombustionVehicle):
vehicle = VolkswagenCombustionVehicle(origin=vehicle)
elif car_type == GenericVehicle.Type.HYBRID and not isinstance(vehicle, VolkswagenHybridVehicle):
vehicle = VolkswagenHybridVehicle(origin=vehicle)
vehicle.type._set_value(car_type) # pylint: disable=protected-access
except ValueError:
LOG_API_DEBUG.warning('Unknown car type %s', fuel_level_status['carType'])
log_extra_keys(LOG_API_DEBUG, 'fuelLevelStatus', data['measurements']['fuelLevelStatus'], {'carCapturedTimestamp', 'carType'})
if 'rangeStatus' in data['measurements'] and data['measurements']['rangeStatus'] is not None:
if 'value' in data['measurements']['rangeStatus'] and data['measurements']['rangeStatus']['value'] is not None:
range_status = data['measurements']['rangeStatus']['value']
# TODO: Implement the rangeStatus
log_extra_keys(LOG_API_DEBUG, 'rangeStatus', range_status, set())
if 'odometerStatus' in data['measurements'] and data['measurements']['odometerStatus'] is not None:
if 'value' in data['measurements']['odometerStatus'] and data['measurements']['odometerStatus']['value'] is not None:
odometer_status = data['measurements']['odometerStatus']['value']
if 'carCapturedTimestamp' not in odometer_status or odometer_status['carCapturedTimestamp'] is None:
raise APIError('Could not fetch vehicle status, carCapturedTimestamp missing')
captured_at: datetime = robust_time_parse(odometer_status['carCapturedTimestamp'])
if 'odometer' in odometer_status and odometer_status['odometer'] is not None:
# pylint: disable-next=protected-access
vehicle.odometer._set_value(value=odometer_status['odometer'], measured=captured_at, unit=Length.KM)
log_extra_keys(LOG_API_DEBUG, 'odometerStatus', odometer_status, {'carCapturedTimestamp', 'odometer'})
log_extra_keys(LOG_API_DEBUG, 'measurements', data['measurements'], {'fuelLevelStatus', 'rangeStatus', 'odometerStatus'})
if 'access' in data and data['access'] is not None:
if 'accessStatus' in data['access'] and data['access']['accessStatus'] is not None:
if 'value' in data['access']['accessStatus'] and data['access']['accessStatus']['value'] is not None:
access_status = data['access']['accessStatus']['value']
if 'carCapturedTimestamp' not in access_status or access_status['carCapturedTimestamp'] is None:
raise APIError('Could not fetch vehicle status, carCapturedTimestamp missing')
captured_at: datetime = robust_time_parse(access_status['carCapturedTimestamp'])
if 'doors' in access_status and access_status['doors'] is not None:
seen_door_ids: set[str] = set()
all_doors_closed = True
for door_status in access_status['doors']:
door_id = door_status['name']
seen_door_ids.add(door_id)
if door_id in vehicle.doors.doors:
door: Doors.Door = vehicle.doors.doors[door_id]
else:
door = Doors.Door(door_id=door_id, doors=vehicle.doors)
vehicle.doors.doors[door_id] = door
if 'status' in door_status and door_status['status'] is not None:
if 'locked' in door_status['status']:
door.lock_state._set_value(Doors.LockState.LOCKED, measured=captured_at) # pylint: disable=protected-access
elif 'unlocked' in door_status['status']:
door.lock_state._set_value(Doors.LockState.UNLOCKED, measured=captured_at) # pylint: disable=protected-access
else:
door.lock_state._set_value(Doors.LockState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
if 'open' in door_status['status']:
all_doors_closed = False
door.open_state._set_value(Doors.OpenState.OPEN, measured=captured_at) # pylint: disable=protected-access
elif 'closed' in door_status['status']:
door.open_state._set_value(Doors.OpenState.CLOSED, measured=captured_at) # pylint: disable=protected-access
elif 'unsupported' in door_status['status']:
door.open_state._set_value(Doors.OpenState.UNSUPPORTED, measured=captured_at) # pylint: disable=protected-access
else:
door.open_state._set_value(Doors.OpenState.UNKNOWN, measured=captured_at) # pylint: disable=protected-access
LOG_API_DEBUG.warning('Unknown door status %s', door_status['status'])
log_extra_keys(LOG_API_DEBUG, 'doors', door_status, {'name', 'status'})
if all_doors_closed:
vehicle.doors.open_state._set_value(Doors.OpenState.CLOSED, measured=captured_at) # pylint: disable=protected-access
else:
vehicle.doors.open_state._set_value(Doors.OpenState.OPEN, measured=captured_at) # pylint: disable=protected-access
for door_id in vehicle.doors.doors.keys() - seen_door_ids:
vehicle.doors.doors[door_id].enabled = False
vehicle.doors.doors.pop(door_id)
if 'overallStatus' in access_status and access_status['overallStatus'] is not None:
if access_status['overallStatus'] == 'safe':
vehicle.doors.lock_state._set_value(Doors.LockState.LOCKED, measured=captured_at) # pylint: disable=protected-access
elif access_status['overallStatus'] == 'unsafe':
vehicle.doors.lock_state._set_value(Doors.LockState.UNLOCKED, measured=captured_at) # pylint: disable=protected-access
log_extra_keys(LOG_API_DEBUG, 'accessStatus', access_status, {'carCapturedTimestamp', 'doors', 'overallStatus'})
log_extra_keys(LOG_API_DEBUG, 'access', data['access'], {'accessStatus'})
log_extra_keys(LOG_API_DEBUG, 'selectivestatus', data, {'measurements', 'access'})

print(data)

def _record_elapsed(self, elapsed: timedelta) -> None:
Expand Down
Loading

0 comments on commit 9af0eda

Please sign in to comment.