Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
Initial release (#1)
Browse files Browse the repository at this point in the history
* Ensure plan information fetched before sensor setup

* Refactor sensor.py

* Add allowance reset sensor

* Refactor api_client

* Change session refresh time

* Added user-agent header

* Tidy up api_client

* Tweak session refresh logic

* Update documentation

* Change initial version number
  • Loading branch information
dan-r authored Apr 12, 2024
1 parent 1868420 commit 37a6f12
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 54 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# O2 for Home Assistant

An unofficial integration for fetching O2 allowance data.
An unofficial integration for fetching O2 allowance data. This has only be tested with an O2 account (not VMO2) with a single plan.

This is currently a work-in-progress.


## Installation

### HACS
Expand Down
8 changes: 4 additions & 4 deletions custom_components/o2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ async def async_setup_entry(hass, entry):
await hass.async_add_executor_job(session.create_session)

coordinator = hass.data[DOMAIN][DATA_COORDINATOR] = O2Coordinator(hass, config)


# Init coordinator
await coordinator.async_config_entry_first_refresh()

_LOGGER.debug("Initialising entities")
for component in ENTITY_TYPES:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, component)
)

# Init fetch and state coordinators
await coordinator.async_config_entry_first_refresh()

entry.async_on_unload(entry.add_update_listener(async_update_listener))

return True
Expand Down
82 changes: 56 additions & 26 deletions custom_components/o2/api_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import requests
import logging
from time import time
from homeassistant.helpers.entity import DeviceInfo
from .const import DOMAIN
from .const import DOMAIN, USER_AGENT, INTEGRATION_VERSION

_LOGGER = logging.getLogger(__name__)

Expand All @@ -10,11 +11,15 @@ def __init__(self, email=None, password=None):
self._username = email
self._password = password
self._device_info = None

self._session_birth = 0

self.number = None

self._session = requests.Session()


# External methods

def create_session(self, username=None, password=None):
# Login URL and credentials
login_url = 'https://identity.o2.co.uk/auth/password_o2'
Expand All @@ -25,7 +30,7 @@ def create_session(self, username=None, password=None):
}

# Login to O2
login_response = self._session.post(login_url, data=login_data, allow_redirects=False)
login_response = self._session.post(login_url, data=login_data, headers={ 'User-Agent': self._get_user_agent() }, allow_redirects=False)
if login_response.status_code != 303:
raise ApiException("Failed to login to O2. Status code:", login_response.status_code)

Expand All @@ -37,46 +42,71 @@ def create_session(self, username=None, password=None):
self._username = username
self._password = password

self._session_birth = time()

_LOGGER.debug("O2 session created")
return True

def get_allowances(self):
resp = self._post('https://mymobile2.o2.co.uk/web/guest/account?p_p_id=O2UKAccountPortlet_INSTANCE_0ssTPnzpDk4K&p_p_lifecycle=2&p_p_state=normal&p_p_mode=view&p_p_resource_id=getBoltOnsAndCurrentTariff&p_p_cacheability=cacheLevelPage')

def get_device_info(self):
return self._device_info
if resp.status_code != 200:
self._session_birth = 0
raise ApiException("Failed to get allowance. Status code:", resp.status_code)

resp = resp.json()

def update_device_info(self, resp):
self.number = resp['number']
self._update_device_info(resp['tariffVM'])

return resp

self._device_info = DeviceInfo(
identifiers={(DOMAIN, "o2")},
name=resp['name'] if 'name' in resp else number,
manufacturer="O2",
model=resp['subcategory'] if 'subcategory' in resp else None,
serial_number=self.number
)

# Internal API methods

def get_csrf(self):
def _post(self, url):
# Refresh session if its over 45 mins old
if time() - self._session_birth > 2700:
self.create_session()

csrfToken = self._get_csrf()

return self._session.post(url, headers={
'X-Csrf-Token': csrfToken,
'User-Agent': self._get_user_agent()
})

def _get_csrf(self):
# Fetch account page
data_page_url = 'https://mymobile2.o2.co.uk/'
data_page_response = self._session.get(data_page_url)
data_page_response = self._session.get(data_page_url, headers={ 'User-Agent': self._get_user_agent() })

if data_page_response.status_code != 200:
raise ApiException("Failed to retrieve account page. Status code:", data_page_response.status_code)

return data_page_response.text.split("Liferay.authToken = '")[1].split("'")[0]

def get_allowances(self):
csrfToken = self.get_csrf()

allowance_url = 'https://mymobile2.o2.co.uk/web/guest/account?p_p_id=O2UKAccountPortlet_INSTANCE_0ssTPnzpDk4K&p_p_lifecycle=2&p_p_state=normal&p_p_mode=view&p_p_resource_id=getBoltOnsAndCurrentTariff&p_p_cacheability=cacheLevelPage'
allowance_response = self._session.post(allowance_url, headers={'X-Csrf-Token': csrfToken})
# Internal methods

if allowance_response.status_code != 200:
raise ApiException("Failed to get allowance. Status code:", allowance_response.status_code)
resp = allowance_response.json()
self.update_device_info(resp['tariffVM'])
def _get_user_agent(self):
return f"{USER_AGENT}/{INTEGRATION_VERSION}"

def _update_device_info(self, resp):
self.number = resp['number']

return resp
self._device_info = DeviceInfo(
identifiers={(DOMAIN, "o2")},
name=resp['name'] if 'name' in resp else number,
manufacturer="O2",
model=resp['subcategory'] if 'subcategory' in resp else None,
serial_number=self.number
)


# Simple getters

def get_device_info(self):
return self._device_info

# Exceptions
class ApiException(Exception):
Expand Down
2 changes: 2 additions & 0 deletions custom_components/o2/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
DOMAIN = "o2"
CONFIG_VERSION = 1
ENTITY_TYPES = ["sensor"]
USER_AGENT = "dan-r-homeassistant-o2"
INTEGRATION_VERSION = "0.1.0"

DATA_COORDINATOR = "coordinator"
DATA_APICLIENT = "apiclient"
99 changes: 76 additions & 23 deletions custom_components/o2/sensor.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,46 @@
import logging

from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity
)
from homeassistant.core import callback
from homeassistant.const import PERCENTAGE, UnitOfInformation
from .coordinator import O2Coordinator
from homeassistant.components.sensor import SensorEntity, SensorDeviceClass
from homeassistant.const import UnitOfInformation
from homeassistant.helpers.entity import generate_entity_id

from homeassistant.helpers.update_coordinator import CoordinatorEntity

from .const import DOMAIN, DATA_COORDINATOR, DATA_APICLIENT
from datetime import datetime

_LOGGER = logging.getLogger(__name__)


async def async_setup_entry(hass, config, async_add_entities):
"""Set up sensors."""
coordinator = hass.data[DOMAIN][DATA_COORDINATOR]
client = hass.data[DOMAIN][DATA_APICLIENT]

entities = [DataSensor(coordinator, hass)]
if client.number is None:
raise Exception("Fetching plan information failed")

entities = [
O2AllowanceSensor(coordinator, hass, "Data Remaining", "data", UnitOfInformation.GIGABYTES, "mdi:chart-donut"),
O2AllowanceSensor(coordinator, hass, "Texts Remaining", "text", None, "mdi:message"),
O2AllowanceSensor(coordinator, hass, "Minutes Remaining", "voice", None, "mdi:phone"),
O2AllowanceResetSensor(coordinator, hass)
]

async_add_entities(entities, update_before_add=True)


class DataSensor(CoordinatorEntity[O2Coordinator], SensorEntity):
_attr_name = "Data Remaining"
class O2AllowanceSensor(CoordinatorEntity, SensorEntity):
_attr_has_entity_name = True
_attr_device_class = SensorDeviceClass.DATA_SIZE
_attr_native_unit_of_measurement = UnitOfInformation.GIGABYTES
_attr_suggested_display_precision = 2

def __init__(self, coordinator, hass):
def __init__(self, coordinator, hass, name, data_type, unit_of_measurement, icon):
super().__init__(coordinator=coordinator)

self._client = hass.data[DOMAIN][DATA_APICLIENT]
self._attributes = {}
self._last_updated = None
self._state = None
self._attr_name = name
self._data_type = data_type
self._unit_of_measurement = unit_of_measurement
self._icon = icon

self.entity_id = generate_entity_id(
"sensor.{}", f"o2_{self._client.number}_data_remaining", hass=hass)
"sensor.{}", f"o2_{self._client.number}_{self._attr_name.lower().replace(' ', '_')}", hass=hass)

self._attr_device_info = self._client.get_device_info()

Expand All @@ -54,11 +53,65 @@ def unique_id(self) -> str:
def native_value(self):
"""Get value from data returned from API by coordinator"""
if self.coordinator.data:
return self.coordinator.data['allowancesBalance']['data'][0]['balance'] / 1024
remaining = self.coordinator.data['allowancesBalance'][self._data_type][0]
if 'balance' in remaining:
if self._unit_of_measurement == UnitOfInformation.GIGABYTES:
return remaining['balance'] / 1024
else:
return remaining['balance']
else:
return 'Unlimited'

return None

@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
return self._unit_of_measurement

@property
def suggested_display_precision(self):
if self._unit_of_measurement == UnitOfInformation.GIGABYTES:
return 2
return None

@property
def icon(self):
"""Icon of the sensor."""
return self._icon

class O2AllowanceResetSensor(CoordinatorEntity, SensorEntity):
_attr_has_entity_name = True
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_name = "Allowance Reset Date"

def __init__(self, coordinator, hass):
super().__init__(coordinator=coordinator)

self._client = hass.data[DOMAIN][DATA_APICLIENT]

self.entity_id = generate_entity_id(
"sensor.{}", f"o2_{self._client.number}_{self._attr_name.lower().replace(' ', '_')}", hass=hass)

self._attr_device_info = self._client.get_device_info()

@property
def unique_id(self) -> str:
"""Return the unique ID of the sensor."""
return self.entity_id

@property
def state(self):
"""Return the state."""
if self.coordinator.data:
expiresDate = self.coordinator.data['allowancesBalance']['data'][0]['details'][0]['expiresDate']
date_obj = datetime.strptime(expiresDate, '%d %B %Y')

return date_obj.isoformat()

return None

@property
def icon(self):
"""Icon of the sensor."""
return "mdi:chart-donut"
return "mdi:calendar-range"

0 comments on commit 37a6f12

Please sign in to comment.