Skip to content

Commit

Permalink
Add retention policy and continuous query check/creation on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
David00 committed Feb 6, 2023
1 parent 10240de commit 686f843
Showing 1 changed file with 103 additions and 7 deletions.
110 changes: 103 additions & 7 deletions rpi_power_monitor/power_monitor.py
Original file line number Diff line number Diff line change
@@ -42,7 +42,15 @@
parser.add_argument('-v', '--verbose', help='Increases verbosity of program output.', action='store_true')
parser.add_argument('-V', '--version', help='Displays the power monitor software version and exits.', action='store_true')


# Retention Policy Settings
retention_policies = {
'rp_5min' : {
'duration' : 'INF'
},
'autogen' : { # This is the default retention policy.
'duration' : '30d'
}
}

class RPiPowerMonitor:
""" Class to take readings from the MCP3008 and calculate power """
@@ -69,20 +77,110 @@ def __init__(self, mode, config, spi=None):
self.cleanup(-1)

# Other Initializations
# Validate continuous queries and retention policies
self.validate_rps()
self.validate_cqs()
self.points_buffer = [] # A buffer to hold sublists of points so that they can be written altogether (reduces DB overhead)
self.def_cal = 0.88 # This is the default calibration factor for all CTs from my shop.
self.terminal_mode = False


def validate_cqs(self):
'''Ensures that the continuous queries exist in the configured Influx database, and creates them if not.'''

retention_policies = {
'5m' : 'rp_5min'
}

try:
db_name = self.config['database']['database_name']
cqs = self.client.get_list_continuous_queries()
existing_cqs = []
for db in cqs:
if db_name in db.keys():
if len(db[db_name]) > 0:
existing_cqs = [cq['name'] for cq in db[db_name]]

# Home Power, Energy
if 'cq_home_power_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_home_power_{duration}', f'SELECT mean("power") AS "power", mean("current") AS "current" INTO "{rp_name}"."home_load_{duration}" FROM "home_load" GROUP BY time({duration})')
logger.debug(f"Created continuous query: cq_home_power_{duration}")

if 'cq_home_energy_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_home_energy_{duration}', f'''SELECT integral("power") / 3600000 AS "energy" INTO "{rp_name}"."home_energy_{duration}" FROM "home_load" GROUP BY time({duration})''')
logger.debug(f"Created continuous query: cq_home_energy_{duration}")

# Net Power, Energy
if 'cq_net_power_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_net_power_{duration}', f'SELECT mean("power") AS "power", mean("current") AS "current" INTO "{rp_name}"."net_{duration}" FROM "net" GROUP BY time({duration})')
logger.debug(f"Created continuous query: cq_net_power_{duration}")

if 'cq_net_energy_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_net_energy_{duration}', f'''SELECT integral("power") / 3600000 AS "energy" INTO "{rp_name}"."net_energy_{duration}" FROM "net" GROUP BY time({duration})''')
logger.debug(f"Created continuous query: cq_net_energy_{duration}")

# Solar Power, Energy
if 'cq_solar_power_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_solar_power_{duration}', f'SELECT mean("real_power") AS "power", mean("current") AS "current" INTO "{rp_name}"."solar_{duration}" FROM "solar" GROUP BY time({duration})')
logger.debug(f"Created continuous query: cq_solar_power_{duration}")

if 'cq_solar_energy_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_solar_energy_{duration}', f'''SELECT integral("real_power") / 3600000 AS "energy" INTO "{rp_name}"."solar_energy_{duration}" FROM "sg_solar" GROUP BY time({duration})''')
logger.debug(f"Created continuous query: cq_solar_energy_{duration}")

# Individual CT Energies
for chan in range(1, 7):
if f'cq_ct{chan}_power_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_ct{chan}_power_{duration}', f'''SELECT mean("power") AS "power", mean("current") AS "current" INTO "{rp_name}"."ct{chan}_power_{duration}" FROM "raw_cts" WHERE "ct" = '{chan}' GROUP BY time({duration})''')
logger.debug(f"Created continuous query: cq_ct{chan}_power_{duration}")

# Individual CT Power, Energy
if f'cq_ct{chan}_energy_5m' not in existing_cqs:
for duration, rp_name in retention_policies.items():
self.client.create_continuous_query(f'cq_ct{chan}_energy_{duration}', f'''SELECT integral("real_power") / 3600000 AS "energy" INTO "{rp_name}"."ct{chan}_energy_{duration}" FROM "raw_cts" WHERE "ct" = '{chan}' GROUP BY time({duration})''')
logger.debug(f"Created continuous query: cq_ct{chan}_energy_{duration}")

except Exception as e:
logger.error(f"Failed to create one or more continuous queries. Message: {e}")

return

def validate_rps(self):
'''Ensures that the retention policies exist in the configured Influx database, and creates them if not.'''

# Validate retention policies and continuous queries.
try:
existing_rps = self.client.get_list_retention_policies()
rp_names = [rp['name'] for rp in existing_rps]
except Exception as e:
logger.warning(f"Failed to retrieve InfluxDB Retention Policies. Is Influx running?")
self.cleanup(-1)

try:
for rp in retention_policies.keys():
if rp not in rp_names:
self.client.create_retention_policy(rp, retention_policies[rp]['duration'], 1, default = (True if rp == 'autogen' else False))
logger.debug(f"Created retention policy {rp}")
except Exception as e:
logger.warning("Failed to create one or more retention policies!")

return

def load_config(self, config_file=os.path.join('/home/pi/rpi-power-monitor/rpi_power_monitor', 'config.toml')):
'''Loads the user's config.toml file and validates entries.'''

logger.debug(f"Attempting to loading config from {config_file}")
invalid_settings = False
logger.debug(f"Config file path: {config_file}")
if not os.path.exists(config_file):
logger.error(f"Could not find your config.toml file at rpi_power_monitor/config.toml. Please ensure it exists, or, provide the config file location with the -c flag when launching the program.")

logger.debug(f"Attempting to loading config from {config_file}")

try:
with open(config_file, 'rb') as f:
config = tomli.load(f)
@@ -175,7 +273,6 @@ def get_db_client(self):
f"It appears that the database host value of {host} is not a valid IP or DNS name. Or, DNS name resolution failed. Please check this setting and your networking settings and try again."))
self.cleanup(-1)


try:
client = InfluxDBClient(
host=host,
@@ -192,12 +289,11 @@ def get_db_client(self):
self.client = None
self.cleanup(-1)


# Test Client
try:
self.client.create_database(self.config['database']['database_name'])
except ConnectionRefusedError:
logger.warning("DB connection refused - is Influx running?.")
logger.warning("DB connection refused - is Influx running?")
self.cleanup(-1)
except Exception as e:
logger.warning(f"Failed to connect to the Influx database at {host}:{port}.")

0 comments on commit 686f843

Please sign in to comment.