Switch to MariaDB #7

Open: wants to merge 4 commits into base: main
.gitignore: 4 additions & 0 deletions
@@ -137,3 +137,7 @@ dmypy.json
 *.db
 *.zip
 *.bk
+*.db-wal
+*.db-shm
+*.csv
+*.htm
api.env: 3 additions & 1 deletion
@@ -1,5 +1,7 @@
 # API Environment
 
-export DATABASE_PATH=
 export KUPO_URL=
 export KUPO_PORT=
+export DB_USER=
+export DB_PASS=
+export DB_DATABASE=
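Review note: the service now reads DB_USER, DB_PASS and DB_DATABASE instead of DATABASE_PATH. A minimal start-up check, assuming api.env has been sourced into the environment; the REQUIRED_VARS name is illustrative, not from this PR:

```python
# Fail fast if the new MariaDB settings are missing from the environment.
# REQUIRED_VARS simply lists the keys now expected in api.env.
import os

REQUIRED_VARS = ("KUPO_URL", "KUPO_PORT", "DB_USER", "DB_PASS", "DB_DATABASE")

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise SystemExit(f"api.env variables not set: {', '.join(missing)}")
```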
htmx/diagnostics.service: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ User = orcfax
 Type = simple
 WorkingDirectory = /home/orcfax/itn-api/
 ExecStart = /bin/bash -c '/home/orcfax/itn-api/htmx/start_diag.sh'
-KillSignal = SIGINT
+KillSignal = SIGKILL
 TimeoutStopSec = 300
 LimitNOFILE = 32768
 Restart = always
htmx/index.htm: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ <h2>Node counts</h2>
 <h2>Active collector counts</h2>
 <div
   hx-get="{baseURL}/online_collectors"
-  hx-trigger="every 30s"
+  hx-trigger="load, every 90s"
   hx-swap="innerHTML"
 />
 </div>
requirements/requirements.txt: 5 additions & 5 deletions
@@ -1,8 +1,8 @@
 # requirements for the production project.
 
-apsw==3.46.1.0
-fastapi==0.115.3
-humanize==4.11.0
+fastapi==0.115.12
+humanize==4.12.2
+mariadb==1.1.12
 simple-sign==0.1.0-rc.6
-uvicorn==0.32.0
-folium==0.19.3
+uvicorn==0.34.0
+folium==0.19.5
service/itn-api.service: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ User = orcfax
 Type = simple
 WorkingDirectory = /home/orcfax/itn-api/
 ExecStart = /bin/bash -c '/home/orcfax/itn-api/service/start_itn_api.sh'
-KillSignal = SIGINT
+KillSignal = SIGKILL
 TimeoutStopSec = 300
 LimitNOFILE = 32768
 Restart = always
src/itn_api/api.py: 44 additions & 35 deletions
@@ -17,11 +17,9 @@
 import os
 import time
 from contextlib import asynccontextmanager
-from pathlib import Path
 from typing import Final
 
-import apsw
-import apsw.bestpractice
+import mariadb
 import uvicorn
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
@@ -80,21 +78,23 @@
]


-def _enable_best_practice(connection: apsw.Connection):
-    """Enable aspw best practice."""
-    apsw.bestpractice.connection_wal(connection)
-    apsw.bestpractice.library_logging()
+def _get_database_connection() -> mariadb.Connection:
+    """Get a MariaDB database connection."""
+    connection = mariadb.connect(
+        user=os.environ["DB_USER"],
+        password=os.environ["DB_PASS"],
+        host="127.0.0.1",
+        port=3306,
+        database=os.environ["DB_DATABASE"],
+        autocommit=True,
+    )
+    return connection


 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Load the database connection for the life of the app."""
-    db_path = Path(os.environ["DATABASE_PATH"])
-    logger.info("validator database: %s", db_path)
-    app.state.connection = apsw.Connection(
-        str(db_path), flags=apsw.SQLITE_OPEN_READONLY
-    )
-    _enable_best_practice(app.state.connection)
+    app.state.connection = _get_database_connection()
     app.state.kupo_url = os.environ["KUPO_URL"]
     app.state.kupo_port = os.environ["KUPO_PORT"]
     yield
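For reviewers, a minimal round-trip sketch of the new connection path, mirroring _get_database_connection() and the cursor pattern used in the handlers below; it assumes the DB_* variables from api.env are exported and the data_points table exists:

```python
import os

import mariadb

# Connect exactly as _get_database_connection() does above.
connection = mariadb.connect(
    user=os.environ["DB_USER"],
    password=os.environ["DB_PASS"],
    host="127.0.0.1",
    port=3306,
    database=os.environ["DB_DATABASE"],
    autocommit=True,
)

# Round trip: MariaDB cursors are created per query and closed afterwards.
cursor = connection.cursor()
cursor.execute("select count(*) from data_points;")
print(cursor.fetchone()[0])
cursor.close()
connection.close()
```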
@@ -141,35 +141,37 @@ def redirect_root_to_docs():
@app.get("/get_active_participants", tags=[TAG_STATISTICS])
async def get_active_participants():
"""Return participants in the ITN database."""
cursor = app.state.connection.cursor()
try:
participants = app.state.connection.execute(
"select distinct address from data_points;"
)
except apsw.SQLError as err:
cursor.execute("select distinct address from data_points;")
except mariadb.Error as err:
return {"error": f"{err}"}
data = [participant[0] for participant in participants]
data = [participant[0] for participant in cursor]
cursor.close()
return data


@app.get("/get_participants_counts_total", tags=[TAG_STATISTICS])
async def get_participants_counts_total():
"""Return participants total counts."""
cursor = app.state.connection.cursor()
try:
participants_count_total = app.state.connection.execute(
cursor.execute(
"select count(*) as count, address from data_points group by address order by count desc;"
)
except apsw.SQLError as err:
except mariadb.Error as err:
return {"error": f"{err}"}
return participants_count_total
res = list(cursor)
cursor.close()
return res
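A side note on the cursor lifecycle: the handlers above create the cursor before the try block, so it is never closed on the mariadb.Error path. A hedged alternative, not part of this PR, using stdlib contextlib.closing:

```python
from contextlib import closing

import mariadb


def fetch_all(connection: mariadb.Connection, sql: str) -> list:
    """Run a read-only query, guaranteeing the cursor is closed."""
    with closing(connection.cursor()) as cursor:
        cursor.execute(sql)
        return list(cursor)
```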


@app.get("/get_participants_counts_day", tags=[TAG_STATISTICS])
async def get_participants_counts_day(
date_start: str = "1970-01-01", date_end: str = "1970-01-03"
):
"""Return participants in ITN."""

report = reports.get_participants_counts_date_range(app, date_start, date_end)
report = await reports.get_participants_counts_date_range(app, date_start, date_end)
return report


@@ -180,7 +182,9 @@ async def get_participants_counts_day_csv(
     date_start: str = "1970-01-01", date_end: str = "1970-01-03"
 ) -> str:
     """Return participants in ITN."""
-    report = reports.get_participants_counts_date_range(app, date_start, date_end)
+    logger.info("generating participant csv: get db data")
+    report = await reports.get_participants_counts_date_range(app, date_start, date_end)
+    logger.info("data retrieved for participant csv: creating count csv")
     csv_report = reports.generate_participant_count_csv(report)
     return csv_report

@@ -231,28 +235,33 @@ async def get_itn_participants() -> str:
@app.get("/online_collectors", tags=[TAG_HTMX], response_class=HTMLResponse)
async def get_online_collectors() -> str:
"""Return ITN aliases and collector counts."""
cursor = app.state.connection.cursor()
try:
participants_count = app.state.connection.execute(
cursor.execute(
"""SELECT address, COUNT(*) AS total_count,
SUM(CASE WHEN datetime(date_time) >= datetime('now', '-24 hours')
SUM(CASE WHEN date_time >= (SELECT DATE_SUB(NOW(), INTERVAL 1 DAY))
THEN 1 ELSE 0 END) AS count_24hr
FROM data_points
GROUP BY address ORDER BY total_count DESC;
"""
)
except apsw.SQLError:
except mariadb.Error:
return "zero collectors online"

participants_count = list(cursor)

try:
feed_count = app.state.connection.execute(
cursor.execute(
"""SELECT distinct feed_id
from data_points
where datetime(date_time) >= datetime('now', '-48 hours');
where date_time >= (SELECT DATE_SUB(NOW(), INTERVAL 1 DAY));
"""
)
except apsw.SQLError:
except mariadb.Error:
return "zero collectors online"

feed_count = list(cursor)

no_feeds = len(list(feed_count))

# FIXME: These can all be combined better, e.g. into a dataclass or
@@ -308,13 +317,13 @@ async def get_locations_map_hx():
@app.get("/count_active_participants", tags=[TAG_HTMX], response_class=HTMLResponse)
async def count_active_participants():
"""Count active participants."""
cursor = app.state.connection.cursor()
try:
participants = app.state.connection.execute(
"select count(distinct address) as count from data_points;"
)
except apsw.SQLError as err:
cursor.execute("select count(distinct address) as count from data_points;")
except mariadb.Error as err:
return {"error": f"{err}"}
data = list(participants)
data = list(cursor)
cursor.close()
return f"{data[0][0]}"


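The recurring SQL change in this file is the date-window predicate. A summary sketch of the translation; the QUERY_24HR constant is illustrative only, not part of the PR:

```python
# SQLite (before): relative datetimes via the datetime() function:
#   where datetime(date_time) >= datetime('now', '-24 hours')
# MariaDB (after): the same 24-hour window via DATE_SUB/NOW():
#   where date_time >= (SELECT DATE_SUB(NOW(), INTERVAL 1 DAY))
# The scalar subselect is harmless but unnecessary; DATE_SUB can sit
# directly in the predicate:
QUERY_24HR = """
    SELECT address, COUNT(*) AS total_count
    FROM data_points
    WHERE date_time >= DATE_SUB(NOW(), INTERVAL 1 DAY)
    GROUP BY address
    ORDER BY total_count DESC;
"""
```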
src/itn_api/reports.py: 44 additions & 21 deletions
@@ -8,8 +8,8 @@
 from dataclasses import dataclass
 from typing import List, Tuple
 
-import apsw
 import humanize
+import mariadb
 from fastapi import FastAPI
 
 try:
@@ -85,7 +85,7 @@ def get_all_license_holders_csv(app: FastAPI, min_stake: int, sort: str) -> str:
     )
     csv = "idx,staking,license,value\n"
     for idx, data in enumerate(alias_addr_data, 1):
-        stake = humanize.intcomma(data.staked).replace(",", ".")
+        stake = humanize.intcomma(data.staked).replace(",", "")
         csv = f"{csv}{idx:0>4}, {data.staking}, {' '.join(data.licenses)}, {stake}\n"
     return csv

@@ -158,7 +158,7 @@ def _get_addr_minute_feed_dicts(data: list, addresses: list):
         for item in data:
             if item[0] != addr:
                 continue
-            minutes = item[1].rsplit(":", 1)[0].strip()
+            minutes = str(item[1]).rsplit(":", 1)[0].strip()
             feed = item[2].strip()
             addr_minute_values = helpers.update_dict(
                 addr_minute_values, addr, f"{feed}|{minutes}"
@@ -232,7 +232,9 @@ def get_participants_counts_date_range(
logger.info("no addresses: '%s'", len(feeds))
addr_minute_values, addr_feed_values = _get_addr_minute_feed_dicts(data, addresses)
addr_minute_values = helpers.dedupe_dicts(addr_minute_values)
logger.info("retrieving data from kupo")
address_data = _get_basic_addr_data(app.state.kupo_url, app.state.kupo_port)
logger.info("processing json report")
report = _process_json_report(
address_data, date_start, date_end, addr_minute_values, addr_feed_values, feeds
)
@@ -243,16 +245,19 @@ def _get_participant_data_by_date_range(
     app: FastAPI, date_start: str, date_end: str
 ) -> list:
     """Query the database and get the results."""
-    participants = app.state.connection.execute(
+    cursor = app.state.connection.cursor()
+    cursor.execute(
         f"""
         select address, date_time, feed_id
         from data_points
-        where date_time > date('{date_start}')
-        and date_time < date('{date_end}')
+        where date_time > '{date_start}'
+        and date_time < '{date_end}'
         order by address;
         """
     )
-    return list(participants)
+    res = list(cursor)
+    cursor.close()
+    return res
 
 
 def generate_participant_count_csv(report: dict) -> str:
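A hedged suggestion, not in this PR: _get_participant_data_by_date_range still interpolates date_start and date_end into the SQL with an f-string. MariaDB Connector/Python accepts qmark-style placeholders, so the same query can bind its parameters; the function name below is illustrative:

```python
import mariadb


def get_participant_data(
    connection: mariadb.Connection, date_start: str, date_end: str
) -> list:
    """Fetch participant rows with bound parameters instead of f-strings."""
    cursor = connection.cursor()
    cursor.execute(
        """
        select address, date_time, feed_id
        from data_points
        where date_time > ? and date_time < ?
        order by address;
        """,
        (date_start, date_end),
    )
    res = list(cursor)
    cursor.close()
    return res
```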
@@ -268,8 +273,18 @@ def generate_participant_count_csv(report: dict) -> str:
     rows = []
     for stake_addr, value in data.items():
         participant = stake_addr
-        license_no = value.get("license", "").replace("Validator License", "").strip()
-        stake = humanize.intcomma(int(value.get("stake", 0))).replace(",", ".")
+        try:
+            license_no = (
+                value.get("license", "").replace("Validator License", "").strip()
+            )
+        except AttributeError:
+            logger.error(
+                "cannot retrieve license no from: value '%s' stake: '%s'",
+                value,
+                stake_addr,
+            )
+            continue
+        stake = humanize.intcomma(int(value.get("stake", 0))).replace(",", "")
         total_data_points = value.get("total_data_points", 0)
         average_per_feed = value.get("average_mins_collecting_per_feed", 0)
         total_collected = value.get("number_of_feeds_collected", 0)
@@ -297,10 +312,10 @@ def generate_participant_count_csv(report: dict) -> str:

 async def get_date_ranges(app: FastAPI):
     """Return min and max dates from the database."""
-    min_max_dates = app.state.connection.execute(
-        "select min(date_time), max(date_time) from data_points;"
-    )
-    dates = list(min_max_dates)[0]
+    cursor = app.state.connection.cursor()
+    cursor.execute("select min(date_time), max(date_time) from data_points;")
+    dates = list(cursor)[0]
+    cursor.close()
     return {
         "earliest_date": dates[0],
         "latest_date": dates[1],
@@ -315,14 +330,20 @@ async def get_locations(app: FastAPI) -> list:
     * https://stackoverflow.com/a/571487
 
     """
+    cursor = app.state.connection.cursor()
     try:
-        unique_raw_data = app.state.connection.execute(
-            "select min(node_id), raw_data from data_points group by node_id;"
+        cursor.execute(
+            """select min(node_id), raw_data
+            from data_points
+            where date_time >= (SELECT date_sub(Now(), interval 60 minute))
+            group by node_id;
+            """
         )
-    except apsw.SQLError:
+    except mariadb.Error:
         return "zero collectors online"
 
-    res = list(unique_raw_data)
+    res = list(cursor)
+    cursor.close()
     countries = []
     for item in res:
         node = item[0]
@@ -356,18 +377,20 @@ async def get_locations_stake_key(app: FastAPI) -> list:
     * https://stackoverflow.com/a/571487
 
     """
+    cursor = app.state.connection.cursor()
     try:
-        unique_raw_data = app.state.connection.execute(
+        cursor.execute(
             """select node_id, raw_data, min(address), date_time
             from data_points
-            where datetime(date_time) >= datetime('now', '-24 hours')
+            where date_time >= (SELECT DATE_SUB(NOW(), INTERVAL 60 minute))
             group by address;
             """
         )
-    except apsw.SQLError:
+    except mariadb.Error:
         return "zero collectors online"
 
-    res = list(unique_raw_data)
+    res = list(cursor)
+    cursor.close()
     key_loc = {}
     for item in res:
         node = item[0]
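One operational note on the switch away from embedded SQLite: the single connection opened in lifespan() now lives for the whole service and can be closed by the server's idle timeout. A sketch of a reconnect guard, assuming MariaDB Connector/Python's auto_reconnect and ping(); the helper name is illustrative:

```python
import mariadb


def ensure_connection(connection: mariadb.Connection) -> None:
    """Revive a long-lived connection the server may have timed out."""
    connection.auto_reconnect = True  # reopen dropped connections transparently
    connection.ping()  # re-establishes the session if the link was lost
```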