From a21f628d66acd2a0fa27731f4a1e13844d9a886d Mon Sep 17 00:00:00 2001 From: Iuri Date: Thu, 11 Apr 2024 12:13:57 +0200 Subject: [PATCH 01/10] Create Node App to Display Database Schema --- .../clickhouse-overviewer/.env.example | 4 ++ .../database/clickhouse-overviewer/app.ts | 60 +++++++++++++++++++ .../clickhouse-overviewer/package-lock.json | 47 +++++++++++++++ .../clickhouse-overviewer/package.json | 22 +++++++ .../clickhouse-overviewer/tsconfig.json | 11 ++++ 5 files changed, 144 insertions(+) create mode 100644 packages/database/clickhouse-overviewer/.env.example create mode 100644 packages/database/clickhouse-overviewer/app.ts create mode 100644 packages/database/clickhouse-overviewer/package-lock.json create mode 100644 packages/database/clickhouse-overviewer/package.json create mode 100644 packages/database/clickhouse-overviewer/tsconfig.json diff --git a/packages/database/clickhouse-overviewer/.env.example b/packages/database/clickhouse-overviewer/.env.example new file mode 100644 index 00000000..0d9598fb --- /dev/null +++ b/packages/database/clickhouse-overviewer/.env.example @@ -0,0 +1,4 @@ +DB_USER= +DB_HOST= +DB_NAME= +DB_PASSWORD= \ No newline at end of file diff --git a/packages/database/clickhouse-overviewer/app.ts b/packages/database/clickhouse-overviewer/app.ts new file mode 100644 index 00000000..63946346 --- /dev/null +++ b/packages/database/clickhouse-overviewer/app.ts @@ -0,0 +1,60 @@ +import { createClient } from '@clickhouse/client'; +import dotenv from 'dotenv'; + +dotenv.config(); + +const client = createClient({ + host: process.env.DB_HOST, + username: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, +}); + +interface Table { + name: string; +} + +interface Column { + name: string; + type: string; +} + +const fetchTables = async (): Promise => { + + const resultSet = await client.query({ + query: 'SHOW TABLES', + format: 'JSONEachRow', + }); + + return resultSet.json(); +} + +const fetchColumns = async (table: string): Promise => { + + const resultSet = await client.query({ + query: `DESCRIBE ${table}`, + format: 'JSONEachRow', + }); + + return resultSet.json(); +} + +const printSchema = async () => { + + const tables = await fetchTables(); + + console.log('Database Schema:\n'); + + for (const table of tables) { + + console.log(`- ${table.name}`); + + const columns = await fetchColumns(table.name); + + for (const column of columns) { + console.log(` - ${column.name} (${column.type})`); + } + } +}; + +printSchema(); \ No newline at end of file diff --git a/packages/database/clickhouse-overviewer/package-lock.json b/packages/database/clickhouse-overviewer/package-lock.json new file mode 100644 index 00000000..978cbac7 --- /dev/null +++ b/packages/database/clickhouse-overviewer/package-lock.json @@ -0,0 +1,47 @@ +{ + "name": "clickhouse-overviewer", + "version": "1.0.0", + "lockfileVersion": 1, + "requires": true, + "dependencies": { + "@clickhouse/client": { + "version": "0.2.10", + "resolved": "https://registry.npmjs.org/@clickhouse/client/-/client-0.2.10.tgz", + "integrity": "sha512-ZwBgzjEAFN/ogS0ym5KHVbR7Hx/oYCX01qGp2baEyfN2HM73kf/7Vp3GvMHWRy+zUXISONEtFv7UTViOXnmFrg==", + "requires": { + "@clickhouse/client-common": "0.2.10" + } + }, + "@clickhouse/client-common": { + "version": "0.2.10", + "resolved": "https://registry.npmjs.org/@clickhouse/client-common/-/client-common-0.2.10.tgz", + "integrity": "sha512-BvTY0IXS96y9RUeNCpKL4HUzHmY80L0lDcGN0lmUD6zjOqYMn78+xyHYJ/AIAX7JQsc+/KwFt2soZutQTKxoGQ==" + }, + "@types/node": { + "version": "20.12.7", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.12.7.tgz", + "integrity": "sha512-wq0cICSkRLVaf3UGLMGItu/PtdY7oaXaI/RVU+xliKVOtRna3PRY57ZDfztpDL0n11vfymMUnXv8QwYCO7L1wg==", + "dev": true, + "requires": { + "undici-types": "~5.26.4" + } + }, + "dotenv": { + "version": "16.4.5", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.4.5.tgz", + "integrity": "sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==" + }, + "typescript": { + "version": "5.4.5", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.4.5.tgz", + "integrity": "sha512-vcI4UpRgg81oIRUFwR0WSIHKt11nJ7SAVlYNIu+QpqeyXP+gpQJy/Z4+F0aGxSE4MqwjyXvW/TzgkLAx2AGHwQ==", + "dev": true + }, + "undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", + "dev": true + } + } +} diff --git a/packages/database/clickhouse-overviewer/package.json b/packages/database/clickhouse-overviewer/package.json new file mode 100644 index 00000000..993001d4 --- /dev/null +++ b/packages/database/clickhouse-overviewer/package.json @@ -0,0 +1,22 @@ +{ + "name": "clickhouse-overviewer", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "build": "npx tsc", + "start": "node dist/app.js", + "launch": "npm run build && npm start" + }, + "keywords": [], + "author": "", + "license": "ISC", + "devDependencies": { + "@types/node": "^20.12.7", + "typescript": "^5.4.5" + }, + "dependencies": { + "@clickhouse/client": "^0.2.10", + "dotenv": "^16.4.5" + } +} diff --git a/packages/database/clickhouse-overviewer/tsconfig.json b/packages/database/clickhouse-overviewer/tsconfig.json new file mode 100644 index 00000000..da4c77ee --- /dev/null +++ b/packages/database/clickhouse-overviewer/tsconfig.json @@ -0,0 +1,11 @@ +{ + "compilerOptions": { + "module": "commonjs", + "esModuleInterop": true, + "target": "es6", + "moduleResolution": "node", + "sourceMap": true, + "outDir": "dist", + }, + "lib": ["es2015"] +} \ No newline at end of file From 94da72ab227c89bea67e25280636c13a00cda299 Mon Sep 17 00:00:00 2001 From: Iuri Date: Thu, 11 Apr 2024 12:36:04 +0200 Subject: [PATCH 02/10] Migrate Queries to Clickhouse --- .../client/components/ui/SummaryOverview.tsx | 12 +- packages/server/config/db.ts | 18 + packages/server/controllers/blocks.ts | 163 ++++--- packages/server/controllers/entities.ts | 155 ++++--- packages/server/controllers/epochs.ts | 241 +++++++---- packages/server/controllers/networks.ts | 26 +- packages/server/controllers/slots.ts | 409 +++++++++++------- packages/server/controllers/transactions.ts | 84 +++- packages/server/controllers/validators.ts | 303 +++++++------ packages/server/package-lock.json | 13 + packages/server/package.json | 1 + packages/server/routes/epochs.ts | 7 - packages/server/routes/slots.ts | 10 +- packages/server/routes/validators.ts | 13 +- 14 files changed, 870 insertions(+), 585 deletions(-) diff --git a/packages/client/components/ui/SummaryOverview.tsx b/packages/client/components/ui/SummaryOverview.tsx index d842df84..3bd74d32 100644 --- a/packages/client/components/ui/SummaryOverview.tsx +++ b/packages/client/components/ui/SummaryOverview.tsx @@ -23,7 +23,7 @@ const SummaryOverview = () => { // States const [summary, setSummary] = useState(null); - const [lastValidator, setLastValidator] = useState(null); + const [countActiveValidators, setCountActiveValidators] = useState(0); useEffect(() => { if (blocks && blocks.epochs) { @@ -41,7 +41,7 @@ const SummaryOverview = () => { setSummary({ epoch: lastEpochAux, slot: lastSlotAux, block_height: lastBlockAux }); } - if (network && !lastValidator) { + if (network && !countActiveValidators) { getLastValidator(); } @@ -50,13 +50,13 @@ const SummaryOverview = () => { const getLastValidator = async () => { try { - const response = await axiosClient.get('/api/validators/last', { + const response = await axiosClient.get('/api/validators/count-active-validators', { params: { network, }, }); - setLastValidator(response.data.number_active_validators); + setCountActiveValidators(response.data.count_active_validators); } catch (error) { console.log(error); } @@ -64,7 +64,7 @@ const SummaryOverview = () => { return ( <> - {summary && lastValidator !== 0 && ( + {summary && countActiveValidators !== 0 && (

@@ -85,7 +85,7 @@ const SummaryOverview = () => {

- Active Validators: {lastValidator ?? 0} + Active Validators: {countActiveValidators ?? 0}

diff --git a/packages/server/config/db.ts b/packages/server/config/db.ts index 42ee2836..a78f063a 100644 --- a/packages/server/config/db.ts +++ b/packages/server/config/db.ts @@ -1,11 +1,14 @@ import { Pool } from 'pg'; import { EventEmitter } from 'node:events'; +import { ClickHouseClient, createClient } from '@clickhouse/client'; import dotenv from 'dotenv'; dotenv.config(); export const pgPools = {}; +export const clickhouseClients: Record = {}; + export const dbConnection = async () => { try { @@ -28,6 +31,21 @@ export const dbConnection = async () => { startListeners(network.network); } + const networksClickhouse = JSON.parse(process.env.NETWORKS_CLICKHOUSE); + + if (!networksClickhouse) { + throw new Error('No networks found'); + } + + for (const network of networksClickhouse) { + clickhouseClients[network.network] = createClient({ + host: network.host, + username: network.user, + password: network.password, + database: network.name, + }); + } + console.log('Database connected'); } catch (error) { diff --git a/packages/server/controllers/blocks.ts b/packages/server/controllers/blocks.ts index 6211141d..189a2574 100644 --- a/packages/server/controllers/blocks.ts +++ b/packages/server/controllers/blocks.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { pgPools } from '../config/db'; +import { clickhouseClients } from '../config/db'; export const getBlocks = async (req: Request, res: Response) => { @@ -7,33 +7,49 @@ export const getBlocks = async (req: Request, res: Response) => { const { network, page = 0, limit = 32 } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; const skip = Number(page) * Number(limit); - const [ blocks, count ] = + const [blocksResultSet, countResultSet] = await Promise.all([ - pgPool.query(` - SELECT f_timestamp, f_slot, f_epoch - f_el_fee_recp, f_el_gas_limit, f_el_gas_used, - f_el_transactions, f_el_block_hash, f_payload_size_bytes, - f_el_block_number - FROM t_block_metrics - WHERE f_el_block_number <> 0 - ORDER BY f_el_block_number DESC - OFFSET ${skip} - LIMIT ${Number(limit)} - `), - pgPool.query(` - SELECT COUNT(*) AS count - FROM t_block_metrics - WHERE f_el_block_number <> 0 - `), + chClient.query({ + query: ` + SELECT + f_el_block_number, + f_el_block_hash, + f_timestamp, + f_slot, + f_epoch, + f_el_fee_recp, + f_el_gas_limit, + f_el_gas_used, + f_el_transactions, + f_payload_size_bytes + FROM t_block_metrics + WHERE f_el_block_number <> 0 + ORDER BY f_el_block_number DESC + LIMIT ${Number(limit)} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT COUNT(*) AS count + FROM t_block_metrics + WHERE f_el_block_number <> 0 + `, + format: 'JSONEachRow', + }), ]); + const blocksResult = await blocksResultSet.json(); + const countResult = await countResultSet.json(); + res.json({ - blocks: blocks.rows, - totalCount: Number(count.rows[0].count), + blocks: blocksResult, + totalCount: Number(countResult[0].count), }); } catch (error) { @@ -51,19 +67,33 @@ export const getBlockById = async (req: Request, res: Response) => { const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; + + const blockResultSet = + await chClient.query({ + query: ` + SELECT + f_el_block_number, + f_el_block_hash, + f_timestamp, + f_slot, + f_epoch, + f_el_fee_recp, + f_el_gas_limit, + f_el_gas_used, + f_el_transactions, + f_payload_size_bytes + FROM t_block_metrics + WHERE f_el_block_number = ${id} + LIMIT 1 + `, + format: 'JSONEachRow', + }); - const block = await pgPool.query(` - SELECT f_el_block_number, f_timestamp, f_slot, f_epoch, - f_el_fee_recp, f_el_gas_limit, f_el_gas_used, - f_el_transactions, f_el_block_hash, f_payload_size_bytes - FROM t_block_metrics - WHERE f_el_block_number = '${id}' - LIMIT 1 - `); + const blockResult = await blockResultSet.json(); res.json({ - block: block.rows[0], + block: blockResult[0], }); } catch (error) { @@ -80,19 +110,34 @@ export const getLatestBlock = async (req: Request, res: Response) => { const { network } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; + + const blockResultSet = + await chClient.query({ + query: ` + SELECT + f_el_block_number, + f_el_block_hash, + f_timestamp, + f_slot, + f_epoch, + f_el_fee_recp, + f_el_gas_limit, + f_el_gas_used, + f_el_transactions, + f_payload_size_bytes + FROM t_block_metrics + WHERE f_el_block_number <> 0 + ORDER BY f_el_block_number DESC + LIMIT 1 + `, + format: 'JSONEachRow', + }); - const block = await pgPool.query(` - SELECT f_el_block_number, f_timestamp, f_slot, f_epoch, - f_el_fee_recp, f_el_gas_limit, f_el_gas_used, - f_el_transactions, f_el_block_hash, f_payload_size_bytes - FROM t_block_metrics - ORDER BY f_el_block_number DESC - LIMIT 1 - `); + const blockResult = await blockResultSet.json(); res.json({ - block: block.rows[0], + block: blockResult[0], }); } catch (error) { @@ -110,23 +155,29 @@ export const getTransactionsByBlock = async (req: Request, res: Response) => { const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; - - const transactions = - await pgPool.query(` - SELECT f_tx_type, - f_value, - f_gas_fee_cap, - f_to, - f_hash, - f_timestamp, - f_from - FROM t_transactions - WHERE f_el_block_number = '${id}' - `); + const chClient = clickhouseClients[network as string]; + + const transactionsResultSet = + await chClient.query({ + query: ` + SELECT + f_tx_type, + f_value, + f_gas_fee_cap, + f_to, + f_hash, + f_timestamp, + f_from + FROM t_transactions + WHERE f_el_block_number = ${id} + `, + format: 'JSONEachRow', + }); + + const transactionsResult = await transactionsResultSet.json(); res.json({ - transactions: transactions.rows, + transactions: transactionsResult, }); } catch (error) { diff --git a/packages/server/controllers/entities.ts b/packages/server/controllers/entities.ts index 467c641b..32cd9ed9 100644 --- a/packages/server/controllers/entities.ts +++ b/packages/server/controllers/entities.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { pgPools } from '../config/db'; +import { clickhouseClients } from '../config/db'; export const getEntity = async (req: Request, res: Response) => { @@ -8,63 +8,72 @@ export const getEntity = async (req: Request, res: Response) => { const { name } = req.params; const { network, numberEpochs = 225 } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; - const [ entityStats, blocksProposed, entityPerformance ] = + const [ entityStatsResultSet, blocksProposedResultSet, entityPerformanceResultSet ] = await Promise.all([ - pgPool.query(` - SELECT sum(f_balance_eth) as aggregate_balance, - COUNT(CASE f_status WHEN 0 THEN 1 ELSE null END) AS deposited, - COUNT(CASE f_status WHEN 1 THEN 1 ELSE null END) AS active, - COUNT(CASE f_status WHEN 2 THEN 1 ELSE null END) AS exited, - COUNT(CASE f_status WHEN 3 THEN 1 ELSE null END) AS slashed - FROM - t_validator_last_status - LEFT OUTER JOIN - t_eth2_pubkeys ON t_validator_last_status.f_val_idx = t_eth2_pubkeys.f_val_idx - WHERE - LOWER(f_pool_name) = '${name.toLowerCase()}' - `), - pgPool.query(` - SELECT - COUNT(CASE f_proposed WHEN true THEN 1 ELSE null END) AS f_proposed, - COUNT(CASE f_proposed WHEN false THEN 1 ELSE null END) AS f_missed - FROM - t_proposer_duties - LEFT OUTER JOIN - t_eth2_pubkeys ON t_proposer_duties.f_val_idx = t_eth2_pubkeys.f_val_idx - WHERE - LOWER(f_pool_name) = '${name.toLowerCase()}' - `), - pgPool.query(` - SELECT - SUM(aggregated_rewards) AS aggregated_rewards, - SUM(aggregated_max_rewards) AS aggregated_max_rewards, - SUM(count_sync_committee) AS count_sync_committee, - SUM(count_missing_source) AS count_missing_source, - SUM(count_missing_target) AS count_missing_target, - SUM(count_missing_head) AS count_missing_head, - SUM(count_expected_attestations) AS count_expected_attestations, - SUM(proposed_blocks_performance) AS proposed_blocks_performance, - SUM(missed_blocks_performance) AS missed_blocks_performance, - SUM(number_active_vals) AS number_active_vals - FROM ( - SELECT * - FROM t_pool_summary - WHERE LOWER(f_pool_name) = '${name.toLowerCase()}' - ORDER BY f_epoch DESC - LIMIT ${Number(numberEpochs)} - ) AS subquery; - `), + chClient.query({ + query: ` + SELECT SUM(f_balance_eth) AS aggregate_balance, + COUNT(CASE vls.f_status WHEN 0 THEN 1 ELSE null END) AS deposited, + COUNT(CASE vls.f_status WHEN 1 THEN 1 ELSE null END) AS active, + COUNT(CASE vls.f_status WHEN 2 THEN 1 ELSE null END) AS exited, + COUNT(CASE vls.f_status WHEN 3 THEN 1 ELSE null END) AS slashed + FROM + t_validator_last_status vls + LEFT OUTER JOIN + t_eth2_pubkeys pk ON (vls.f_val_idx = pk.f_val_idx) AND (LOWER(pk.f_pool_name) = '${name.toLowerCase()}') + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT + COUNT(CASE pd.f_proposed WHEN true THEN 1 ELSE null END) AS f_proposed, + COUNT(CASE pd.f_proposed WHEN false THEN 1 ELSE null END) AS f_missed + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON (pd.f_val_idx = pk.f_val_idx) AND (LOWER(pk.f_pool_name) = '${name.toLowerCase()}') + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT + SUM(aggregated_rewards) AS aggregated_rewards, + SUM(aggregated_max_rewards) AS aggregated_max_rewards, + SUM(count_sync_committee) AS count_sync_committee, + SUM(count_missing_source) AS count_missing_source, + SUM(count_missing_target) AS count_missing_target, + SUM(count_missing_head) AS count_missing_head, + SUM(count_expected_attestations) AS count_expected_attestations, + SUM(proposed_blocks_performance) AS proposed_blocks_performance, + SUM(missed_blocks_performance) AS missed_blocks_performance, + SUM(number_active_vals) AS number_active_vals + FROM ( + SELECT * + FROM t_pool_summary + WHERE LOWER(f_pool_name) = '${name.toLowerCase()}' + ORDER BY f_epoch DESC + LIMIT ${Number(numberEpochs)} + ) AS subquery; + `, + format: 'JSONEachRow', + }), ]); + const entityStatsResult = await entityStatsResultSet.json(); + const blocksProposedResult = await blocksProposedResultSet.json(); + const entityPerformanceResult = await entityPerformanceResultSet.json(); + let entity = null; - if (entityStats.rows.length > 0) { + if (entityStatsResult[0]) { entity = { - ...entityStats.rows[0], - proposed_blocks: blocksProposed.rows[0], - ...entityPerformance.rows[0] + ...entityStatsResult[0], + proposed_blocks: blocksProposedResult[0], + ...entityPerformanceResult[0] }; } @@ -80,36 +89,46 @@ export const getEntity = async (req: Request, res: Response) => { } }; - export const getEntities = async (req: Request, res: Response) => { try { const { network } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; - const [entities, count] = + const [entitiesResultSet, countResultSet] = await Promise.all([ - pgPool.query(` - SELECT count(CASE f_status WHEN 1 THEN 1 ELSE null END) as act_number_validators, f_pool_name - FROM t_eth2_pubkeys - LEFT OUTER JOIN - t_validator_last_status ON t_validator_last_status.f_val_idx = t_eth2_pubkeys.f_val_idx - GROUP BY f_pool_name - `), - pgPool.query(` - SELECT COUNT(DISTINCT(f_pool_name)) AS count - FROM t_eth2_pubkeys - `), + chClient.query({ + query: ` + SELECT + COUNT(CASE vls.f_status WHEN 1 THEN 1 ELSE null END) AS act_number_validators, + pk.f_pool_name + FROM + t_validator_last_status vls + LEFT OUTER JOIN + t_eth2_pubkeys pk ON (vls.f_val_idx = pk.f_val_idx) + GROUP BY pk.f_pool_name + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT COUNT(DISTINCT(f_pool_name)) AS count + FROM t_eth2_pubkeys + `, + format: 'JSONEachRow', + }), ]); + + const entitiesResult = await entitiesResultSet.json(); + const countResult = await countResultSet.json(); res.json({ - entities: entities.rows, - totalCount: Number(count.rows[0].count), + entities: entitiesResult, + totalCount: Number(countResult[0].count), }); - } catch (error) { console.log(error); return res.status(500).json({ diff --git a/packages/server/controllers/epochs.ts b/packages/server/controllers/epochs.ts index 0b20bbde..15528a5b 100644 --- a/packages/server/controllers/epochs.ts +++ b/packages/server/controllers/epochs.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { pgPools, pgListeners } from '../config/db'; +import { clickhouseClients, pgListeners } from '../config/db'; export const getEpochsStatistics = async (req: Request, res: Response) => { @@ -7,40 +7,66 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { const { network, page = 0, limit = 10 } = req.query; - const pgPool = pgPools[network as string]; + const clickhouseClient = clickhouseClients[network as string]; const skip = Number(page) * Number(limit); - const [ epochsStats, blocksStats, epochsCount ] = - await Promise.all([ - pgPool.query(` - SELECT f_epoch, f_slot, f_num_att_vals, f_num_active_vals, - f_att_effective_balance_eth, f_total_effective_balance_eth, - f_missing_source, f_missing_target, f_missing_head - FROM t_epoch_metrics_summary - ORDER BY f_epoch DESC - OFFSET ${skip} - LIMIT ${Number(limit)} - `), - pgPool.query(` - SELECT (f_proposer_slot/32) AS epoch, - ARRAY_AGG(CASE WHEN f_proposed = true THEN 1 ELSE 0 END ORDER BY f_proposer_slot ASC) AS proposed_blocks - FROM t_proposer_duties - GROUP BY epoch - ORDER BY epoch DESC - OFFSET ${skip} - LIMIT ${Number(limit) + 1} - `), - pgPool.query(` - SELECT COUNT(*) AS count - FROM t_epoch_metrics_summary - `), - ]); + const [ epochsStatsResultSet, blocksStatsResultSet, epochsCountResultSet ] = + await Promise.all([ + clickhouseClient.query({ + query: ` + SELECT + f_epoch, + f_slot, + f_num_att_vals, + f_num_active_vals, + f_att_effective_balance_eth, + f_total_effective_balance_eth, + f_missing_source, + f_missing_target, + f_missing_head + FROM + t_epoch_metrics_summary + ORDER BY + f_epoch DESC + LIMIT ${Number(limit)} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` + SELECT + CAST((f_proposer_slot / 32) AS INT) AS epoch, + groupArray(f_proposed) AS proposed_blocks + FROM + t_proposer_duties + GROUP BY + epoch + ORDER BY + epoch DESC + LIMIT ${Number(limit) + 1} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` + SELECT COUNT(*) AS count + FROM t_epoch_metrics_summary + `, + format: 'JSONEachRow', + }), + ]); + + const epochsStatsResult: any[] = await epochsStatsResultSet.json(); + const blocksStatsResult: any[] = await blocksStatsResultSet.json(); + const epochsCountResult = await epochsCountResultSet.json(); let arrayEpochs = []; - epochsStats.rows.forEach((epoch: any) => { - const aux = blocksStats.rows.find((blocks: any) => blocks.epoch === epoch.f_epoch); + epochsStatsResult.forEach((epoch: any) => { + const aux = blocksStatsResult.find((blocks: any) => blocks.epoch === epoch.f_epoch); arrayEpochs.push({ ...epoch, ...aux, @@ -49,7 +75,7 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { res.json({ epochsStats: arrayEpochs, - totalCount: Number(epochsCount.rows[0].count), + totalCount: Number(epochsCountResult[0].count), }); } catch (error) { @@ -67,34 +93,62 @@ export const getEpochById = async (req: Request, res: Response) => { const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; - - const [ epochStats, blocksProposed, withdrawals ] = + const clickhouseClient = clickhouseClients[network as string]; + + const [ epochStatsResultSet, blocksProposedResultSet, withdrawalsResultSet ] = await Promise.all([ - pgPool.query(` - SELECT f_epoch, f_slot, f_num_att_vals, f_num_active_vals, - f_att_effective_balance_eth, f_total_effective_balance_eth, - f_missing_source, f_missing_target, f_missing_head - FROM t_epoch_metrics_summary - WHERE f_epoch = '${id}' - `), - pgPool.query(` - SELECT COUNT(*) AS proposed_blocks - FROM t_proposer_duties - WHERE f_proposer_slot/32 = '${id}' AND f_proposed = true - `), - pgPool.query(` - SELECT SUM(f_amount) AS total_withdrawals - FROM t_withdrawals - WHERE f_slot/32 = '${id}' - `), + clickhouseClient.query({ + query: ` + SELECT + f_epoch, + f_slot, + f_num_att_vals, + f_num_active_vals, + f_att_effective_balance_eth, + f_total_effective_balance_eth, + f_missing_source, + f_missing_target, + f_missing_head + FROM + t_epoch_metrics_summary + WHERE + f_epoch = ${id} + `, + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` + SELECT + COUNT(*) AS proposed_blocks + FROM + t_proposer_duties + WHERE + CAST((f_proposer_slot / 32) AS INT) = ${id} AND f_proposed = 1 + `, + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` + SELECT + SUM(f_amount) AS total_withdrawals + FROM + t_withdrawals + WHERE + CAST((f_slot / 32) AS INT) = ${id} + `, + format: 'JSONEachRow', + }), ]); + const epochStatsResult = await epochStatsResultSet.json(); + const blocksProposedResult = await blocksProposedResultSet.json(); + const withdrawalsResult = await withdrawalsResultSet.json(); + res.json({ epoch: { - ...epochStats.rows[0], - ...blocksProposed.rows[0], - withdrawals: withdrawals.rows[0].total_withdrawals, + ...epochStatsResult[0], + ...blocksProposedResult[0], + withdrawals: withdrawalsResult[0].total_withdrawals, } }); @@ -106,32 +160,6 @@ export const getEpochById = async (req: Request, res: Response) => { } }; -export const getEpochStats = async (req: Request, res: Response) => { - - try { - - const { network } = req.query; - - const pgPool = pgPools[network as string]; - - const stats = - await pgPool.query(` - SELECT MIN(f_epoch) AS first, MAX(f_epoch) AS last, COUNT(DISTINCT(f_epoch)) AS count - FROM t_epoch_metrics_summary - `); - - res.json({ - stats: stats.rows[0] - }); - - } catch (error) { - console.log(error); - return res.status(500).json({ - msg: 'An error occurred on the server' - }); - } -}; - export const getSlotsByEpoch = async (req: Request, res: Response) => { try { @@ -139,28 +167,49 @@ export const getSlotsByEpoch = async (req: Request, res: Response) => { const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; - - const [ slotsEpoch, withdrawals ] = + const clickhouseClient = clickhouseClients[network as string]; + + const [ slotsEpochResultSet, withdrawalsResultSet ] = await Promise.all([ - pgPool.query(` - SELECT t_proposer_duties.*, t_eth2_pubkeys.f_pool_name - FROM t_proposer_duties - LEFT OUTER JOIN t_eth2_pubkeys ON t_proposer_duties.f_val_idx = t_eth2_pubkeys.f_val_idx - WHERE f_proposer_slot/32 = '${id}' - ORDER BY f_proposer_slot DESC - `), - pgPool.query(` - SELECT f_slot, f_amount - FROM t_withdrawals - WHERE f_slot/32 = '${id}' - `) + clickhouseClient.query({ + query: ` + SELECT + pd.f_val_idx, + pd.f_proposer_slot, + pd.f_proposed, + pk.f_pool_name + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx + WHERE + CAST((pd.f_proposer_slot / 32) AS INT) = ${id} + ORDER BY + pd.f_proposer_slot DESC + `, + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` + SELECT + f_slot, + f_amount + FROM + t_withdrawals + WHERE + CAST((f_slot / 32) AS INT) = ${id} + `, + format: 'JSONEachRow', + }), ]); - const slots = slotsEpoch.rows.map((slot: any) => ({ + const slotsEpochResult: any[] = await slotsEpochResultSet.json(); + const withdrawalsResult: any[] = await withdrawalsResultSet.json(); + + const slots = slotsEpochResult.map((slot: any) => ({ ...slot, withdrawals: - withdrawals.rows + withdrawalsResult .filter((withdrawal: any) => withdrawal.f_slot === slot.f_proposer_slot) .reduce((acc: number, withdrawal: any) => acc + Number(withdrawal.f_amount), 0), })); diff --git a/packages/server/controllers/networks.ts b/packages/server/controllers/networks.ts index c66de223..033f95c9 100644 --- a/packages/server/controllers/networks.ts +++ b/packages/server/controllers/networks.ts @@ -1,12 +1,10 @@ import { Request, Response } from 'express'; -import { pgPools } from '../config/db'; - - +import { clickhouseClients } from '../config/db'; export const getNetworks = async (req: Request, res: Response) => { try { - const networksEnv = process.env.NETWORKS; + const networksEnv = process.env.NETWORKS_CLICKHOUSE; if (!networksEnv) { throw new Error("NETWORKS environment variable not set"); @@ -35,19 +33,23 @@ export const getBlockGenesis = async (req: Request, res: Response) => { const { network } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; - const block_genesis = - await pgPool.query(` - SELECT f_genesis_time - FROM t_genesis; - `) + const blockGenesisResultSet = await chClient.query({ + query: ` + SELECT f_genesis_time + FROM t_genesis + LIMIT 1 + `, + format: 'JSONEachRow', + }); + + const blockGenesisResult = await blockGenesisResultSet.json(); res.json({ - block_genesis: Number(block_genesis.rows[0].f_genesis_time) * 1000 + block_genesis: Number(blockGenesisResult[0].f_genesis_time) * 1000 }); - } catch (error) { console.log(error); return res.status(500).json({ diff --git a/packages/server/controllers/slots.ts b/packages/server/controllers/slots.ts index f72e8a08..682ee914 100644 --- a/packages/server/controllers/slots.ts +++ b/packages/server/controllers/slots.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { pgPools, pgListeners } from '../config/db'; +import { clickhouseClients, pgListeners } from '../config/db'; export const getSlots = async (req: Request, res: Response) => { @@ -20,7 +20,7 @@ export const getSlots = async (req: Request, res: Response) => { clients, } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; const skip = Number(page) * Number(limit); @@ -77,54 +77,63 @@ export const getSlots = async (req: Request, res: Response) => { where.push(`(LOWER(scg.f_best_guess_single) IN (${clientsArray.join(',')}))`); } - const [ slots, count ] = + const [ slotsResultSet, countResultSet ] = await Promise.all([ - pgPool.query(` - SELECT - pd.f_proposer_slot, - pd.f_val_idx, - pd.f_proposed, - pk.f_pool_name, - COALESCE(SUM(w.f_amount), 0) AS withdrawals - FROM - t_proposer_duties pd - LEFT OUTER JOIN - t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx - LEFT OUTER JOIN - t_withdrawals w ON pd.f_proposer_slot = w.f_slot - LEFT OUTER JOIN - t_orphans o ON pd.f_proposer_slot = o.f_slot - CROSS JOIN - t_genesis g - ${joinClient} - ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} - GROUP BY - pd.f_proposer_slot, pd.f_val_idx, pd.f_proposed, pk.f_pool_name - ORDER BY - pd.f_proposer_slot DESC - OFFSET ${skip} - LIMIT ${Number(limit)} - `), - pgPool.query(` - SELECT - COUNT(*) AS count - FROM - t_proposer_duties pd - LEFT OUTER JOIN - t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx - LEFT OUTER JOIN - t_orphans o ON pd.f_proposer_slot = o.f_slot - CROSS JOIN - t_genesis g - ${joinClient} - ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} - `) + chClient.query({ + query: ` + SELECT + pd.f_proposer_slot AS f_proposer_slot, + pd.f_val_idx AS f_val_idx, + pd.f_proposed AS f_proposed, + pk.f_pool_name AS f_pool_name, + COALESCE(SUM(w.f_amount), 0) AS withdrawals + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx + LEFT OUTER JOIN + t_withdrawals w ON pd.f_proposer_slot = w.f_slot + LEFT OUTER JOIN + t_orphans o ON pd.f_proposer_slot = o.f_slot + CROSS JOIN + t_genesis g + ${joinClient} + ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} + GROUP BY + pd.f_proposer_slot, pd.f_val_idx, pd.f_proposed, pk.f_pool_name + ORDER BY + pd.f_proposer_slot DESC + LIMIT ${Number(limit)} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT + COUNT(*) AS count + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx + LEFT OUTER JOIN + t_orphans o ON pd.f_proposer_slot = o.f_slot + CROSS JOIN + t_genesis g + ${joinClient} + ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} + `, + format: 'JSONEachRow', + }), ]); + const slotsResult: any[] = await slotsResultSet.json(); + const countResult: any[] = await countResultSet.json(); + res.json({ - slots: slots.rows, - totalCount: Number(count.rows[0].count), - }); + slots: slotsResult, + totalCount: Number(countResult[0].count), + }); } catch (error) { console.log(error); @@ -140,73 +149,114 @@ export const getBlocks = async (req: Request, res: Response) => { const { network, page = 0, limit = 128 } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; const skip = Number(page) * Number(limit); const select = network === 'mainnet' - ? ', t_slot_client_guesses.f_best_guess_single AS f_cl_client' + ? ', scg.f_best_guess_single AS f_cl_client' : ''; const joinDuties = network === 'mainnet' - ? 'LEFT OUTER JOIN t_slot_client_guesses ON t_proposer_duties.f_proposer_slot = t_slot_client_guesses.f_slot' + ? 'LEFT OUTER JOIN t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot' : ''; if (Number(page) > 0) { - const blocks = await pgPool.query(` - SELECT (f_proposer_slot/32) AS f_epoch, f_proposer_slot AS f_slot, f_proposed, t_eth2_pubkeys.f_pool_name, - t_proposer_duties.f_val_idx AS f_proposer_index ${select} - FROM t_proposer_duties - LEFT OUTER JOIN t_eth2_pubkeys ON t_proposer_duties.f_val_idx = t_eth2_pubkeys.f_val_idx - ${joinDuties} - ORDER BY f_proposer_slot DESC - OFFSET ${skip} - LIMIT ${Number(limit)} - `); + const blocksResultSet = await chClient.query({ + query: ` + SELECT + CAST((pd.f_proposer_slot / 32) AS INT) AS f_epoch, + pd.f_proposer_slot AS f_slot, + pd.f_proposed, + pk.f_pool_name, + pd.f_val_idx AS f_proposer_index + ${select} + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx + ${joinDuties} + ORDER BY + pd.f_proposer_slot DESC + LIMIT ${Number(limit)} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }); + + const blocksResult: any[] = await blocksResultSet.json(); res.json({ - blocks: blocks.rows + blocks: blocksResult, }); } else { const joinMetrics = network === 'mainnet' - ? 'LEFT OUTER JOIN t_slot_client_guesses ON t_block_metrics.f_slot = t_slot_client_guesses.f_slot' + ? 'LEFT OUTER JOIN t_slot_client_guesses scg ON bm.f_slot = scg.f_slot' : ''; - const [actualBlocks, finalBlocks] = await Promise.all([ - pgPool.query(` - SELECT (f_proposer_slot/32) AS f_epoch, f_proposer_slot AS f_slot, f_proposed, t_eth2_pubkeys.f_pool_name, - t_proposer_duties.f_val_idx AS f_proposer_index ${select} - FROM t_proposer_duties - LEFT OUTER JOIN t_eth2_pubkeys ON t_proposer_duties.f_val_idx = t_eth2_pubkeys.f_val_idx - ${joinDuties} - ORDER BY f_proposer_slot DESC - OFFSET ${skip} - LIMIT ${Number(limit)} - `), - pgPool.query(` - SELECT t_block_metrics.f_epoch, t_block_metrics.f_slot, t_eth2_pubkeys.f_pool_name, t_block_metrics.f_proposed, t_block_metrics.f_proposer_index, t_block_metrics.f_graffiti, t_block_metrics.f_el_block_number ${select} - FROM t_block_metrics - LEFT OUTER JOIN t_eth2_pubkeys ON t_block_metrics.f_proposer_index = t_eth2_pubkeys.f_val_idx - ${joinMetrics} - WHERE t_block_metrics.f_epoch IN ( - SELECT DISTINCT(f_epoch) - FROM t_block_metrics - ORDER BY f_epoch DESC - LIMIT 2 - ) - ORDER BY t_block_metrics.f_slot DESC - `) - ]) - - let arrayEpochs: any[] = [...actualBlocks.rows, ...finalBlocks.rows]; - + const [actualBlocksResultSet, finalBlocksResultSet] = + await Promise.all([ + chClient.query({ + query: ` + SELECT + CAST((pd.f_proposer_slot / 32) AS INT) AS f_epoch, + pd.f_proposer_slot AS f_slot, + pd.f_proposed, + pk.f_pool_name, + pd.f_val_idx AS f_proposer_index + ${select} + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx + ${joinDuties} + ORDER BY + pd.f_proposer_slot DESC + LIMIT ${Number(limit)} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT + bm.f_epoch, + bm.f_slot AS f_slot, + pk.f_pool_name, + bm.f_proposed, + bm.f_proposer_index AS f_proposer_index, + bm.f_graffiti, + bm.f_el_block_number + ${select} + FROM + t_block_metrics bm + LEFT OUTER JOIN + t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx + ${joinMetrics} + WHERE bm.f_epoch IN ( + SELECT DISTINCT(f_epoch) + FROM t_block_metrics + ORDER BY f_epoch DESC + LIMIT 2 + ) + ORDER BY + bm.f_slot DESC + `, + format: 'JSONEachRow', + }), + ]); + + const actualBlocksResult: any[] = await actualBlocksResultSet.json(); + const finalBlocksResult: any[] = await finalBlocksResultSet.json(); + + let arrayEpochs: any[] = [...actualBlocksResult, ...finalBlocksResult]; + res.json({ - blocks: arrayEpochs - }); - + blocks: arrayEpochs, + }); } } catch (error) { @@ -224,47 +274,74 @@ export const getSlotById = async (req: Request, res: Response) => { const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; const select = network === 'mainnet' - ? ', t_slot_client_guesses.f_best_guess_single AS f_cl_client' + ? ', scg.f_best_guess_single AS f_cl_client' : ''; const join = network === 'mainnet' - ? 'LEFT OUTER JOIN t_slot_client_guesses ON t_block_metrics.f_slot = t_slot_client_guesses.f_slot' + ? 'LEFT OUTER JOIN t_slot_client_guesses scg ON bm.f_slot = scg.f_slot' : ''; - const [ block, proposerDuties ] = + const [ blockResultSet, proposerDutiesResultSet ] = await Promise.all([ - pgPool.query(` - SELECT t_block_metrics.f_timestamp, t_block_metrics.f_epoch, t_block_metrics.f_slot, - t_block_metrics.f_graffiti, t_block_metrics.f_proposer_index, t_block_metrics.f_proposed, - t_block_metrics.f_attestations, t_block_metrics.f_deposits, t_block_metrics.f_proposer_slashings, - t_block_metrics.f_att_slashings, t_block_metrics.f_voluntary_exits, t_block_metrics.f_sync_bits, - t_block_metrics.f_el_fee_recp, t_block_metrics.f_el_gas_limit, t_block_metrics.f_el_gas_used, - t_block_metrics.f_el_transactions, t_block_metrics.f_el_block_hash, t_eth2_pubkeys.f_pool_name, - t_block_metrics.f_el_block_number ${select} - FROM t_block_metrics - LEFT OUTER JOIN t_eth2_pubkeys ON t_block_metrics.f_proposer_index = t_eth2_pubkeys.f_val_idx - ${join} - WHERE t_block_metrics.f_slot = '${id}' - `), - pgPool.query(` - SELECT f_proposed - FROM t_proposer_duties - WHERE f_proposer_slot = '${id}' - `), + chClient.query({ + query: ` + SELECT + bm.f_timestamp, + bm.f_epoch, + bm.f_slot, + bm.f_graffiti, + bm.f_proposer_index, + bm.f_proposed, + bm.f_attestations, + bm.f_deposits, + bm.f_proposer_slashings, + bm.f_voluntary_exits, + bm.f_sync_bits, + bm.f_el_fee_recp, + bm.f_el_gas_limit, + bm.f_el_gas_used, + bm.f_el_transactions, + bm.f_el_block_hash, + bm.f_el_block_number, + bm.f_attester_slashings AS f_att_slashings, + pk.f_pool_name + ${select} + FROM + t_block_metrics bm + LEFT OUTER JOIN + t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx + ${join} + WHERE + bm.f_slot = ${id} + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT + f_proposed + FROM + t_proposer_duties + WHERE + f_proposer_slot = ${id} + `, + format: 'JSONEachRow', + }), ]); - if (block.rows[0]) { - if (proposerDuties.rows.length > 0) { - block.rows[0].f_proposed = proposerDuties.rows[0].f_proposed; + const blockResult: any[] = await blockResultSet.json(); + const proposerDutiesResult: any[] = await proposerDutiesResultSet.json(); + + if (blockResult.length > 0) { + if (proposerDutiesResult.length > 0) { + blockResult[0].f_proposed = proposerDutiesResult[0].f_proposed; } res.json({ - block: { - ...block.rows[0], - }, + block: blockResult[0], }); } else { res.json({}); @@ -278,57 +355,44 @@ export const getSlotById = async (req: Request, res: Response) => { } }; -export const getSlotsStats = async (req: Request, res: Response) => { - - try { - - const { network } = req.query; - - const pgPool = pgPools[network as string]; - - const stats = - await pgPool.query(` - SELECT MIN(f_proposer_slot) AS first, MAX(f_proposer_slot) AS last, COUNT(DISTINCT(f_proposer_slot)) AS count - FROM t_proposer_duties - `); - - res.json({ - stats: stats.rows[0] - }); - - } catch (error) { - console.log(error); - return res.status(500).json({ - msg: 'An error occurred on the server' - }); - } -}; - export const getSlotsByGraffiti = async (req: Request, res: Response) => { try { + const { search } = req.params; const { network, page = 0, limit = 10 } = req.query; - const pgPool = pgPools[network as string]; - const skip = Number(page) * Number(limit); - const { id } = req.params; - - const blocks = - await pgPool.query(` - SELECT t_block_metrics.*, t_eth2_pubkeys.f_pool_name - FROM t_block_metrics - LEFT OUTER JOIN t_eth2_pubkeys ON t_block_metrics.f_proposer_index = t_eth2_pubkeys.f_val_idx - WHERE f_graffiti LIKE '%${id}%' - ORDER BY f_slot DESC - OFFSET ${skip} + const chClient = clickhouseClients[network as string]; + + const blocksResultSet = await chClient.query({ + query: ` + SELECT + bm.f_epoch, + bm.f_slot, + bm.f_graffiti, + bm.f_proposer_index, + bm.f_proposed, + pk.f_pool_name + FROM + t_block_metrics bm + LEFT OUTER JOIN + t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx + WHERE + bm.f_graffiti LIKE '%${search}%' + ORDER BY + bm.f_slot DESC LIMIT ${Number(limit)} - `); + OFFSET ${skip} + `, + format: 'JSONEachRow', + }); + + const blocksResult: any[] = await blocksResultSet.json(); res.json({ - blocks: blocks.rows, + blocks: blocksResult, }); } catch (error) { @@ -346,17 +410,26 @@ export const getWithdrawalsBySlot = async (req: Request, res: Response) => { const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; + + const withdrawalsResultSet = await chClient.query({ + query: ` + SELECT + f_val_idx, + f_address, + f_amount + FROM + t_withdrawals + WHERE + f_slot = ${id} + `, + format: 'JSONEachRow', + }); - const withdrawals = - await pgPool.query(` - SELECT f_val_idx, f_address, f_amount - FROM t_withdrawals - WHERE f_slot = '${id}' - `); + const withdrawalsResult: any[] = await withdrawalsResultSet.json(); res.json({ - withdrawals: withdrawals.rows, + withdrawals: withdrawalsResult, }); } catch (error) { diff --git a/packages/server/controllers/transactions.ts b/packages/server/controllers/transactions.ts index 6dac1052..4891082d 100644 --- a/packages/server/controllers/transactions.ts +++ b/packages/server/controllers/transactions.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { pgPools } from '../config/db'; +import { clickhouseClients } from '../config/db'; import { ADDRESS_ZERO_SHORT } from '../helpers/address'; export const getTransactions = async (req: Request, res: Response) => { @@ -8,24 +8,43 @@ export const getTransactions = async (req: Request, res: Response) => { const { network, page = 0, limit = 10 } = req.query; - const pgPool = pgPools[network as string]; + const clickhouseClient = clickhouseClients[network as string]; const skip = Number(page) * Number(limit); - const transactions = - await pgPool.query(` - SELECT f_tx_idx, f_gas_fee_cap, f_value, f_to, f_hash, f_timestamp, f_from, f_el_block_number, - f_gas_price, f_gas, f_tx_type, f_data - FROM t_transactions - ORDER by f_el_block_number DESC, f_tx_idx DESC, f_timestamp DESC - OFFSET ${skip} + const transactionsResultSet = await clickhouseClient.query({ + query: ` + SELECT + f_tx_idx, + f_gas_fee_cap, + f_value, + f_to, + f_hash, + f_timestamp, + f_from, + f_el_block_number, + f_gas_price, + f_gas, + f_tx_type, + f_data + FROM + t_transactions + ORDER BY + f_el_block_number DESC, + f_tx_idx DESC, + f_timestamp DESC LIMIT ${Number(limit)} - `); + OFFSET ${skip} + `, + format: 'JSONEachRow', + }); + + const transactionsResult: any[] = await transactionsResultSet.json(); res.json({ - transactions: transactions.rows.map((tx: any) => ({ + transactions: transactionsResult.map((tx: any) => ({ ...tx, - f_to: tx.f_to ? tx.f_to : ADDRESS_ZERO_SHORT, + f_to: tx.f_to ?? ADDRESS_ZERO_SHORT, })) }); @@ -45,24 +64,43 @@ export const getTransactionByHash = async (req: Request, res: Response) => { const { network } = req.query; - const pgPool = pgPools[network as string]; + const clickhouseClient = clickhouseClients[network as string]; + + const transactionResultSet = await clickhouseClient.query({ + query: ` + SELECT + f_tx_idx, + f_gas_fee_cap, + f_value, + f_to, + f_hash, + f_timestamp, + f_from, + f_el_block_number, + f_gas_price, + f_gas, + f_tx_type, + f_data, + f_nonce + FROM + t_transactions + WHERE + f_hash = '${hash}' + LIMIT 1 + `, + format: 'JSONEachRow', + }); - const transaction = - await pgPool.query(` - SELECT f_tx_idx, f_gas_fee_cap, f_value, f_to, f_hash, f_timestamp, f_from, f_el_block_number, - f_gas_price, f_gas, f_tx_type, f_data, f_nonce - FROM t_transactions - WHERE f_hash = '${hash}' - `); + const transactionResult = await transactionResultSet.json(); - if (!transaction.rows.length) { + if (!transactionResult[0]) { return res.json(); } res.json({ transaction: { - ...transaction.rows[0], - f_to: transaction.rows[0].f_to ? transaction.rows[0].f_to : ADDRESS_ZERO_SHORT, + ...transactionResult[0], + f_to: transactionResult[0].f_to ?? ADDRESS_ZERO_SHORT, } }); diff --git a/packages/server/controllers/validators.ts b/packages/server/controllers/validators.ts index 69fa8e33..e3cd75cd 100644 --- a/packages/server/controllers/validators.ts +++ b/packages/server/controllers/validators.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { pgPools } from '../config/db'; +import { clickhouseClients } from '../config/db'; export const getValidators = async (req: Request, res: Response) => { @@ -7,31 +7,49 @@ export const getValidators = async (req: Request, res: Response) => { const { network, page = 0, limit = 10 } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; const skip = Number(page) * Number(limit); - const [validators, count] = + const [validatorsResultSet, countResultSet] = await Promise.all([ - pgPool.query(` - SELECT t_validator_last_status.f_val_idx, t_validator_last_status.f_balance_eth, - t_eth2_pubkeys.f_pool_name, t_status.f_status - FROM t_validator_last_status - LEFT OUTER JOIN t_eth2_pubkeys ON t_validator_last_status.f_val_idx = t_eth2_pubkeys.f_val_idx - LEFT OUTER JOIN t_status ON t_validator_last_status.f_status = t_status.f_id - ORDER BY t_validator_last_status.f_val_idx DESC - OFFSET ${skip} - LIMIT ${Number(limit)} - `), - pgPool.query(` - SELECT COUNT(*) AS count - FROM t_validator_last_status - `), + chClient.query({ + query: ` + SELECT + vls.f_val_idx AS f_val_idx, + vls.f_balance_eth AS f_balance_eth, + pk.f_pool_name AS f_pool_name, + s.f_status AS f_status + FROM + t_validator_last_status vls + LEFT OUTER JOIN + t_eth2_pubkeys pk ON vls.f_val_idx = pk.f_val_idx + LEFT OUTER JOIN + t_status s ON vls.f_status = s.f_id + ORDER BY + vls.f_val_idx DESC + LIMIT ${Number(limit)} + OFFSET ${skip} + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT COUNT(*) AS count + FROM t_validator_last_status + `, + format: 'JSONEachRow', + }), ]); + const validatorsResult = await validatorsResultSet.json(); + const countResult = await countResultSet.json(); + + console.log(validatorsResult); + res.json({ - validators: validators.rows, - totalCount: Number(count.rows[0].count), + validators: validatorsResult, + totalCount: Number(countResult[0].count), }); } catch (error) { @@ -50,66 +68,85 @@ export const getValidatorById = async (req: Request, res: Response) => { const { network, numberEpochs = 225 } = req.query; - const pgPool = pgPools[network as string]; + const chClient = clickhouseClients[network as string]; - const [ validatorStats, validatorPerformance ] = + const [ validatorStatsResultSet, validatorPerformanceResultSet ] = await Promise.all([ - pgPool.query(` - SELECT t_validator_last_status.f_val_idx, t_validator_last_status.f_epoch, - t_validator_last_status.f_balance_eth, t_eth2_pubkeys.f_pool_name, t_status.f_status - FROM t_validator_last_status - LEFT OUTER JOIN t_eth2_pubkeys ON t_validator_last_status.f_val_idx = t_eth2_pubkeys.f_val_idx - LEFT OUTER JOIN t_status ON t_validator_last_status.f_status = t_status.f_id - WHERE t_validator_last_status.f_val_idx = '${id}' - `), - pgPool.query(` - WITH Last225Epochs AS ( - SELECT MIN(f_epoch) as start_epoch, MAX(f_epoch) as end_epoch - FROM ( - SELECT f_epoch - FROM t_validator_rewards_summary - WHERE f_val_idx = '${id}' - ORDER BY f_epoch DESC - LIMIT ${Number(numberEpochs)} - ) AS sub - ) - - SELECT - SUM(CASE WHEN f_status IN (1, 3) AND (f_reward <= f_max_reward) AND (f_reward > 0 ) THEN f_reward ELSE 0 END) as aggregated_rewards, - SUM(CASE WHEN f_status IN (1, 3) AND (f_reward <= f_max_reward) AND (f_reward > 0 ) THEN f_max_reward ELSE 0 END) as aggregated_max_rewards, - COUNT(CASE WHEN f_in_sync_committee = TRUE THEN 1 ELSE null END) as count_sync_committee, - COUNT(CASE WHEN f_missing_source = TRUE THEN 1 ELSE null END) as count_missing_source, - COUNT(CASE WHEN f_missing_target = TRUE THEN 1 ELSE null END) as count_missing_target, - COUNT(CASE WHEN f_missing_head = TRUE THEN 1 ELSE null END) as count_missing_head, - COUNT(CASE WHEN f_status IN (1, 3) THEN 1 ELSE 0 END) as count_attestations, - ( - SELECT COUNT(CASE WHEN t_proposer_duties.f_proposed = TRUE THEN 1 ELSE null END) - FROM t_proposer_duties - WHERE t_proposer_duties.f_val_idx = '${id}' - AND t_proposer_duties.f_proposer_slot/32 BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) - ) as proposed_blocks_performance, - ( - SELECT COUNT(CASE WHEN t_proposer_duties.f_proposed = FALSE THEN 1 ELSE null END) - FROM t_proposer_duties - WHERE t_proposer_duties.f_val_idx = '${id}' - AND t_proposer_duties.f_proposer_slot/32 BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) - ) as missed_blocks_performance - FROM t_validator_rewards_summary - LEFT JOIN t_proposer_duties - ON t_validator_rewards_summary.f_val_idx = t_proposer_duties.f_val_idx - AND t_validator_rewards_summary.f_epoch = t_proposer_duties.f_proposer_slot/32 - WHERE t_validator_rewards_summary.f_val_idx = '${id}' - AND t_validator_rewards_summary.f_epoch BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) - GROUP BY t_validator_rewards_summary.f_val_idx; - `), + chClient.query({ + query: ` + SELECT + vls.f_val_idx AS f_val_idx, + vls.f_epoch AS f_epoch, + vls.f_balance_eth AS f_balance_eth, + pk.f_pool_name AS f_pool_name, + s.f_status AS f_status + FROM + t_validator_last_status vls + LEFT OUTER JOIN + t_eth2_pubkeys pk ON vls.f_val_idx = pk.f_val_idx + LEFT OUTER JOIN + t_status s ON vls.f_status = s.f_id + WHERE + vls.f_val_idx = '${id}' + `, + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + WITH Last225Epochs AS ( + SELECT MIN(f_epoch) as start_epoch, MAX(f_epoch) as end_epoch + FROM ( + SELECT f_epoch + FROM t_validator_rewards_summary + WHERE f_val_idx = '${id}' + ORDER BY f_epoch DESC + LIMIT ${Number(numberEpochs)} + ) AS sub + ) + + SELECT + SUM(CASE WHEN f_status IN (1, 3) AND (f_reward <= f_max_reward) AND (f_reward > 0 ) THEN f_reward ELSE 0 END) as aggregated_rewards, + SUM(CASE WHEN f_status IN (1, 3) AND (f_reward <= f_max_reward) AND (f_reward > 0 ) THEN f_max_reward ELSE 0 END) as aggregated_max_rewards, + COUNT(CASE WHEN f_in_sync_committee = TRUE THEN 1 ELSE null END) as count_sync_committee, + COUNT(CASE WHEN f_missing_source = TRUE THEN 1 ELSE null END) as count_missing_source, + COUNT(CASE WHEN f_missing_target = TRUE THEN 1 ELSE null END) as count_missing_target, + COUNT(CASE WHEN f_missing_head = TRUE THEN 1 ELSE null END) as count_missing_head, + COUNT(CASE WHEN f_status IN (1, 3) THEN 1 ELSE 0 END) as count_attestations, + ( + SELECT COUNT(CASE WHEN t_proposer_duties.f_proposed = TRUE THEN 1 ELSE null END) + FROM t_proposer_duties + WHERE t_proposer_duties.f_val_idx = '${id}' + AND CAST((t_proposer_duties.f_proposer_slot / 32) AS INT) BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) + ) as proposed_blocks_performance, + ( + SELECT COUNT(CASE WHEN t_proposer_duties.f_proposed = FALSE THEN 1 ELSE null END) + FROM t_proposer_duties + WHERE t_proposer_duties.f_val_idx = '${id}' + AND CAST((t_proposer_duties.f_proposer_slot / 32) AS INT) BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) + ) as missed_blocks_performance + FROM t_validator_rewards_summary + LEFT JOIN t_proposer_duties + ON t_validator_rewards_summary.f_val_idx = t_proposer_duties.f_val_idx + AND toUInt64(t_validator_rewards_summary.f_epoch) = toUInt64(t_proposer_duties.f_proposer_slot/32) + WHERE t_validator_rewards_summary.f_val_idx = '${id}' + AND t_validator_rewards_summary.f_epoch BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) + GROUP BY t_validator_rewards_summary.f_val_idx; + `, + format: 'JSONEachRow', + }), ]); + const validatorStatsResult = await validatorStatsResultSet.json(); + const validatorPerformanceResult = await validatorPerformanceResultSet.json(); + + console.log(validatorStatsResult); + let validator = null; - if (validatorStats.rows.length > 0) { + if (validatorStatsResult[0]) { validator = { - ...validatorStats.rows[0], - ...validatorPerformance.rows[0], + ...validatorStatsResult[0], + ...validatorPerformanceResult[0], }; } @@ -125,50 +162,31 @@ export const getValidatorById = async (req: Request, res: Response) => { } }; -export const getValidatorStats = async (req: Request, res: Response) => { - +export const getCountActiveValidators = async (req: Request, res: Response) => { + try { const { network } = req.query; - const pgPool = pgPools[network as string]; - - const stats = - await pgPool.query(` - SELECT MIN(f_val_idx) AS first, MAX(f_val_idx) AS last, COUNT(DISTINCT(f_val_idx)) AS count - FROM t_validator_last_status - `); + const chClient = clickhouseClients[network as string]; - res.json({ - stats: stats.rows[0] - }); + const countActiveValidatorsResultSet = + await chClient.query({ + query: ` + SELECT + COUNT(*) AS count_active_validators + FROM + t_validator_last_status vls + LEFT OUTER JOIN + t_status s ON (vls.f_status = s.f_id) AND (s.f_status = 'active') + `, + format: 'JSONEachRow', + }); - } catch (error) { - console.log(error); - return res.status(500).json({ - msg: 'An error occurred on the server' - }); - } -}; - -export const getLastValidator = async (req: Request, res: Response) => { - - try { - - const { network } = req.query; - - const pgPool = pgPools[network as string]; - - const validator_idx = - await pgPool.query(` - SELECT COUNT(*) as number_active_validators - FROM t_validator_last_status - LEFT OUTER JOIN t_status ON t_status.f_id = t_validator_last_status.f_status - WHERE t_status.f_status = 'active' - `); + const countActiveValidators = await countActiveValidatorsResultSet.json(); res.json({ - number_active_validators: validator_idx.rows[0].number_active_validators, + count_active_validators: countActiveValidators[0].count_active_validators, }); } catch (error) { @@ -186,20 +204,32 @@ export const getProposedBlocksByValidator = async (req: Request, res: Response) const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; - - const proposedBlocks = - await pgPool.query(` - SELECT t_proposer_duties.f_val_idx, t_proposer_duties.f_proposer_slot, t_proposer_duties.f_proposed, - t_eth2_pubkeys.f_pool_name - FROM t_proposer_duties - LEFT OUTER JOIN t_eth2_pubkeys ON t_proposer_duties.f_val_idx = t_eth2_pubkeys.f_val_idx - WHERE t_proposer_duties.f_val_idx = '${id}' - ORDER BY t_proposer_duties.f_proposer_slot DESC - `); + const chClient = clickhouseClients[network as string]; + + const proposedBlocksResultSet = + await chClient.query({ + query: ` + SELECT + pd.f_val_idx AS f_val_idx, + pd.f_proposer_slot AS f_proposer_slot, + pd.f_proposed AS f_proposed, + pk.f_pool_name AS f_pool_name + FROM + t_proposer_duties pd + LEFT OUTER JOIN + t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx + WHERE + pd.f_val_idx = '${id}' + ORDER BY + pd.f_proposer_slot DESC + `, + format: 'JSONEachRow', + }); + + const proposedBlocks = await proposedBlocksResultSet.json(); res.json({ - proposedBlocks: proposedBlocks.rows + proposedBlocks }); } catch (error) { @@ -217,18 +247,31 @@ export const getWithdrawalsByValidator = async (req: Request, res: Response) => const { id } = req.params; const { network } = req.query; - const pgPool = pgPools[network as string]; - - const withdrawals = - await pgPool.query(` - SELECT f_val_idx, f_slot/32 as f_epoch, f_slot, f_address, f_amount - FROM t_withdrawals - WHERE f_val_idx = '${id}' - ORDER BY f_slot DESC - `); + const chClient = clickhouseClients[network as string]; + + const withdrawalsResultSet = + await chClient.query({ + query: ` + SELECT + f_val_idx, + CAST((f_slot / 32) AS INT) AS f_epoch, + f_slot, + f_address, + f_amount + FROM + t_withdrawals + WHERE + f_val_idx = '${id}' + ORDER BY + f_slot DESC + `, + format: 'JSONEachRow', + }); + + const withdrawals = await withdrawalsResultSet.json(); res.json({ - withdrawals: withdrawals.rows + withdrawals }); } catch (error) { diff --git a/packages/server/package-lock.json b/packages/server/package-lock.json index 986490b6..646eafbc 100644 --- a/packages/server/package-lock.json +++ b/packages/server/package-lock.json @@ -4,6 +4,19 @@ "lockfileVersion": 1, "requires": true, "dependencies": { + "@clickhouse/client": { + "version": "0.2.10", + "resolved": "https://registry.npmjs.org/@clickhouse/client/-/client-0.2.10.tgz", + "integrity": "sha512-ZwBgzjEAFN/ogS0ym5KHVbR7Hx/oYCX01qGp2baEyfN2HM73kf/7Vp3GvMHWRy+zUXISONEtFv7UTViOXnmFrg==", + "requires": { + "@clickhouse/client-common": "0.2.10" + } + }, + "@clickhouse/client-common": { + "version": "0.2.10", + "resolved": "https://registry.npmjs.org/@clickhouse/client-common/-/client-common-0.2.10.tgz", + "integrity": "sha512-BvTY0IXS96y9RUeNCpKL4HUzHmY80L0lDcGN0lmUD6zjOqYMn78+xyHYJ/AIAX7JQsc+/KwFt2soZutQTKxoGQ==" + }, "@types/body-parser": { "version": "1.19.2", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.2.tgz", diff --git a/packages/server/package.json b/packages/server/package.json index 1c918c09..74a75fc9 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -18,6 +18,7 @@ "typescript": "^4.9.4" }, "dependencies": { + "@clickhouse/client": "^0.2.10", "cors": "^2.8.5", "dotenv": "^16.0.3", "express": "^4.18.2", diff --git a/packages/server/routes/epochs.ts b/packages/server/routes/epochs.ts index 068b806d..15cfa9d7 100644 --- a/packages/server/routes/epochs.ts +++ b/packages/server/routes/epochs.ts @@ -4,7 +4,6 @@ import { check, query } from 'express-validator'; import { getEpochsStatistics, getEpochById, - getEpochStats, getSlotsByEpoch, listenEpochNotification, } from '../controllers/epochs'; @@ -14,12 +13,6 @@ import { existsNetwork } from '../helpers/network-validator'; const router = Router(); -router.get('/stats', [ - query('network').not().isEmpty(), - query('network').custom(existsNetwork), - checkFields, -], getEpochStats); - router.get('/', [ query('network').not().isEmpty(), query('network').custom(existsNetwork), diff --git a/packages/server/routes/slots.ts b/packages/server/routes/slots.ts index 8f29d998..10cbdc10 100644 --- a/packages/server/routes/slots.ts +++ b/packages/server/routes/slots.ts @@ -5,7 +5,6 @@ import { getSlots, getBlocks, getSlotById, - getSlotsStats, getSlotsByGraffiti, getWithdrawalsBySlot, listenSlotNotification, @@ -16,12 +15,6 @@ import { existsNetwork } from '../helpers/network-validator'; const router = Router(); -router.get('/stats', [ - query('network').not().isEmpty(), - query('network').custom(existsNetwork), - checkFields, -], getSlotsStats); - router.get('/', [ query('network').not().isEmpty(), query('network').custom(existsNetwork), @@ -57,8 +50,7 @@ router.get('/:id', [ checkFields, ], getSlotById); -router.get('/graffiti/:id', [ - check('id').notEmpty().withMessage('Graffiti is required'), +router.get('/graffiti/:search', [ query('network').not().isEmpty(), query('network').custom(existsNetwork), checkFields, diff --git a/packages/server/routes/validators.ts b/packages/server/routes/validators.ts index 3c1755b3..8c10d762 100644 --- a/packages/server/routes/validators.ts +++ b/packages/server/routes/validators.ts @@ -4,8 +4,7 @@ import { check, query } from 'express-validator'; import { getValidators, getValidatorById, - getValidatorStats, - getLastValidator, + getCountActiveValidators, getProposedBlocksByValidator, getWithdrawalsByValidator, } from '../controllers/validators'; @@ -15,17 +14,11 @@ import { existsNetwork } from '../helpers/network-validator'; const router = Router(); -router.get('/stats', [ +router.get('/count-active-validators', [ query('network').not().isEmpty(), query('network').custom(existsNetwork), checkFields, -], getValidatorStats); - -router.get('/last', [ - query('network').not().isEmpty(), - query('network').custom(existsNetwork), - checkFields, -], getLastValidator); +], getCountActiveValidators); router.get('/', [ query('network').not().isEmpty(), From 983c3e6609de2340ec9ccc265f38cf9e2b67d7c9 Mon Sep 17 00:00:00 2001 From: Iuri Date: Tue, 16 Apr 2024 12:46:25 +0200 Subject: [PATCH 03/10] Migrate Listener Queries to ClickHouse --- packages/server/config/db.ts | 42 -------- packages/server/controllers/epochs.ts | 60 +++++++++-- packages/server/controllers/networks.ts | 2 +- packages/server/controllers/slots.ts | 59 ++++++++-- packages/server/helpers/network-validator.ts | 4 +- packages/server/package-lock.json | 108 ------------------- packages/server/package.json | 4 +- 7 files changed, 108 insertions(+), 171 deletions(-) diff --git a/packages/server/config/db.ts b/packages/server/config/db.ts index a78f063a..6c83f858 100644 --- a/packages/server/config/db.ts +++ b/packages/server/config/db.ts @@ -1,12 +1,8 @@ -import { Pool } from 'pg'; -import { EventEmitter } from 'node:events'; import { ClickHouseClient, createClient } from '@clickhouse/client'; import dotenv from 'dotenv'; dotenv.config(); -export const pgPools = {}; - export const clickhouseClients: Record = {}; export const dbConnection = async () => { @@ -20,24 +16,6 @@ export const dbConnection = async () => { } for (const network of networks) { - pgPools[network.network] = new Pool({ - user: network.user || '', - host: network.host || '', - database: network.name || '', - password: network.password || '', - port: Number(network.port) || 0 - }); - - startListeners(network.network); - } - - const networksClickhouse = JSON.parse(process.env.NETWORKS_CLICKHOUSE); - - if (!networksClickhouse) { - throw new Error('No networks found'); - } - - for (const network of networksClickhouse) { clickhouseClients[network.network] = createClient({ host: network.host, username: network.user, @@ -53,23 +31,3 @@ export const dbConnection = async () => { throw new Error('Error when trying to connect to the DB'); } } - -class MyEmitter extends EventEmitter {} -export const pgListeners = {}; - -const startListeners = async (network: string) => { - - const client = await pgPools[network].connect(); - pgListeners[network] = new MyEmitter(); - - client.query('LISTEN new_head'); - client.query('LISTEN new_epoch_finalized'); - - client.on('notification', (msg) => { - if (msg.channel === 'new_head') { - pgListeners[network].emit('new_head', msg); - } else if (msg.channel === 'new_epoch_finalized') { - pgListeners[network].emit('new_epoch_finalized', msg); - } - }); -} \ No newline at end of file diff --git a/packages/server/controllers/epochs.ts b/packages/server/controllers/epochs.ts index 15528a5b..a05c7848 100644 --- a/packages/server/controllers/epochs.ts +++ b/packages/server/controllers/epochs.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { clickhouseClients, pgListeners } from '../config/db'; +import { clickhouseClients } from '../config/db'; export const getEpochsStatistics = async (req: Request, res: Response) => { @@ -232,7 +232,7 @@ export const listenEpochNotification = async (req: Request, res: Response) => { const { network } = req.query; - const pgListener = pgListeners[network as string]; + const chClient = clickhouseClients[network as string]; res.writeHead(200, { 'Content-Type': 'text/event-stream', @@ -240,13 +240,59 @@ export const listenEpochNotification = async (req: Request, res: Response) => { 'Connection': 'keep-alive' }); - pgListener?.once('new_epoch_finalized', msg => { - res.write('event: new_epoch\n'); - res.write(`data: ${msg.payload}`); - res.write('\n\n'); - res.end(); + const blockGenesisResultSet = await chClient.query({ + query: ` + SELECT f_genesis_time + FROM t_genesis + LIMIT 1 + `, + format: 'JSONEachRow', }); + const blockGenesisResult = await blockGenesisResultSet.json(); + + const genesisTime = Number(blockGenesisResult[0].f_genesis_time) * 1000; + + const nextEpoch = Math.floor((Date.now() - genesisTime) / 12000 / 32) - 1; + + const newEpochEstimatedTime = genesisTime + ((nextEpoch + 2) * 12000 * 32) + 4000; + + await new Promise((resolve) => setTimeout(resolve, newEpochEstimatedTime - Date.now())); + + let latestEpochInserted = 0; + let currentTimeout = 1000; + + do { + + if (latestEpochInserted > 0) { + await new Promise(resolve => setTimeout(resolve, currentTimeout)); + currentTimeout *= 1.5; + } + + const latestEpochResultSet = await chClient.query({ + query: ` + SELECT + f_epoch + FROM + t_epoch_metrics_summary + ORDER BY + f_epoch DESC + LIMIT 1 + `, + format: 'JSONEachRow', + }); + + const latestEpochResult = await latestEpochResultSet.json(); + + latestEpochInserted = latestEpochResult[0]?.f_epoch ?? 0; + + } while (latestEpochInserted < nextEpoch); + + res.write('event: new_epoch\n'); + res.write(`data: Epoch ${nextEpoch}`); + res.write('\n\n'); + res.end(); + } catch (error) { console.log(error); return res.status(500).json({ diff --git a/packages/server/controllers/networks.ts b/packages/server/controllers/networks.ts index 033f95c9..c176e2cc 100644 --- a/packages/server/controllers/networks.ts +++ b/packages/server/controllers/networks.ts @@ -4,7 +4,7 @@ import { clickhouseClients } from '../config/db'; export const getNetworks = async (req: Request, res: Response) => { try { - const networksEnv = process.env.NETWORKS_CLICKHOUSE; + const networksEnv = process.env.NETWORKS; if (!networksEnv) { throw new Error("NETWORKS environment variable not set"); diff --git a/packages/server/controllers/slots.ts b/packages/server/controllers/slots.ts index 682ee914..6a7fc037 100644 --- a/packages/server/controllers/slots.ts +++ b/packages/server/controllers/slots.ts @@ -1,5 +1,5 @@ import { Request, Response } from 'express'; -import { clickhouseClients, pgListeners } from '../config/db'; +import { clickhouseClients } from '../config/db'; export const getSlots = async (req: Request, res: Response) => { @@ -446,21 +446,64 @@ export const listenSlotNotification = async (req: Request, res: Response) => { const { network } = req.query; - const pgListener = pgListeners[network as string]; - + const chClient = clickhouseClients[network as string]; + res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' }); - pgListener?.once('new_head', msg => { - res.write('event: new_slot\n'); - res.write(`data: ${msg.payload}`); - res.write('\n\n'); - res.end(); + const blockGenesisResultSet = await chClient.query({ + query: ` + SELECT f_genesis_time + FROM t_genesis + LIMIT 1 + `, + format: 'JSONEachRow', }); + const blockGenesisResult = await blockGenesisResultSet.json(); + + const genesisTime = Number(blockGenesisResult[0].f_genesis_time) * 1000; + + const nextSlot = Math.floor((Date.now() - genesisTime) / 12000) + 1; + + let latestSlotInserted = 0; + let currentTimeout = 1000; + + do { + + if (latestSlotInserted > 0) { + await new Promise(resolve => setTimeout(resolve, currentTimeout)); + currentTimeout *= 1.5; + } + + const latestBlockResultSet = + await chClient.query({ + query: ` + SELECT + f_slot + FROM + t_block_metrics + ORDER BY + f_slot DESC + LIMIT 1 + `, + format: 'JSONEachRow', + }); + + const latestBlockResult: any[] = await latestBlockResultSet.json(); + + latestSlotInserted = latestBlockResult.length > 0 ? latestBlockResult[0].f_slot : 0; + + } while (latestSlotInserted < nextSlot); + + res.write('event: new_slot\n'); + res.write(`data: Slot = ${latestSlotInserted}`); + res.write('\n\n'); + res.end(); + } catch (error) { console.log(error); return res.status(500).json({ diff --git a/packages/server/helpers/network-validator.ts b/packages/server/helpers/network-validator.ts index 4cd96e79..63eab8d7 100644 --- a/packages/server/helpers/network-validator.ts +++ b/packages/server/helpers/network-validator.ts @@ -1,7 +1,7 @@ -import { pgPools } from "../config/db"; +import { clickhouseClients } from "../config/db"; export const existsNetwork = (network: string) => { - if (pgPools[network] === undefined) { + if (clickhouseClients[network] === undefined) { throw new Error('Network not found'); } diff --git a/packages/server/package-lock.json b/packages/server/package-lock.json index 646eafbc..5da0136c 100644 --- a/packages/server/package-lock.json +++ b/packages/server/package-lock.json @@ -80,17 +80,6 @@ "integrity": "sha512-KJ021B1nlQUBLopzZmPBVuGU9un7WJd/W4ya7Ih02B4Uwky5Nja0yGYav2EfYIk0RR2Q9oVhf60S2XR1BCWJ2g==", "dev": true }, - "@types/pg": { - "version": "8.6.5", - "resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.6.5.tgz", - "integrity": "sha512-tOkGtAqRVkHa/PVZicq67zuujI4Oorfglsr2IbKofDwBSysnaqSx7W1mDqFqdkGE6Fbgh+PZAl0r/BWON/mozw==", - "dev": true, - "requires": { - "@types/node": "*", - "pg-protocol": "*", - "pg-types": "^2.2.0" - } - }, "@types/qs": { "version": "6.9.7", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.7.tgz", @@ -146,11 +135,6 @@ "unpipe": "1.0.0" } }, - "buffer-writer": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/buffer-writer/-/buffer-writer-2.0.0.tgz", - "integrity": "sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==" - }, "bytes": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz", @@ -435,11 +419,6 @@ "ee-first": "1.1.1" } }, - "packet-reader": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/packet-reader/-/packet-reader-1.0.0.tgz", - "integrity": "sha512-HAKu/fG3HpHFO0AA8WE8q2g+gBJaZ9MG7fcKk+IJPLTGAD6Psw4443l+9DGRbOIh3/aXr7Phy0TjilYivJo5XQ==" - }, "parseurl": { "version": "1.3.3", "resolved": "https://registry.npmjs.org/parseurl/-/parseurl-1.3.3.tgz", @@ -450,83 +429,6 @@ "resolved": "https://registry.npmjs.org/path-to-regexp/-/path-to-regexp-0.1.7.tgz", "integrity": "sha512-5DFkuoqlv1uYQKxy8omFBeJPQcdoE07Kv2sferDCrAq1ohOU+MSDswDIbnx3YAM60qIOnYa53wBhXW0EbMonrQ==" }, - "pg": { - "version": "8.8.0", - "resolved": "https://registry.npmjs.org/pg/-/pg-8.8.0.tgz", - "integrity": "sha512-UXYN0ziKj+AeNNP7VDMwrehpACThH7LUl/p8TDFpEUuSejCUIwGSfxpHsPvtM6/WXFy6SU4E5RG4IJV/TZAGjw==", - "requires": { - "buffer-writer": "2.0.0", - "packet-reader": "1.0.0", - "pg-connection-string": "^2.5.0", - "pg-pool": "^3.5.2", - "pg-protocol": "^1.5.0", - "pg-types": "^2.1.0", - "pgpass": "1.x" - } - }, - "pg-connection-string": { - "version": "2.5.0", - "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz", - "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==" - }, - "pg-int8": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz", - "integrity": "sha512-WCtabS6t3c8SkpDBUlb1kjOs7l66xsGdKpIPZsg4wR+B3+u9UAum2odSsF9tnvxg80h4ZxLWMy4pRjOsFIqQpw==" - }, - "pg-pool": { - "version": "3.5.2", - "resolved": "https://registry.npmjs.org/pg-pool/-/pg-pool-3.5.2.tgz", - "integrity": "sha512-His3Fh17Z4eg7oANLob6ZvH8xIVen3phEZh2QuyrIl4dQSDVEabNducv6ysROKpDNPSD+12tONZVWfSgMvDD9w==" - }, - "pg-protocol": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/pg-protocol/-/pg-protocol-1.5.0.tgz", - "integrity": "sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==" - }, - "pg-types": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-2.2.0.tgz", - "integrity": "sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==", - "requires": { - "pg-int8": "1.0.1", - "postgres-array": "~2.0.0", - "postgres-bytea": "~1.0.0", - "postgres-date": "~1.0.4", - "postgres-interval": "^1.1.0" - } - }, - "pgpass": { - "version": "1.0.5", - "resolved": "https://registry.npmjs.org/pgpass/-/pgpass-1.0.5.tgz", - "integrity": "sha512-FdW9r/jQZhSeohs1Z3sI1yxFQNFvMcnmfuj4WBMUTxOrAyLMaTcE1aAMBiTlbMNaXvBCQuVi0R7hd8udDSP7ug==", - "requires": { - "split2": "^4.1.0" - } - }, - "postgres-array": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-2.0.0.tgz", - "integrity": "sha512-VpZrUqU5A69eQyW2c5CA1jtLecCsN2U/bD6VilrFDWq5+5UIEVO7nazS3TEcHf1zuPYO/sqGvUvW62g86RXZuA==" - }, - "postgres-bytea": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-1.0.0.tgz", - "integrity": "sha512-xy3pmLuQqRBZBXDULy7KbaitYqLcmxigw14Q5sj8QBVLqEwXfeybIKVWiqAXTlcvdvb0+xkOtDbfQMOf4lST1w==" - }, - "postgres-date": { - "version": "1.0.7", - "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-1.0.7.tgz", - "integrity": "sha512-suDmjLVQg78nMK2UZ454hAG+OAW+HQPZ6n++TNDUX+L0+uUlLywnoxJKDou51Zm+zTCjrCl0Nq6J9C5hP9vK/Q==" - }, - "postgres-interval": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-1.2.0.tgz", - "integrity": "sha512-9ZhXKM/rw350N1ovuWHbGxnGh/SNJ4cnxHiM0rxE4VN41wsg8P8zWn9hv/buK00RP4WvlOyr/RBDiptyxVbkZQ==", - "requires": { - "xtend": "^4.0.0" - } - }, "proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", @@ -623,11 +525,6 @@ "object-inspect": "^1.9.0" } }, - "split2": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/split2/-/split2-4.1.0.tgz", - "integrity": "sha512-VBiJxFkxiXRlUIeyMQi8s4hgvKCSjtknJv/LVYbrgALPwf5zSKmEwV9Lst25AkvMDnvxODugjdl6KZgwKM1WYQ==" - }, "statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", @@ -672,11 +569,6 @@ "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==" - }, - "xtend": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", - "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==" } } } diff --git a/packages/server/package.json b/packages/server/package.json index 74a75fc9..b1b0fd0f 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -14,7 +14,6 @@ "devDependencies": { "@types/cors": "^2.8.13", "@types/express": "^4.17.14", - "@types/pg": "^8.6.5", "typescript": "^4.9.4" }, "dependencies": { @@ -22,7 +21,6 @@ "cors": "^2.8.5", "dotenv": "^16.0.3", "express": "^4.18.2", - "express-validator": "^6.14.2", - "pg": "^8.8.0" + "express-validator": "^6.14.2" } } From effef51c6f6303c7e8f5a4d2bd10fbe70d961e93 Mon Sep 17 00:00:00 2001 From: Iuri Date: Tue, 16 Apr 2024 13:03:42 +0200 Subject: [PATCH 04/10] Fix Epoch Metrics Query --- packages/server/controllers/epochs.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server/controllers/epochs.ts b/packages/server/controllers/epochs.ts index a05c7848..258be63d 100644 --- a/packages/server/controllers/epochs.ts +++ b/packages/server/controllers/epochs.ts @@ -38,7 +38,7 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { query: ` SELECT CAST((f_proposer_slot / 32) AS INT) AS epoch, - groupArray(f_proposed) AS proposed_blocks + groupArray(CASE WHEN f_proposed = 1 THEN 1 ELSE 0 END) AS proposed_blocks FROM t_proposer_duties GROUP BY @@ -66,7 +66,7 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { let arrayEpochs = []; epochsStatsResult.forEach((epoch: any) => { - const aux = blocksStatsResult.find((blocks: any) => blocks.epoch === epoch.f_epoch); + const aux = blocksStatsResult.find((blocks: any) => Number(blocks.epoch) === Number(epoch.f_epoch)); arrayEpochs.push({ ...epoch, ...aux, From 5075de59e606a16e5ac3c5c648faae32cc3fe854 Mon Sep 17 00:00:00 2001 From: Iuri Date: Wed, 17 Apr 2024 12:37:05 +0200 Subject: [PATCH 05/10] Update ClickHouse settings in db.ts and fix data type casting in various controllers --- packages/server/config/db.ts | 3 +++ packages/server/controllers/epochs.ts | 14 +++++++------- packages/server/controllers/slots.ts | 8 ++++---- packages/server/controllers/validators.ts | 6 +++--- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/packages/server/config/db.ts b/packages/server/config/db.ts index 6c83f858..474f1c1a 100644 --- a/packages/server/config/db.ts +++ b/packages/server/config/db.ts @@ -21,6 +21,9 @@ export const dbConnection = async () => { username: network.user, password: network.password, database: network.name, + clickhouse_settings: { + output_format_json_quote_64bit_integers: 0, + }, }); } diff --git a/packages/server/controllers/epochs.ts b/packages/server/controllers/epochs.ts index 258be63d..2b9a6e2d 100644 --- a/packages/server/controllers/epochs.ts +++ b/packages/server/controllers/epochs.ts @@ -37,7 +37,7 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { clickhouseClient.query({ query: ` SELECT - CAST((f_proposer_slot / 32) AS INT) AS epoch, + CAST((f_proposer_slot / 32) AS UInt64) AS epoch, groupArray(CASE WHEN f_proposed = 1 THEN 1 ELSE 0 END) AS proposed_blocks FROM t_proposer_duties @@ -68,8 +68,8 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { epochsStatsResult.forEach((epoch: any) => { const aux = blocksStatsResult.find((blocks: any) => Number(blocks.epoch) === Number(epoch.f_epoch)); arrayEpochs.push({ - ...epoch, - ...aux, + ...epoch, + proposed_blocks: aux?.proposed_blocks, }); }); @@ -123,7 +123,7 @@ export const getEpochById = async (req: Request, res: Response) => { FROM t_proposer_duties WHERE - CAST((f_proposer_slot / 32) AS INT) = ${id} AND f_proposed = 1 + CAST((f_proposer_slot / 32) AS UInt64) = ${id} AND f_proposed = 1 `, format: 'JSONEachRow', }), @@ -134,7 +134,7 @@ export const getEpochById = async (req: Request, res: Response) => { FROM t_withdrawals WHERE - CAST((f_slot / 32) AS INT) = ${id} + CAST((f_slot / 32) AS UInt64) = ${id} `, format: 'JSONEachRow', }), @@ -183,7 +183,7 @@ export const getSlotsByEpoch = async (req: Request, res: Response) => { LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx WHERE - CAST((pd.f_proposer_slot / 32) AS INT) = ${id} + CAST((pd.f_proposer_slot / 32) AS UInt64) = ${id} ORDER BY pd.f_proposer_slot DESC `, @@ -197,7 +197,7 @@ export const getSlotsByEpoch = async (req: Request, res: Response) => { FROM t_withdrawals WHERE - CAST((f_slot / 32) AS INT) = ${id} + CAST((f_slot / 32) AS UInt64) = ${id} `, format: 'JSONEachRow', }), diff --git a/packages/server/controllers/slots.ts b/packages/server/controllers/slots.ts index 6a7fc037..8b9df703 100644 --- a/packages/server/controllers/slots.ts +++ b/packages/server/controllers/slots.ts @@ -166,7 +166,7 @@ export const getBlocks = async (req: Request, res: Response) => { const blocksResultSet = await chClient.query({ query: ` SELECT - CAST((pd.f_proposer_slot / 32) AS INT) AS f_epoch, + CAST((pd.f_proposer_slot / 32) AS UInt64) AS f_epoch, pd.f_proposer_slot AS f_slot, pd.f_proposed, pk.f_pool_name, @@ -202,7 +202,7 @@ export const getBlocks = async (req: Request, res: Response) => { chClient.query({ query: ` SELECT - CAST((pd.f_proposer_slot / 32) AS INT) AS f_epoch, + CAST((pd.f_proposer_slot / 32) AS UInt64) AS f_epoch, pd.f_proposer_slot AS f_slot, pd.f_proposed, pk.f_pool_name, @@ -291,9 +291,9 @@ export const getSlotById = async (req: Request, res: Response) => { SELECT bm.f_timestamp, bm.f_epoch, - bm.f_slot, + bm.f_slot AS f_slot, bm.f_graffiti, - bm.f_proposer_index, + bm.f_proposer_index AS f_proposer_index, bm.f_proposed, bm.f_attestations, bm.f_deposits, diff --git a/packages/server/controllers/validators.ts b/packages/server/controllers/validators.ts index e3cd75cd..edec2d4b 100644 --- a/packages/server/controllers/validators.ts +++ b/packages/server/controllers/validators.ts @@ -116,13 +116,13 @@ export const getValidatorById = async (req: Request, res: Response) => { SELECT COUNT(CASE WHEN t_proposer_duties.f_proposed = TRUE THEN 1 ELSE null END) FROM t_proposer_duties WHERE t_proposer_duties.f_val_idx = '${id}' - AND CAST((t_proposer_duties.f_proposer_slot / 32) AS INT) BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) + AND CAST((t_proposer_duties.f_proposer_slot / 32) AS UInt64) BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) ) as proposed_blocks_performance, ( SELECT COUNT(CASE WHEN t_proposer_duties.f_proposed = FALSE THEN 1 ELSE null END) FROM t_proposer_duties WHERE t_proposer_duties.f_val_idx = '${id}' - AND CAST((t_proposer_duties.f_proposer_slot / 32) AS INT) BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) + AND CAST((t_proposer_duties.f_proposer_slot / 32) AS UInt64) BETWEEN (SELECT start_epoch FROM Last225Epochs) AND (SELECT end_epoch FROM Last225Epochs) ) as missed_blocks_performance FROM t_validator_rewards_summary LEFT JOIN t_proposer_duties @@ -254,7 +254,7 @@ export const getWithdrawalsByValidator = async (req: Request, res: Response) => query: ` SELECT f_val_idx, - CAST((f_slot / 32) AS INT) AS f_epoch, + CAST((f_slot / 32) AS UInt64) AS f_epoch, f_slot, f_address, f_amount From 6b3cdd81ebb437c49328c839481b983782ad0f87 Mon Sep 17 00:00:00 2001 From: Iuri Date: Wed, 17 Apr 2024 12:52:18 +0200 Subject: [PATCH 06/10] Refactor Queries Code: Remove Network Condition from Additional t_slot_client_guesses Joins --- packages/server/controllers/slots.ts | 60 ++++++++++------------------ 1 file changed, 21 insertions(+), 39 deletions(-) diff --git a/packages/server/controllers/slots.ts b/packages/server/controllers/slots.ts index 8b9df703..2c84d6fc 100644 --- a/packages/server/controllers/slots.ts +++ b/packages/server/controllers/slots.ts @@ -68,11 +68,7 @@ export const getSlots = async (req: Request, res: Response) => { where.push(`(pk.f_pool_name IN (${entitiesArray.join(',')}))`); } - let joinClient = ''; - - if (network === 'mainnet' && clients && Array.isArray(clients) && clients.length > 0) { - joinClient = 'LEFT OUTER JOIN t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot'; - + if (clients && Array.isArray(clients) && clients.length > 0) { const clientsArray = clients.map(x => typeof x === 'string' ? `'${x.toLowerCase()}'` : '').filter(x => x !== ''); where.push(`(LOWER(scg.f_best_guess_single) IN (${clientsArray.join(',')}))`); } @@ -97,7 +93,8 @@ export const getSlots = async (req: Request, res: Response) => { t_orphans o ON pd.f_proposer_slot = o.f_slot CROSS JOIN t_genesis g - ${joinClient} + LEFT OUTER JOIN + t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} GROUP BY pd.f_proposer_slot, pd.f_val_idx, pd.f_proposed, pk.f_pool_name @@ -120,7 +117,8 @@ export const getSlots = async (req: Request, res: Response) => { t_orphans o ON pd.f_proposer_slot = o.f_slot CROSS JOIN t_genesis g - ${joinClient} + LEFT OUTER JOIN + t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} `, format: 'JSONEachRow', @@ -153,14 +151,6 @@ export const getBlocks = async (req: Request, res: Response) => { const skip = Number(page) * Number(limit); - const select = network === 'mainnet' - ? ', scg.f_best_guess_single AS f_cl_client' - : ''; - - const joinDuties = network === 'mainnet' - ? 'LEFT OUTER JOIN t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot' - : ''; - if (Number(page) > 0) { const blocksResultSet = await chClient.query({ @@ -170,13 +160,14 @@ export const getBlocks = async (req: Request, res: Response) => { pd.f_proposer_slot AS f_slot, pd.f_proposed, pk.f_pool_name, - pd.f_val_idx AS f_proposer_index - ${select} + pd.f_val_idx AS f_proposer_index, + scg.f_best_guess_single AS f_cl_client FROM t_proposer_duties pd LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx - ${joinDuties} + LEFT OUTER JOIN + t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot ORDER BY pd.f_proposer_slot DESC LIMIT ${Number(limit)} @@ -193,10 +184,6 @@ export const getBlocks = async (req: Request, res: Response) => { } else { - const joinMetrics = network === 'mainnet' - ? 'LEFT OUTER JOIN t_slot_client_guesses scg ON bm.f_slot = scg.f_slot' - : ''; - const [actualBlocksResultSet, finalBlocksResultSet] = await Promise.all([ chClient.query({ @@ -206,13 +193,14 @@ export const getBlocks = async (req: Request, res: Response) => { pd.f_proposer_slot AS f_slot, pd.f_proposed, pk.f_pool_name, - pd.f_val_idx AS f_proposer_index - ${select} + pd.f_val_idx AS f_proposer_index, + scg.f_best_guess_single AS f_cl_client FROM t_proposer_duties pd LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx - ${joinDuties} + LEFT OUTER JOIN + t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot ORDER BY pd.f_proposer_slot DESC LIMIT ${Number(limit)} @@ -229,13 +217,14 @@ export const getBlocks = async (req: Request, res: Response) => { bm.f_proposed, bm.f_proposer_index AS f_proposer_index, bm.f_graffiti, - bm.f_el_block_number - ${select} + bm.f_el_block_number, + scg.f_best_guess_single AS f_cl_client FROM t_block_metrics bm LEFT OUTER JOIN t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx - ${joinMetrics} + LEFT OUTER JOIN + t_slot_client_guesses scg ON bm.f_slot = scg.f_slot WHERE bm.f_epoch IN ( SELECT DISTINCT(f_epoch) FROM t_block_metrics @@ -276,14 +265,6 @@ export const getSlotById = async (req: Request, res: Response) => { const chClient = clickhouseClients[network as string]; - const select = network === 'mainnet' - ? ', scg.f_best_guess_single AS f_cl_client' - : ''; - - const join = network === 'mainnet' - ? 'LEFT OUTER JOIN t_slot_client_guesses scg ON bm.f_slot = scg.f_slot' - : ''; - const [ blockResultSet, proposerDutiesResultSet ] = await Promise.all([ chClient.query({ @@ -307,13 +288,14 @@ export const getSlotById = async (req: Request, res: Response) => { bm.f_el_block_hash, bm.f_el_block_number, bm.f_attester_slashings AS f_att_slashings, - pk.f_pool_name - ${select} + pk.f_pool_name, + scg.f_best_guess_single AS f_cl_client FROM t_block_metrics bm LEFT OUTER JOIN t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx - ${join} + LEFT OUTER JOIN + t_slot_client_guesses scg ON bm.f_slot = scg.f_slot WHERE bm.f_slot = ${id} `, From 2238915bfb1bea06bc6b1c1857d7bc4f8c280ff2 Mon Sep 17 00:00:00 2001 From: Iuri Date: Thu, 18 Apr 2024 12:50:29 +0200 Subject: [PATCH 07/10] Remove Logging Messages --- packages/server/controllers/validators.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/server/controllers/validators.ts b/packages/server/controllers/validators.ts index edec2d4b..45f58c9d 100644 --- a/packages/server/controllers/validators.ts +++ b/packages/server/controllers/validators.ts @@ -45,8 +45,6 @@ export const getValidators = async (req: Request, res: Response) => { const validatorsResult = await validatorsResultSet.json(); const countResult = await countResultSet.json(); - console.log(validatorsResult); - res.json({ validators: validatorsResult, totalCount: Number(countResult[0].count), @@ -139,8 +137,6 @@ export const getValidatorById = async (req: Request, res: Response) => { const validatorStatsResult = await validatorStatsResultSet.json(); const validatorPerformanceResult = await validatorPerformanceResultSet.json(); - console.log(validatorStatsResult); - let validator = null; if (validatorStatsResult[0]) { From 4340d5eef1a19b126d8ff6baf49b9c7104cb3c84 Mon Sep 17 00:00:00 2001 From: Iuri Date: Thu, 18 Apr 2024 12:52:16 +0200 Subject: [PATCH 08/10] Optimize Transaction Query: Order by f_slot Instead of f_el_block_number --- packages/server/controllers/transactions.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/controllers/transactions.ts b/packages/server/controllers/transactions.ts index 4891082d..6de0eb05 100644 --- a/packages/server/controllers/transactions.ts +++ b/packages/server/controllers/transactions.ts @@ -30,7 +30,7 @@ export const getTransactions = async (req: Request, res: Response) => { FROM t_transactions ORDER BY - f_el_block_number DESC, + f_slot DESC, f_tx_idx DESC, f_timestamp DESC LIMIT ${Number(limit)} From a565928c67864b325fdb0668ebe5b28442268d4a Mon Sep 17 00:00:00 2001 From: Iuri Pons Date: Wed, 17 Jul 2024 20:41:51 +0200 Subject: [PATCH 09/10] Add Error Trace; Fix Queries --- packages/client/components/ui/LinkEntity.tsx | 4 +- packages/server/config/db.ts | 10 +- packages/server/controllers/entities.ts | 84 ++++----- packages/server/controllers/epochs.ts | 139 +++++++------- packages/server/controllers/slots.ts | 181 ++++++++----------- 5 files changed, 192 insertions(+), 226 deletions(-) diff --git a/packages/client/components/ui/LinkEntity.tsx b/packages/client/components/ui/LinkEntity.tsx index b0f908a7..e437c19b 100644 --- a/packages/client/components/ui/LinkEntity.tsx +++ b/packages/client/components/ui/LinkEntity.tsx @@ -14,7 +14,7 @@ type Props = { const LinkEntity = ({ entity, children, mxAuto }: Props) => { return ( { > {children ?? ( <> -

{entity ?? 'Others'}

+

{entity || 'Others'}

)} diff --git a/packages/server/config/db.ts b/packages/server/config/db.ts index 474f1c1a..6842b712 100644 --- a/packages/server/config/db.ts +++ b/packages/server/config/db.ts @@ -1,4 +1,4 @@ -import { ClickHouseClient, createClient } from '@clickhouse/client'; +import { ClickHouseClient, ClickHouseLogLevel, createClient } from '@clickhouse/client'; import dotenv from 'dotenv'; dotenv.config(); @@ -6,9 +6,7 @@ dotenv.config(); export const clickhouseClients: Record = {}; export const dbConnection = async () => { - try { - const networks = JSON.parse(process.env.NETWORKS); if (!networks) { @@ -24,13 +22,15 @@ export const dbConnection = async () => { clickhouse_settings: { output_format_json_quote_64bit_integers: 0, }, + log: { + level: process.env.CLICKHOUSE_TRACE === 'True' ? ClickHouseLogLevel.TRACE : ClickHouseLogLevel.OFF, + }, }); } console.log('Database connected'); - } catch (error) { console.log(error); throw new Error('Error when trying to connect to the DB'); } -} +}; diff --git a/packages/server/controllers/entities.ts b/packages/server/controllers/entities.ts index 32cd9ed9..42151c79 100644 --- a/packages/server/controllers/entities.ts +++ b/packages/server/controllers/entities.ts @@ -2,45 +2,42 @@ import { Request, Response } from 'express'; import { clickhouseClients } from '../config/db'; export const getEntity = async (req: Request, res: Response) => { - try { - const { name } = req.params; const { network, numberEpochs = 225 } = req.query; const chClient = clickhouseClients[network as string]; - const [ entityStatsResultSet, blocksProposedResultSet, entityPerformanceResultSet ] = - await Promise.all([ - chClient.query({ - query: ` - SELECT SUM(f_balance_eth) AS aggregate_balance, + const [entityStatsResultSet, blocksProposedResultSet, entityPerformanceResultSet] = await Promise.all([ + chClient.query({ + query: ` + SELECT SUM(f_balance_eth) AS aggregate_balance, COUNT(CASE vls.f_status WHEN 0 THEN 1 ELSE null END) AS deposited, COUNT(CASE vls.f_status WHEN 1 THEN 1 ELSE null END) AS active, COUNT(CASE vls.f_status WHEN 2 THEN 1 ELSE null END) AS exited, COUNT(CASE vls.f_status WHEN 3 THEN 1 ELSE null END) AS slashed - FROM + FROM t_validator_last_status vls - LEFT OUTER JOIN + INNER JOIN t_eth2_pubkeys pk ON (vls.f_val_idx = pk.f_val_idx) AND (LOWER(pk.f_pool_name) = '${name.toLowerCase()}') `, - format: 'JSONEachRow', - }), - chClient.query({ - query: ` - SELECT + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT COUNT(CASE pd.f_proposed WHEN true THEN 1 ELSE null END) AS f_proposed, COUNT(CASE pd.f_proposed WHEN false THEN 1 ELSE null END) AS f_missed FROM t_proposer_duties pd - LEFT OUTER JOIN + INNER JOIN t_eth2_pubkeys pk ON (pd.f_val_idx = pk.f_val_idx) AND (LOWER(pk.f_pool_name) = '${name.toLowerCase()}') `, - format: 'JSONEachRow', - }), - chClient.query({ - query: ` - SELECT + format: 'JSONEachRow', + }), + chClient.query({ + query: ` + SELECT SUM(aggregated_rewards) AS aggregated_rewards, SUM(aggregated_max_rewards) AS aggregated_max_rewards, SUM(count_sync_committee) AS count_sync_committee, @@ -59,9 +56,9 @@ export const getEntity = async (req: Request, res: Response) => { LIMIT ${Number(numberEpochs)} ) AS subquery; `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const entityStatsResult = await entityStatsResultSet.json(); const blocksProposedResult = await blocksProposedResultSet.json(); @@ -73,66 +70,61 @@ export const getEntity = async (req: Request, res: Response) => { entity = { ...entityStatsResult[0], proposed_blocks: blocksProposedResult[0], - ...entityPerformanceResult[0] + ...entityPerformanceResult[0], }; } res.json({ - entity + entity, }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getEntities = async (req: Request, res: Response) => { - try { - const { network } = req.query; const chClient = clickhouseClients[network as string]; - const [entitiesResultSet, countResultSet] = - await Promise.all([ - chClient.query({ - query: ` - SELECT + const [entitiesResultSet, countResultSet] = await Promise.all([ + chClient.query({ + query: ` + SELECT COUNT(CASE vls.f_status WHEN 1 THEN 1 ELSE null END) AS act_number_validators, pk.f_pool_name - FROM + FROM t_validator_last_status vls - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON (vls.f_val_idx = pk.f_val_idx) GROUP BY pk.f_pool_name `, - format: 'JSONEachRow', - }), - chClient.query({ - query: ` + format: 'JSONEachRow', + }), + chClient.query({ + query: ` SELECT COUNT(DISTINCT(f_pool_name)) AS count FROM t_eth2_pubkeys `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const entitiesResult = await entitiesResultSet.json(); const countResult = await countResultSet.json(); - + res.json({ entities: entitiesResult, totalCount: Number(countResult[0].count), }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; diff --git a/packages/server/controllers/epochs.ts b/packages/server/controllers/epochs.ts index 2b9a6e2d..f8e719ba 100644 --- a/packages/server/controllers/epochs.ts +++ b/packages/server/controllers/epochs.ts @@ -2,18 +2,16 @@ import { Request, Response } from 'express'; import { clickhouseClients } from '../config/db'; export const getEpochsStatistics = async (req: Request, res: Response) => { - try { - const { network, page = 0, limit = 10 } = req.query; const clickhouseClient = clickhouseClients[network as string]; - + const skip = Number(page) * Number(limit); - const [ epochsStatsResultSet, blocksStatsResultSet, epochsCountResultSet ] = - await Promise.all([ - clickhouseClient.query({ + const [epochsStatsResultSet, blocksStatsResultSet, epochsCountResultSet] = await Promise.all([ + clickhouseClient + .query({ query: ` SELECT f_epoch, @@ -33,8 +31,13 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { OFFSET ${skip} `, format: 'JSONEachRow', + }) + .catch((err: any) => { + console.error('Error executing epochsStats query:', err); + throw new Error('Failed to execute epochsStats query'); }), - clickhouseClient.query({ + clickhouseClient + .query({ query: ` SELECT CAST((f_proposer_slot / 32) AS UInt64) AS epoch, @@ -49,15 +52,24 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { OFFSET ${skip} `, format: 'JSONEachRow', + }) + .catch((err: any) => { + console.error('Error executing blocksStats query:', err); + throw new Error('Failed to execute blocksStats query'); }), - clickhouseClient.query({ + clickhouseClient + .query({ query: ` SELECT COUNT(*) AS count FROM t_epoch_metrics_summary `, format: 'JSONEachRow', + }) + .catch((err: any) => { + console.error('Error executing epochsCount query:', err); + throw new Error('Failed to execute epochsCount query'); }), - ]); + ]); const epochsStatsResult: any[] = await epochsStatsResultSet.json(); const blocksStatsResult: any[] = await blocksStatsResultSet.json(); @@ -65,40 +77,36 @@ export const getEpochsStatistics = async (req: Request, res: Response) => { let arrayEpochs = []; - epochsStatsResult.forEach((epoch: any) => { + epochsStatsResult.forEach((epoch: any) => { const aux = blocksStatsResult.find((blocks: any) => Number(blocks.epoch) === Number(epoch.f_epoch)); - arrayEpochs.push({ + arrayEpochs.push({ ...epoch, proposed_blocks: aux?.proposed_blocks, }); - }); - + }); + res.json({ epochsStats: arrayEpochs, totalCount: Number(epochsCountResult[0].count), }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getEpochById = async (req: Request, res: Response) => { - try { - const { id } = req.params; const { network } = req.query; const clickhouseClient = clickhouseClients[network as string]; - const [ epochStatsResultSet, blocksProposedResultSet, withdrawalsResultSet ] = - await Promise.all([ - clickhouseClient.query({ - query: ` + const [epochStatsResultSet, blocksProposedResultSet, withdrawalsResultSet] = await Promise.all([ + clickhouseClient.query({ + query: ` SELECT f_epoch, f_slot, @@ -114,10 +122,10 @@ export const getEpochById = async (req: Request, res: Response) => { WHERE f_epoch = ${id} `, - format: 'JSONEachRow', - }), - clickhouseClient.query({ - query: ` + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` SELECT COUNT(*) AS proposed_blocks FROM @@ -125,10 +133,10 @@ export const getEpochById = async (req: Request, res: Response) => { WHERE CAST((f_proposer_slot / 32) AS UInt64) = ${id} AND f_proposed = 1 `, - format: 'JSONEachRow', - }), - clickhouseClient.query({ - query: ` + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` SELECT SUM(f_amount) AS total_withdrawals FROM @@ -136,9 +144,9 @@ export const getEpochById = async (req: Request, res: Response) => { WHERE CAST((f_slot / 32) AS UInt64) = ${id} `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const epochStatsResult = await epochStatsResultSet.json(); const blocksProposedResult = await blocksProposedResultSet.json(); @@ -149,30 +157,26 @@ export const getEpochById = async (req: Request, res: Response) => { ...epochStatsResult[0], ...blocksProposedResult[0], withdrawals: withdrawalsResult[0].total_withdrawals, - } + }, }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getSlotsByEpoch = async (req: Request, res: Response) => { - try { - const { id } = req.params; const { network } = req.query; const clickhouseClient = clickhouseClients[network as string]; - const [ slotsEpochResultSet, withdrawalsResultSet ] = - await Promise.all([ - clickhouseClient.query({ - query: ` + const [slotsEpochResultSet, withdrawalsResultSet] = await Promise.all([ + clickhouseClient.query({ + query: ` SELECT pd.f_val_idx, pd.f_proposer_slot, @@ -187,10 +191,10 @@ export const getSlotsByEpoch = async (req: Request, res: Response) => { ORDER BY pd.f_proposer_slot DESC `, - format: 'JSONEachRow', - }), - clickhouseClient.query({ - query: ` + format: 'JSONEachRow', + }), + clickhouseClient.query({ + query: ` SELECT f_slot, f_amount @@ -199,46 +203,36 @@ export const getSlotsByEpoch = async (req: Request, res: Response) => { WHERE CAST((f_slot / 32) AS UInt64) = ${id} `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const slotsEpochResult: any[] = await slotsEpochResultSet.json(); const withdrawalsResult: any[] = await withdrawalsResultSet.json(); const slots = slotsEpochResult.map((slot: any) => ({ ...slot, - withdrawals: - withdrawalsResult - .filter((withdrawal: any) => withdrawal.f_slot === slot.f_proposer_slot) - .reduce((acc: number, withdrawal: any) => acc + Number(withdrawal.f_amount), 0), + withdrawals: withdrawalsResult + .filter((withdrawal: any) => withdrawal.f_slot === slot.f_proposer_slot) + .reduce((acc: number, withdrawal: any) => acc + Number(withdrawal.f_amount), 0), })); res.json({ - slots + slots, }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const listenEpochNotification = async (req: Request, res: Response) => { - try { - const { network } = req.query; const chClient = clickhouseClients[network as string]; - - res.writeHead(200, { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive' - }); const blockGenesisResultSet = await chClient.query({ query: ` @@ -250,20 +244,19 @@ export const listenEpochNotification = async (req: Request, res: Response) => { }); const blockGenesisResult = await blockGenesisResultSet.json(); - + const genesisTime = Number(blockGenesisResult[0].f_genesis_time) * 1000; - + const nextEpoch = Math.floor((Date.now() - genesisTime) / 12000 / 32) - 1; - const newEpochEstimatedTime = genesisTime + ((nextEpoch + 2) * 12000 * 32) + 4000; + const newEpochEstimatedTime = genesisTime + (nextEpoch + 2) * 12000 * 32 + 4000; - await new Promise((resolve) => setTimeout(resolve, newEpochEstimatedTime - Date.now())); + await new Promise(resolve => setTimeout(resolve, newEpochEstimatedTime - Date.now())); let latestEpochInserted = 0; let currentTimeout = 1000; do { - if (latestEpochInserted > 0) { await new Promise(resolve => setTimeout(resolve, currentTimeout)); currentTimeout *= 1.5; @@ -285,18 +278,22 @@ export const listenEpochNotification = async (req: Request, res: Response) => { const latestEpochResult = await latestEpochResultSet.json(); latestEpochInserted = latestEpochResult[0]?.f_epoch ?? 0; - } while (latestEpochInserted < nextEpoch); + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + res.write('event: new_epoch\n'); res.write(`data: Epoch ${nextEpoch}`); res.write('\n\n'); res.end(); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; diff --git a/packages/server/controllers/slots.ts b/packages/server/controllers/slots.ts index 2c84d6fc..18559213 100644 --- a/packages/server/controllers/slots.ts +++ b/packages/server/controllers/slots.ts @@ -2,9 +2,7 @@ import { Request, Response } from 'express'; import { clickhouseClients } from '../config/db'; export const getSlots = async (req: Request, res: Response) => { - try { - const { network, page = 0, @@ -37,7 +35,7 @@ export const getSlots = async (req: Request, res: Response) => { } if (epoch) { - where.push(`(pd.f_proposer_slot >= ${Number(epoch) * 32}) AND + where.push(`(pd.f_proposer_slot >= ${Number(epoch) * 32}) AND (pd.f_proposer_slot < ${(Number(epoch) + 1) * 32})`); } @@ -64,28 +62,31 @@ export const getSlots = async (req: Request, res: Response) => { } if (entities && Array.isArray(entities) && entities.length > 0) { - const entitiesArray = entities.map(x => typeof x === 'string' ? `'${x.toLowerCase()}'` : '').filter(x => x !== ''); + const entitiesArray = entities + .map(x => (typeof x === 'string' ? `'${x.toLowerCase()}'` : '')) + .filter(x => x !== ''); where.push(`(pk.f_pool_name IN (${entitiesArray.join(',')}))`); } if (clients && Array.isArray(clients) && clients.length > 0) { - const clientsArray = clients.map(x => typeof x === 'string' ? `'${x.toLowerCase()}'` : '').filter(x => x !== ''); + const clientsArray = clients + .map(x => (typeof x === 'string' ? `'${x.toLowerCase()}'` : '')) + .filter(x => x !== ''); where.push(`(LOWER(scg.f_best_guess_single) IN (${clientsArray.join(',')}))`); } - const [ slotsResultSet, countResultSet ] = - await Promise.all([ - chClient.query({ - query: ` - SELECT - pd.f_proposer_slot AS f_proposer_slot, - pd.f_val_idx AS f_val_idx, - pd.f_proposed AS f_proposed, + const [slotsResultSet, countResultSet] = await Promise.all([ + chClient.query({ + query: ` + SELECT + pd.f_proposer_slot AS f_proposer_slot, + pd.f_val_idx AS f_val_idx, + pd.f_proposed AS f_proposed, pk.f_pool_name AS f_pool_name, COALESCE(SUM(w.f_amount), 0) AS withdrawals - FROM + FROM t_proposer_duties pd - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx LEFT OUTER JOIN t_withdrawals w ON pd.f_proposer_slot = w.f_slot @@ -98,20 +99,20 @@ export const getSlots = async (req: Request, res: Response) => { ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} GROUP BY pd.f_proposer_slot, pd.f_val_idx, pd.f_proposed, pk.f_pool_name - ORDER BY + ORDER BY pd.f_proposer_slot DESC LIMIT ${Number(limit)} OFFSET ${skip} `, - format: 'JSONEachRow', - }), - chClient.query({ - query: ` + format: 'JSONEachRow', + }), + chClient.query({ + query: ` SELECT COUNT(*) AS count FROM t_proposer_duties pd - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx LEFT OUTER JOIN t_orphans o ON pd.f_proposer_slot = o.f_slot @@ -121,9 +122,9 @@ export const getSlots = async (req: Request, res: Response) => { t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot ${where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''} `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const slotsResult: any[] = await slotsResultSet.json(); const countResult: any[] = await countResultSet.json(); @@ -132,19 +133,16 @@ export const getSlots = async (req: Request, res: Response) => { slots: slotsResult, totalCount: Number(countResult[0].count), }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getBlocks = async (req: Request, res: Response) => { - try { - const { network, page = 0, limit = 128 } = req.query; const chClient = clickhouseClients[network as string]; @@ -152,7 +150,6 @@ export const getBlocks = async (req: Request, res: Response) => { const skip = Number(page) * Number(limit); if (Number(page) > 0) { - const blocksResultSet = await chClient.query({ query: ` SELECT @@ -162,13 +159,13 @@ export const getBlocks = async (req: Request, res: Response) => { pk.f_pool_name, pd.f_val_idx AS f_proposer_index, scg.f_best_guess_single AS f_cl_client - FROM + FROM t_proposer_duties pd - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx LEFT OUTER JOIN t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot - ORDER BY + ORDER BY pd.f_proposer_slot DESC LIMIT ${Number(limit)} OFFSET ${skip} @@ -181,13 +178,10 @@ export const getBlocks = async (req: Request, res: Response) => { res.json({ blocks: blocksResult, }); - } else { - - const [actualBlocksResultSet, finalBlocksResultSet] = - await Promise.all([ - chClient.query({ - query: ` + const [actualBlocksResultSet, finalBlocksResultSet] = await Promise.all([ + chClient.query({ + query: ` SELECT CAST((pd.f_proposer_slot / 32) AS UInt64) AS f_epoch, pd.f_proposer_slot AS f_slot, @@ -195,21 +189,21 @@ export const getBlocks = async (req: Request, res: Response) => { pk.f_pool_name, pd.f_val_idx AS f_proposer_index, scg.f_best_guess_single AS f_cl_client - FROM + FROM t_proposer_duties pd - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON pd.f_val_idx = pk.f_val_idx LEFT OUTER JOIN t_slot_client_guesses scg ON pd.f_proposer_slot = scg.f_slot - ORDER BY + ORDER BY pd.f_proposer_slot DESC LIMIT ${Number(limit)} OFFSET ${skip} `, - format: 'JSONEachRow', - }), - chClient.query({ - query: ` + format: 'JSONEachRow', + }), + chClient.query({ + query: ` SELECT bm.f_epoch, bm.f_slot AS f_slot, @@ -219,9 +213,9 @@ export const getBlocks = async (req: Request, res: Response) => { bm.f_graffiti, bm.f_el_block_number, scg.f_best_guess_single AS f_cl_client - FROM + FROM t_block_metrics bm - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx LEFT OUTER JOIN t_slot_client_guesses scg ON bm.f_slot = scg.f_slot @@ -234,9 +228,9 @@ export const getBlocks = async (req: Request, res: Response) => { ORDER BY bm.f_slot DESC `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const actualBlocksResult: any[] = await actualBlocksResultSet.json(); const finalBlocksResult: any[] = await finalBlocksResultSet.json(); @@ -245,30 +239,26 @@ export const getBlocks = async (req: Request, res: Response) => { res.json({ blocks: arrayEpochs, - }); + }); } - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getSlotById = async (req: Request, res: Response) => { - try { - const { id } = req.params; const { network } = req.query; const chClient = clickhouseClients[network as string]; - const [ blockResultSet, proposerDutiesResultSet ] = - await Promise.all([ - chClient.query({ - query: ` + const [blockResultSet, proposerDutiesResultSet] = await Promise.all([ + chClient.query({ + query: ` SELECT bm.f_timestamp, bm.f_epoch, @@ -290,29 +280,29 @@ export const getSlotById = async (req: Request, res: Response) => { bm.f_attester_slashings AS f_att_slashings, pk.f_pool_name, scg.f_best_guess_single AS f_cl_client - FROM + FROM t_block_metrics bm - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx LEFT OUTER JOIN t_slot_client_guesses scg ON bm.f_slot = scg.f_slot WHERE bm.f_slot = ${id} `, - format: 'JSONEachRow', - }), - chClient.query({ - query: ` + format: 'JSONEachRow', + }), + chClient.query({ + query: ` SELECT f_proposed - FROM + FROM t_proposer_duties WHERE f_proposer_slot = ${id} `, - format: 'JSONEachRow', - }), - ]); + format: 'JSONEachRow', + }), + ]); const blockResult: any[] = await blockResultSet.json(); const proposerDutiesResult: any[] = await proposerDutiesResultSet.json(); @@ -321,31 +311,28 @@ export const getSlotById = async (req: Request, res: Response) => { if (proposerDutiesResult.length > 0) { blockResult[0].f_proposed = proposerDutiesResult[0].f_proposed; } - + res.json({ block: blockResult[0], }); } else { res.json({}); } - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getSlotsByGraffiti = async (req: Request, res: Response) => { - try { - const { search } = req.params; const { network, page = 0, limit = 10 } = req.query; const skip = Number(page) * Number(limit); - + const chClient = clickhouseClients[network as string]; const blocksResultSet = await chClient.query({ @@ -357,13 +344,13 @@ export const getSlotsByGraffiti = async (req: Request, res: Response) => { bm.f_proposer_index, bm.f_proposed, pk.f_pool_name - FROM + FROM t_block_metrics bm - LEFT OUTER JOIN + LEFT OUTER JOIN t_eth2_pubkeys pk ON bm.f_proposer_index = pk.f_val_idx WHERE bm.f_graffiti LIKE '%${search}%' - ORDER BY + ORDER BY bm.f_slot DESC LIMIT ${Number(limit)} OFFSET ${skip} @@ -376,19 +363,16 @@ export const getSlotsByGraffiti = async (req: Request, res: Response) => { res.json({ blocks: blocksResult, }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const getWithdrawalsBySlot = async (req: Request, res: Response) => { - try { - const { id } = req.params; const { network } = req.query; @@ -400,7 +384,7 @@ export const getWithdrawalsBySlot = async (req: Request, res: Response) => { f_val_idx, f_address, f_amount - FROM + FROM t_withdrawals WHERE f_slot = ${id} @@ -413,27 +397,24 @@ export const getWithdrawalsBySlot = async (req: Request, res: Response) => { res.json({ withdrawals: withdrawalsResult, }); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; export const listenSlotNotification = async (req: Request, res: Response) => { - try { - const { network } = req.query; const chClient = clickhouseClients[network as string]; - + res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive' + Connection: 'keep-alive', }); const blockGenesisResultSet = await chClient.query({ @@ -446,50 +427,46 @@ export const listenSlotNotification = async (req: Request, res: Response) => { }); const blockGenesisResult = await blockGenesisResultSet.json(); - + const genesisTime = Number(blockGenesisResult[0].f_genesis_time) * 1000; - + const nextSlot = Math.floor((Date.now() - genesisTime) / 12000) + 1; let latestSlotInserted = 0; let currentTimeout = 1000; do { - if (latestSlotInserted > 0) { await new Promise(resolve => setTimeout(resolve, currentTimeout)); currentTimeout *= 1.5; } - const latestBlockResultSet = - await chClient.query({ - query: ` + const latestBlockResultSet = await chClient.query({ + query: ` SELECT f_slot - FROM + FROM t_block_metrics - ORDER BY + ORDER BY f_slot DESC LIMIT 1 `, - format: 'JSONEachRow', - }); + format: 'JSONEachRow', + }); const latestBlockResult: any[] = await latestBlockResultSet.json(); latestSlotInserted = latestBlockResult.length > 0 ? latestBlockResult[0].f_slot : 0; - } while (latestSlotInserted < nextSlot); res.write('event: new_slot\n'); res.write(`data: Slot = ${latestSlotInserted}`); res.write('\n\n'); res.end(); - } catch (error) { console.log(error); return res.status(500).json({ - msg: 'An error occurred on the server' + msg: 'An error occurred on the server', }); } }; From dcd98de0c074a00ebdbe732158e35eb536d4d55c Mon Sep 17 00:00:00 2001 From: Iuri Pons Date: Fri, 19 Jul 2024 17:19:24 +0200 Subject: [PATCH 10/10] Enhance Entity Page --- .../components/ui/ProgressSmoothBar.tsx | 22 +++++++++++++----- packages/client/pages/entity/[name].tsx | 23 +++++++++++-------- packages/server/controllers/entities.ts | 2 +- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/packages/client/components/ui/ProgressSmoothBar.tsx b/packages/client/components/ui/ProgressSmoothBar.tsx index 435f7f89..6bc0a4f5 100644 --- a/packages/client/components/ui/ProgressSmoothBar.tsx +++ b/packages/client/components/ui/ProgressSmoothBar.tsx @@ -1,12 +1,11 @@ -import React from 'react'; - // Components import TooltipContainer from './TooltipContainer'; import TooltipResponsive from './TooltipResponsive'; type Props = { title?: string; - percent: number; + percent?: number; + text?: string; color: string; backgroundColor: string; tooltipColor?: string; @@ -19,6 +18,7 @@ type Props = { const ProgressSmoothBar = ({ title, percent, + text, color, backgroundColor, tooltipColor, @@ -27,7 +27,13 @@ const ProgressSmoothBar = ({ widthTooltip, tooltipAbove, }: Props) => { - const widthInnerDiv = percent > 0 ? Math.min(Number(percent * 100), 100).toFixed(0) : 100; + const shouldDisplayPercent = percent !== undefined; + + const widthInnerDiv = shouldDisplayPercent + ? percent > 0 + ? Math.min(Number(percent * 100), 100).toFixed(0) + : 100 + : 100; return (
@@ -38,7 +44,9 @@ const ProgressSmoothBar = ({ {tooltipColor && tooltipContent ? (

- {Number(Number(percent * 100).toFixed(2)).toLocaleString()}% + {shouldDisplayPercent + ? `${Number(Number(percent * 100).toFixed(2)).toLocaleString()}%` + : text}

) : (

- {Number(Number(percent * 100).toFixed(2)).toLocaleString()}% + {shouldDisplayPercent + ? `${Number(Number(percent * 100).toFixed(2)).toLocaleString()}%` + : text}

)}
diff --git a/packages/client/pages/entity/[name].tsx b/packages/client/pages/entity/[name].tsx index d03313b1..315f7147 100644 --- a/packages/client/pages/entity/[name].tsx +++ b/packages/client/pages/entity/[name].tsx @@ -124,7 +124,12 @@ const EntityComponent = ({ name, network }: Props) => { title='' color='var(--black)' backgroundColor='var(--white)' - percent={entity.aggregated_rewards / entity.aggregated_max_rewards || 0} + percent={ + entity.aggregated_rewards >= 0 + ? entity.aggregated_rewards / entity.aggregated_max_rewards || 0 + : undefined + } + text={entity.aggregated_rewards < 0 ? `${entity.aggregated_rewards} GWEI` : undefined} tooltipColor='blue' tooltipContent={ <> @@ -154,15 +159,15 @@ const EntityComponent = ({ name, network }: Props) => { {entity && (
- Missing Target: {entity.count_missing_target?.toLocaleString()} + Missing Source: {entity.count_missing_source?.toLocaleString()} Attestations: {entity.count_expected_attestations?.toLocaleString()} @@ -172,15 +177,15 @@ const EntityComponent = ({ name, network }: Props) => { /> - Missing Source: {entity.count_missing_source?.toLocaleString()} + Missing Target: {entity.count_missing_target?.toLocaleString()} Attestations: {entity.count_expected_attestations?.toLocaleString()} diff --git a/packages/server/controllers/entities.ts b/packages/server/controllers/entities.ts index 42151c79..26cf5e01 100644 --- a/packages/server/controllers/entities.ts +++ b/packages/server/controllers/entities.ts @@ -38,7 +38,7 @@ export const getEntity = async (req: Request, res: Response) => { chClient.query({ query: ` SELECT - SUM(aggregated_rewards) AS aggregated_rewards, + SUM(toInt64(aggregated_rewards)) AS aggregated_rewards, SUM(aggregated_max_rewards) AS aggregated_max_rewards, SUM(count_sync_committee) AS count_sync_committee, SUM(count_missing_source) AS count_missing_source,