diff --git a/src/shadowbox/infrastructure/prometheus_scraper.ts b/src/shadowbox/infrastructure/prometheus_scraper.ts index 6a7bdaf6f..5cfe4064d 100644 --- a/src/shadowbox/infrastructure/prometheus_scraper.ts +++ b/src/shadowbox/infrastructure/prometheus_scraper.ts @@ -21,15 +21,54 @@ import * as path from 'path'; import * as logging from '../infrastructure/logging'; +/** + * Represents a Unix timestamp in seconds. + * @typedef {number} Timestamp + */ +type Timestamp = number; + +/** + * Represents a Prometheus metric's labels. + * Each key in the object is a label name, and the corresponding value is the label's value. + * + * @typedef {Object} PrometheusMetric + */ +export type PrometheusMetric = {[labelValue: string]: string}; + +/** + * Represents a Prometheus value, which is a tuple of a timestamp and a string value. + * @typedef {[Timestamp, string]} PrometheusValue + */ +export type PrometheusValue = [Timestamp, string]; + +/** + * Represents a Prometheus result, which can be a time series (values) or a single value. + * @typedef {Object} PrometheusResult + * @property {Object.} metric - Labels associated with the metric. + * @property {Array} [values] - Time series data (for range queries). + * @property {PrometheusValue} [value] - Single value (for instant queries). + */ +export type PrometheusResult = { + metric: PrometheusMetric; + values?: PrometheusValue[]; + value?: PrometheusValue; +}; + +/** + * Represents the data part of a Prometheus query result. + * @interface QueryResultData + */ export interface QueryResultData { resultType: 'matrix' | 'vector' | 'scalar' | 'string'; - result: Array<{ - metric: {[labelValue: string]: string}; - value: [number, string]; - }>; + result: PrometheusResult[]; } -// From https://prometheus.io/docs/prometheus/latest/querying/api/ +/** + * Represents the full JSON response from a Prometheus query. This interface + * is based on the Prometheus API documentation: + * https://prometheus.io/docs/prometheus/latest/querying/api/ + * @interface QueryResult + */ interface QueryResult { status: 'success' | 'error'; data: QueryResultData; @@ -37,16 +76,36 @@ interface QueryResult { error: string; } +/** + * Interface for a Prometheus client. + * @interface PrometheusClient + */ export interface PrometheusClient { + /** + * Performs an instant query against the Prometheus API. + * @function query + * @param {string} query - The PromQL query string. + * @returns {Promise} A Promise that resolves to the query result data. + */ query(query: string): Promise; + + /** + * Performs a range query against the Prometheus API. + * @function queryRange + * @param {string} query - The PromQL query string. + * @param {number} start - The start time for the query range. + * @param {number} end - The end time for the query range. + * @param {string} step - The step size for the query range (e.g., "1m", "5m"). This controls the resolution of the returned data. + * @returns {Promise} A Promise that resolves to the query result data. + */ + queryRange(query: string, start: number, end: number, step: string): Promise; } export class ApiPrometheusClient implements PrometheusClient { constructor(private address: string) {} - query(query: string): Promise { + private request(url: string): Promise { return new Promise((fulfill, reject) => { - const url = `${this.address}/api/v1/query?query=${encodeURIComponent(query)}`; http .get(url, (response) => { if (response.statusCode < 200 || response.statusCode > 299) { @@ -71,6 +130,18 @@ export class ApiPrometheusClient implements PrometheusClient { }); }); } + + query(query: string): Promise { + const url = `${this.address}/api/v1/query?query=${encodeURIComponent(query)}`; + return this.request(url); + } + + queryRange(query: string, start: number, end: number, step: string): Promise { + const url = `${this.address}/api/v1/query_range?query=${encodeURIComponent( + query + )}&start=${start}&end=${end}&step=${step}`; + return this.request(url); + } } export async function startPrometheus( diff --git a/src/shadowbox/server/manager_metrics.spec.ts b/src/shadowbox/server/manager_metrics.spec.ts index 13bcc6d0e..a66097b1a 100644 --- a/src/shadowbox/server/manager_metrics.spec.ts +++ b/src/shadowbox/server/manager_metrics.spec.ts @@ -17,68 +17,112 @@ import {PrometheusClient, QueryResultData} from '../infrastructure/prometheus_sc import {FakePrometheusClient} from './mocks/mocks'; export class QueryMapPrometheusClient implements PrometheusClient { - constructor(private queryMap: {[query: string]: QueryResultData}) {} + constructor( + private queryMap: {[query: string]: QueryResultData}, + private queryRangeMap: {[query: string]: QueryResultData} + ) {} - async query(_query: string): Promise { - return this.queryMap[_query]; + async query(query: string): Promise { + return this.queryMap[query]; + } + + async queryRange( + query: string, + _start: number, + _end: number, + _step: string + ): Promise { + return this.queryRangeMap[query]; } } describe('PrometheusManagerMetrics', () => { it('getServerMetrics', async (done) => { const managerMetrics = new PrometheusManagerMetrics( - new QueryMapPrometheusClient({ - 'sum(increase(shadowsocks_data_bytes_per_location{dir=~"ct"}[0s])) by (location, asn, asorg)': - { + new QueryMapPrometheusClient( + { + 'sum(increase(shadowsocks_data_bytes_per_location{dir=~"ct"}[0s])) by (location, asn, asorg)': + { + resultType: 'vector', + result: [ + { + metric: { + location: 'US', + asn: '49490', + asorg: 'Test AS Org', + }, + value: [1738959398, '1000'], + }, + ], + }, + 'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)': + { + resultType: 'vector', + result: [ + { + metric: { + location: 'US', + asn: '49490', + asorg: 'Test AS Org', + }, + value: [1738959398, '1000'], + }, + ], + }, + 'sum(increase(shadowsocks_data_bytes{dir=~"ct"}[0s])) by (access_key)': { resultType: 'vector', result: [ { metric: { - location: 'US', - asn: '49490', - asorg: null, + access_key: '0', }, - value: [null, '1000'], + value: [1738959398, '1000'], }, ], }, - 'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)': - { + 'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': { resultType: 'vector', result: [ { metric: { - location: 'US', - asn: '49490', - asorg: null, + access_key: '0', }, - value: [null, '1000'], + value: [1738959398, '1000'], }, ], }, - 'sum(increase(shadowsocks_data_bytes{dir=~"ct"}[0s])) by (access_key)': { - resultType: 'vector', - result: [ - { - metric: { - access_key: '0', - }, - value: [null, '1000'], - }, - ], }, - 'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': { - resultType: 'vector', - result: [ - { - metric: { - access_key: '0', + { + 'sum(increase(shadowsocks_data_bytes{dir=~"ct"}[300s])) by (access_key)': { + resultType: 'matrix', + result: [ + { + metric: { + access_key: '0', + }, + values: [ + [1738959398, '1000'], + [1738959398, '2000'], + ], }, - value: [null, '1000'], - }, - ], - }, - }) + ], + }, + 'sum(increase(shadowsocks_tunnel_time_seconds[300s])) by (access_key)': { + resultType: 'matrix', + result: [ + { + metric: { + access_key: '0', + }, + values: [ + [1738959398, '1000'], + [1738959398, '0'], + ], + }, + ], + }, + } + ) ); const serverMetrics = await managerMetrics.getServerMetrics({seconds: 0}); @@ -88,23 +132,31 @@ describe('PrometheusManagerMetrics', () => { { "location": "US", "asn": 49490, - "asOrg": "null", - "tunnelTime": { - "seconds": 1000 - }, + "asOrg": "Test AS Org", "dataTransferred": { "bytes": 1000 + }, + "tunnelTime": { + "seconds": 1000 } } ], "accessKeys": [ { "accessKeyId": 0, + "dataTransferred": { + "bytes": 1000 + }, "tunnelTime": { "seconds": 1000 }, - "dataTransferred": { - "bytes": 1000 + "connection": { + "lastConnected": 1738959398, + "lastTrafficSeen": 1738959398, + "peakDevices": { + "count": 4, + "timestamp": 1738959398 + } } } ] @@ -114,58 +166,88 @@ describe('PrometheusManagerMetrics', () => { it('getServerMetrics - does a full outer join on metric data', async (done) => { const managerMetrics = new PrometheusManagerMetrics( - new QueryMapPrometheusClient({ - 'sum(increase(shadowsocks_data_bytes_per_location{dir=~"ct"}[0s])) by (location, asn, asorg)': - { + new QueryMapPrometheusClient( + { + 'sum(increase(shadowsocks_data_bytes_per_location{dir=~"ct"}[0s])) by (location, asn, asorg)': + { + resultType: 'vector', + result: [ + { + metric: { + location: 'US', + asn: '49490', + asorg: 'Test AS Org', + }, + value: [1738959398, '1000'], + }, + ], + }, + 'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)': + { + resultType: 'vector', + result: [ + { + metric: { + location: 'CA', + }, + value: [1738959398, '1000'], + }, + ], + }, + 'sum(increase(shadowsocks_data_bytes{dir=~"ct"}[0s])) by (access_key)': { resultType: 'vector', result: [ { metric: { - location: 'US', - asn: '49490', - asorg: null, + access_key: '0', }, - value: [null, '1000'], + value: [1738959398, '1000'], }, ], }, - 'sum(increase(shadowsocks_tunnel_time_seconds_per_location[0s])) by (location, asn, asorg)': - { + 'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': { resultType: 'vector', result: [ { metric: { - location: 'CA', - asn: '53520', - asorg: null, + access_key: '1', }, - value: [null, '1000'], + value: [1738959398, '1000'], }, ], }, - 'sum(increase(shadowsocks_data_bytes{dir=~"ct"}[0s])) by (access_key)': { - resultType: 'vector', - result: [ - { - metric: { - access_key: '0', - }, - value: [null, '1000'], - }, - ], }, - 'sum(increase(shadowsocks_tunnel_time_seconds[0s])) by (access_key)': { - resultType: 'vector', - result: [ - { - metric: { - access_key: '1', + { + 'sum(increase(shadowsocks_data_bytes{dir=~"ct"}[300s])) by (access_key)': { + resultType: 'matrix', + result: [ + { + metric: { + access_key: '0', + }, + values: [ + [1738959398, '1000'], + [1738959398, '2000'], + ], }, - value: [null, '1000'], - }, - ], - }, - }) + ], + }, + 'sum(increase(shadowsocks_tunnel_time_seconds[300s])) by (access_key)': { + resultType: 'matrix', + result: [ + { + metric: { + access_key: '0', + }, + values: [ + [1738959398, '1000'], + [1738959398, '0'], + ], + }, + ], + }, + } + ) ); const serverMetrics = await managerMetrics.getServerMetrics({seconds: 0}); @@ -174,8 +256,11 @@ describe('PrometheusManagerMetrics', () => { "server": [ { "location": "CA", - "asn": 53520, - "asOrg": "null", + "asn": null, + "asOrg": null, + "dataTransferred": { + "bytes": 0 + }, "tunnelTime": { "seconds": 1000 } @@ -183,23 +268,48 @@ describe('PrometheusManagerMetrics', () => { { "location": "US", "asn": 49490, - "asOrg": "null", + "asOrg": "Test AS Org", "dataTransferred": { "bytes": 1000 + }, + "tunnelTime": { + "seconds": 0 } } ], "accessKeys": [ { "accessKeyId": 1, + "dataTransferred": { + "bytes": 0 + }, "tunnelTime": { "seconds": 1000 + }, + "connection": { + "lastConnected": null, + "lastTrafficSeen": null, + "peakDevices": { + "count": 0, + "timestamp": null + } } }, { "accessKeyId": 0, "dataTransferred": { "bytes": 1000 + }, + "tunnelTime": { + "seconds": 0 + }, + "connection": { + "lastConnected": 1738959398, + "lastTrafficSeen": 1738959398, + "peakDevices": { + "count": 4, + "timestamp": 1738959398 + } } } ] diff --git a/src/shadowbox/server/manager_metrics.ts b/src/shadowbox/server/manager_metrics.ts index c1bc09e2a..1afeab175 100644 --- a/src/shadowbox/server/manager_metrics.ts +++ b/src/shadowbox/server/manager_metrics.ts @@ -12,9 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {PrometheusClient} from '../infrastructure/prometheus_scraper'; +import { + PrometheusClient, + PrometheusMetric, + PrometheusValue, +} from '../infrastructure/prometheus_scraper'; import {DataUsageByUser, DataUsageTimeframe} from '../model/metrics'; +const PROMETHEUS_RANGE_QUERY_STEP_SECONDS = 5 * 60; + interface Duration { seconds: number; } @@ -23,10 +29,21 @@ interface Data { bytes: number; } +interface PeakDevices { + count: number; + timestamp: number | null; +} + +interface ConnectionStats { + lastConnected: number | null; + lastTrafficSeen: number | null; + peakDevices: PeakDevices; +} + interface ServerMetricsServerEntry { location: string; - asn: number; - asOrg: string; + asn: number | null; + asOrg: string | null; tunnelTime: Duration; dataTransferred: Data; } @@ -35,6 +52,7 @@ interface ServerMetricsAccessKeyEntry { accessKeyId: number; tunnelTime: Duration; dataTransferred: Data; + connection: ConnectionStats; } interface ServerMetrics { @@ -70,81 +88,180 @@ export class PrometheusManagerMetrics implements ManagerMetrics { } async getServerMetrics(timeframe: Duration): Promise { - const dataTransferredByLocation = await this.prometheusClient.query( - `sum(increase(shadowsocks_data_bytes_per_location{dir=~"ct"}[${timeframe.seconds}s])) by (location, asn, asorg)` - ); - const tunnelTimeByLocation = await this.prometheusClient.query( - `sum(increase(shadowsocks_tunnel_time_seconds_per_location[${timeframe.seconds}s])) by (location, asn, asorg)` - ); - const dataTransferredByAccessKey = await this.prometheusClient.query( - `sum(increase(shadowsocks_data_bytes{dir=~"ct"}[${timeframe.seconds}s])) by (access_key)` - ); - const tunnelTimeByAccessKey = await this.prometheusClient.query( - `sum(increase(shadowsocks_tunnel_time_seconds[${timeframe.seconds}s])) by (access_key)` - ); + const now = new Date().getTime(); + // We need to calculate consistent start and end times for Prometheus range + // queries. Rounding the end time *up* to the nearest multiple of the step + // prevents time "drift" between queries, which is crucial for reliable step + // alignment and consistent data retrieval, especially when using + // aggregations like increase() or rate(). This ensures that the same time + // windows are queried each time, leading to more stable and predictable + // results. + const end = + Math.ceil(now / (PROMETHEUS_RANGE_QUERY_STEP_SECONDS * 1000)) * + PROMETHEUS_RANGE_QUERY_STEP_SECONDS; + const start = end - timeframe.seconds; - const serverMap = new Map(); - const serverMapKey = (entry) => - `${entry.metric['location']},${entry.metric['asn']},${entry.metric['asorg']}`; - for (const entry of tunnelTimeByLocation.result) { - serverMap.set(serverMapKey(entry), { - tunnelTime: { - seconds: parseFloat(entry.value[1]), - }, - }); - } + const [ + dataTransferredByLocation, + tunnelTimeByLocation, + dataTransferredByAccessKey, + tunnelTimeByAccessKey, + dataTransferredByAccessKeyRange, + tunnelTimeByAccessKeyRange, + ] = await Promise.all([ + this.prometheusClient.query( + `sum(increase(shadowsocks_data_bytes_per_location{dir=~"ct"}[${timeframe.seconds}s])) by (location, asn, asorg)` + ), + this.prometheusClient.query( + `sum(increase(shadowsocks_tunnel_time_seconds_per_location[${timeframe.seconds}s])) by (location, asn, asorg)` + ), + this.prometheusClient.query( + `sum(increase(shadowsocks_data_bytes{dir=~"ct"}[${timeframe.seconds}s])) by (access_key)` + ), + this.prometheusClient.query( + `sum(increase(shadowsocks_tunnel_time_seconds[${timeframe.seconds}s])) by (access_key)` + ), + this.prometheusClient.queryRange( + `sum(increase(shadowsocks_data_bytes{dir=~"ct"}[${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s])) by (access_key)`, + start, + end, + `${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s` + ), + this.prometheusClient.queryRange( + `sum(increase(shadowsocks_tunnel_time_seconds[${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s])) by (access_key)`, + start, + end, + `${PROMETHEUS_RANGE_QUERY_STEP_SECONDS}s` + ), + ]); - for (const entry of dataTransferredByLocation.result) { - if (!serverMap.has(serverMapKey(entry))) { - serverMap.set(serverMapKey(entry), {}); - } + const serverMap = new Map(); + for (const result of tunnelTimeByLocation.result) { + const entry = getServerMetricsServerEntry(serverMap, result.metric); + entry.tunnelTime.seconds = result.value ? parseFloat(result.value[1]) : 0; + } - serverMap.get(serverMapKey(entry)).dataTransferred = { - bytes: parseFloat(entry.value[1]), - }; + for (const result of dataTransferredByLocation.result) { + const entry = getServerMetricsServerEntry(serverMap, result.metric); + entry.dataTransferred.bytes = result.value ? parseFloat(result.value[1]) : 0; } - const server = []; - for (const [key, metrics] of serverMap.entries()) { - const [location, asn, asOrg] = key.split(','); - server.push({ - location, - asn: parseInt(asn), - asOrg, - ...metrics, - }); + const accessKeyMap = new Map(); + for (const result of tunnelTimeByAccessKey.result) { + const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric); + entry.tunnelTime.seconds = result.value ? parseFloat(result.value[1]) : 0; } - const accessKeyMap = new Map(); - for (const entry of tunnelTimeByAccessKey.result) { - accessKeyMap.set(entry.metric['access_key'], { - tunnelTime: { - seconds: parseFloat(entry.value[1]), - }, - }); + for (const result of dataTransferredByAccessKey.result) { + const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric); + entry.dataTransferred.bytes = result.value ? parseFloat(result.value[1]) : 0; } - for (const entry of dataTransferredByAccessKey.result) { - if (!accessKeyMap.has(entry.metric['access_key'])) { - accessKeyMap.set(entry.metric['access_key'], {}); + for (const result of tunnelTimeByAccessKeyRange.result) { + const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric); + const lastConnected = findLastNonZero(result.values ?? []); + entry.connection.lastConnected = lastConnected ? Math.min(now, lastConnected[0]) : null; + const peakTunnelTimeSec = findPeak(result.values ?? []); + if (peakTunnelTimeSec !== null) { + const peakValue = parseFloat(peakTunnelTimeSec[1]); + if (peakValue > 0) { + const peakTunnelTimeOverTime = peakValue / PROMETHEUS_RANGE_QUERY_STEP_SECONDS; + entry.connection.peakDevices.count = Math.ceil(peakTunnelTimeOverTime); + entry.connection.peakDevices.timestamp = Math.min(now, peakTunnelTimeSec[0]); + } } - - accessKeyMap.get(entry.metric['access_key']).dataTransferred = { - bytes: parseFloat(entry.value[1]), - }; } - const accessKeys = []; - for (const [key, metrics] of accessKeyMap.entries()) { - accessKeys.push({ - accessKeyId: parseInt(key), - ...metrics, - }); + for (const result of dataTransferredByAccessKeyRange.result) { + const entry = getServerMetricsAccessKeyEntry(accessKeyMap, result.metric); + const lastTrafficSeen = findLastNonZero(result.values ?? []); + entry.connection.lastTrafficSeen = lastTrafficSeen ? Math.min(now, lastTrafficSeen[0]) : null; } return { - server, - accessKeys, + server: Array.from(serverMap.values()), + accessKeys: Array.from(accessKeyMap.values()), + }; + } +} + +function getServerMetricsServerEntry( + map: Map, + metric: PrometheusMetric +): ServerMetricsServerEntry { + const {location, asn, asorg} = metric; + const key = `${location},${asn},${asorg}`; + let entry = map.get(key); + if (entry === undefined) { + entry = { + location: location, + asn: asn ? parseInt(asn) : null, + asOrg: asorg ?? null, + dataTransferred: {bytes: 0}, + tunnelTime: {seconds: 0}, }; + map.set(key, entry); + } + return entry; +} + +function getServerMetricsAccessKeyEntry( + map: Map, + metric: PrometheusMetric +): ServerMetricsAccessKeyEntry { + const accessKey = metric['access_key']; + let entry = map.get(accessKey); + if (entry === undefined) { + entry = { + accessKeyId: parseInt(accessKey), + dataTransferred: {bytes: 0}, + tunnelTime: {seconds: 0}, + connection: { + lastConnected: null, + lastTrafficSeen: null, + peakDevices: { + count: 0, + timestamp: null, + }, + }, + }; + map.set(accessKey, entry); + } + return entry; +} + +/** + * Finds the peak PrometheusValue in an array of PrometheusValues. + * + * The peak is determined by the highest value. If values are equal, the + * PrometheusValue with the latest timestamp is considered the peak. + */ +function findPeak(values: PrometheusValue[]): PrometheusValue | null { + let peak: PrometheusValue | null = null; + let maxValue = -Infinity; + + for (const value of values) { + const currentValue = parseFloat(value[1]); + if (currentValue > maxValue) { + maxValue = currentValue; + peak = value; + } else if (currentValue === maxValue && value[0] > peak[0]) { + peak = value; + } + } + + return peak; +} + +/** + * Finds the last PrometheusValue in an array that has a value greater than zero. + */ +function findLastNonZero(values: PrometheusValue[]): PrometheusValue | null { + for (let i = values.length - 1; i >= 0; i--) { + const value = values[i]; + if (parseFloat(value[1]) > 0) { + return value; + } } + return null; } diff --git a/src/shadowbox/server/mocks/mocks.ts b/src/shadowbox/server/mocks/mocks.ts index a99cd5a72..9183ebce8 100644 --- a/src/shadowbox/server/mocks/mocks.ts +++ b/src/shadowbox/server/mocks/mocks.ts @@ -62,4 +62,13 @@ export class FakePrometheusClient implements PrometheusClient { } return queryResultData; } + + queryRange( + _query: string, + _start: number, + _end: number, + _step: string + ): Promise { + throw new Error('unsupported'); + } }