diff --git a/rpi_power_monitor/power_monitor.py b/rpi_power_monitor/power_monitor.py index f7e9330..4c41286 100644 --- a/rpi_power_monitor/power_monitor.py +++ b/rpi_power_monitor/power_monitor.py @@ -42,7 +42,15 @@ parser.add_argument('-v', '--verbose', help='Increases verbosity of program output.', action='store_true') parser.add_argument('-V', '--version', help='Displays the power monitor software version and exits.', action='store_true') - +# Retention Policy Settings +retention_policies = { + 'rp_5min' : { + 'duration' : 'INF' + }, + 'autogen' : { # This is the default retention policy. + 'duration' : '30d' + } +} class RPiPowerMonitor: """ Class to take readings from the MCP3008 and calculate power """ @@ -69,20 +77,110 @@ def __init__(self, mode, config, spi=None): self.cleanup(-1) # Other Initializations + # Validate continuous queries and retention policies + self.validate_rps() + self.validate_cqs() self.points_buffer = [] # A buffer to hold sublists of points so that they can be written altogether (reduces DB overhead) self.def_cal = 0.88 # This is the default calibration factor for all CTs from my shop. self.terminal_mode = False + + def validate_cqs(self): + '''Ensures that the continuous queries exist in the configured Influx database, and creates them if not.''' + + retention_policies = { + '5m' : 'rp_5min' + } + + try: + db_name = self.config['database']['database_name'] + cqs = self.client.get_list_continuous_queries() + existing_cqs = [] + for db in cqs: + if db_name in db.keys(): + if len(db[db_name]) > 0: + existing_cqs = [cq['name'] for cq in db[db_name]] + + # Home Power, Energy + if 'cq_home_power_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_home_power_{duration}', f'SELECT mean("power") AS "power", mean("current") AS "current" INTO "{rp_name}"."home_load_{duration}" FROM "home_load" GROUP BY time({duration})') + logger.debug(f"Created continuous query: cq_home_power_{duration}") + + if 'cq_home_energy_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_home_energy_{duration}', f'''SELECT integral("power") / 3600000 AS "energy" INTO "{rp_name}"."home_energy_{duration}" FROM "home_load" GROUP BY time({duration})''') + logger.debug(f"Created continuous query: cq_home_energy_{duration}") + + # Net Power, Energy + if 'cq_net_power_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_net_power_{duration}', f'SELECT mean("power") AS "power", mean("current") AS "current" INTO "{rp_name}"."net_{duration}" FROM "net" GROUP BY time({duration})') + logger.debug(f"Created continuous query: cq_net_power_{duration}") + + if 'cq_net_energy_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_net_energy_{duration}', f'''SELECT integral("power") / 3600000 AS "energy" INTO "{rp_name}"."net_energy_{duration}" FROM "net" GROUP BY time({duration})''') + logger.debug(f"Created continuous query: cq_net_energy_{duration}") + + # Solar Power, Energy + if 'cq_solar_power_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_solar_power_{duration}', f'SELECT mean("real_power") AS "power", mean("current") AS "current" INTO "{rp_name}"."solar_{duration}" FROM "solar" GROUP BY time({duration})') + logger.debug(f"Created continuous query: cq_solar_power_{duration}") + + if 'cq_solar_energy_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_solar_energy_{duration}', f'''SELECT integral("real_power") / 3600000 AS "energy" INTO "{rp_name}"."solar_energy_{duration}" FROM "sg_solar" GROUP BY time({duration})''') + logger.debug(f"Created continuous query: cq_solar_energy_{duration}") + + # Individual CT Energies + for chan in range(1, 7): + if f'cq_ct{chan}_power_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_ct{chan}_power_{duration}', f'''SELECT mean("power") AS "power", mean("current") AS "current" INTO "{rp_name}"."ct{chan}_power_{duration}" FROM "raw_cts" WHERE "ct" = '{chan}' GROUP BY time({duration})''') + logger.debug(f"Created continuous query: cq_ct{chan}_power_{duration}") + + # Individual CT Power, Energy + if f'cq_ct{chan}_energy_5m' not in existing_cqs: + for duration, rp_name in retention_policies.items(): + self.client.create_continuous_query(f'cq_ct{chan}_energy_{duration}', f'''SELECT integral("real_power") / 3600000 AS "energy" INTO "{rp_name}"."ct{chan}_energy_{duration}" FROM "raw_cts" WHERE "ct" = '{chan}' GROUP BY time({duration})''') + logger.debug(f"Created continuous query: cq_ct{chan}_energy_{duration}") + + except Exception as e: + logger.error(f"Failed to create one or more continuous queries. Message: {e}") + + return + + def validate_rps(self): + '''Ensures that the retention policies exist in the configured Influx database, and creates them if not.''' + + # Validate retention policies and continuous queries. + try: + existing_rps = self.client.get_list_retention_policies() + rp_names = [rp['name'] for rp in existing_rps] + except Exception as e: + logger.warning(f"Failed to retrieve InfluxDB Retention Policies. Is Influx running?") + self.cleanup(-1) + + try: + for rp in retention_policies.keys(): + if rp not in rp_names: + self.client.create_retention_policy(rp, retention_policies[rp]['duration'], 1, default = (True if rp == 'autogen' else False)) + logger.debug(f"Created retention policy {rp}") + except Exception as e: + logger.warning("Failed to create one or more retention policies!") + + return + def load_config(self, config_file=os.path.join('/home/pi/rpi-power-monitor/rpi_power_monitor', 'config.toml')): '''Loads the user's config.toml file and validates entries.''' + logger.debug(f"Attempting to loading config from {config_file}") invalid_settings = False - logger.debug(f"Config file path: {config_file}") if not os.path.exists(config_file): logger.error(f"Could not find your config.toml file at rpi_power_monitor/config.toml. Please ensure it exists, or, provide the config file location with the -c flag when launching the program.") - logger.debug(f"Attempting to loading config from {config_file}") - try: with open(config_file, 'rb') as f: config = tomli.load(f) @@ -175,7 +273,6 @@ def get_db_client(self): f"It appears that the database host value of {host} is not a valid IP or DNS name. Or, DNS name resolution failed. Please check this setting and your networking settings and try again.")) self.cleanup(-1) - try: client = InfluxDBClient( host=host, @@ -192,12 +289,11 @@ def get_db_client(self): self.client = None self.cleanup(-1) - # Test Client try: self.client.create_database(self.config['database']['database_name']) except ConnectionRefusedError: - logger.warning("DB connection refused - is Influx running?.") + logger.warning("DB connection refused - is Influx running?") self.cleanup(-1) except Exception as e: logger.warning(f"Failed to connect to the Influx database at {host}:{port}.")