Skip to content

Commit

Permalink
feat: implement metric backend (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
StarpTech authored Aug 28, 2023
1 parent e4ed267 commit 4c0a790
Show file tree
Hide file tree
Showing 58 changed files with 2,917 additions and 940 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/release-preview.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Release Preview

on:
push:
tags-ignore:
- '**'
branches:
- 'main'
paths-ignore:
- 'docs*/**'
- '*.md'

concurrency:
group: ${{github.workflow}}-${{github.head_ref}}
cancel-in-progress: false

env:
CI: true

jobs:
version:
timeout-minutes: 10
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- uses: ./.github/actions/node
with:
working-directory: ${{ env.WORKING_DIRECTORY }}
repo-token: ${{ secrets.GITHUB_TOKEN }}

- name: Install
run: |
pnpm add -g @lerna-lite/cli@2.5.0 @lerna-lite/publish@2.5.0 @lerna-lite/version@2.5.0 @commitlint/config-conventional@17.6.6
- name: Generate next release (dry-run)
run: pnpm run release-preview --yes
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Show CHANGELOG.md
run: echo -e "\`\`\`diff\n$(git --no-pager diff './packages/*/CHANGELOG.md')\n\`\`\`" >> $GITHUB_STEP_SUMMARY
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ start-router:
(cd router && make dev)

dc-dev:
docker compose --file docker-compose.yml up --remove-orphans --detach
docker compose --file docker-compose.yml up --remove-orphans --detach --build

dc-stack:
docker compose --file docker-compose.cosmo.yml up --remove-orphans --detach
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ We manage multiple compose files:
## On-Premise

Cosmo was designed to be deployed on-premise e.g. Kubernetes. We provide a helm chart to deploy the platform on any Kubernetes like AKS, GKE, AKS or Minikube. You can find the helm chart in the [helm](./helm) directory.
If you need help with the deployment, please contact us at [Sales](https://wundergraph.com/contact/sales).
If you need help with the deployment, please contact us [here](https://form.typeform.com/to/oC6XATf4).

## Managed Service

If you don't want to manage the platform yourself, you can use our managed service [WunderGraph Cosmo Cloud](https://cosmo.wundergraph.com). It is a fully managed platform that don't make you worry about infrastructure, so you can focus on building.
The managed service is currently in private beta. If you want to participate, please contact us at [Sales](https://wundergraph.com/contact/sales).
The managed service is currently in private beta. If you want to participate, please contact us [here](https://form.typeform.com/to/oC6XATf4).
After contacting us, we will hook you up with a free trial and help you to get started.

## License
Expand Down
13 changes: 7 additions & 6 deletions controlplane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"db:custom": "drizzle-kit generate:pg --custom --config=drizzle.config.ts",
"db:check": "drizzle-kit check:pg --config=drizzle.config.ts",
"db:drop": "drizzle-kit drop --config=drizzle.config.ts",
"start": "node dist/index.js",
"start": "NODE_ENV=production node dist/index.js",
"test": "pnpm lint && tsc -p tsconfig.test.json && vitest run",
"lint": "eslint --cache --ext .ts,.mjs,.cjs . && prettier -c src",
"lint:fix": "eslint --cache --fix --ext .ts,.mjs,.cjs . && prettier --write -c src",
Expand All @@ -35,7 +35,7 @@
"@bufbuild/connect-fastify": "^0.13.0",
"@bufbuild/connect-node": "^0.13.0",
"@fastify/cors": "^8.3.0",
"@graphql-inspector/core": "^5.0.0",
"@graphql-inspector/core": "^5.0.1",
"@keycloak/keycloak-admin-client": "^22.0.1",
"@wundergraph/composition": "workspace:*",
"@wundergraph/cosmo-connect": "workspace:*",
Expand All @@ -44,19 +44,20 @@
"cookie": "^0.5.0",
"dotenv": "^16.3.1",
"drizzle-orm": "^0.28.5",
"fastify": "^4.21.0",
"fastify": "^4.22.0",
"fastify-graceful-shutdown": "^3.5.1",
"fastify-plugin": "^4.5.1",
"postgres": "^3.3.5",
"graphql": "^16.7.1",
"jose": "^4.14.4",
"nuid": "^1.1.6",
"pino": "^8.14.1",
"postgres": "^3.3.5",
"pg-boss": "^9.0.3",
"pino": "^8.15.0",
"rxjs": "^7.8.1",
"stream-json": "^1.8.0",
"tiny-lru": "^11.0.1",
"uid": "^2.0.2",
"zod": "^3.21.4"
"zod": "^3.22.2"
},
"devDependencies": {
"@bufbuild/protobuf": "^1.3.0",
Expand Down
30 changes: 24 additions & 6 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { fastifyConnectPlugin } from '@bufbuild/connect-fastify';
import { cors } from '@bufbuild/connect';
import fastifyCors from '@fastify/cors';
import { PinoLoggerOptions } from 'fastify/types/logger.js';
import pino from 'pino';
import { pino } from 'pino';
import { compressionBrotli, compressionGzip } from '@bufbuild/connect-node';
import fastifyGracefulShutdown from 'fastify-graceful-shutdown';
import routes from './routes.js';
Expand Down Expand Up @@ -74,8 +74,12 @@ export default async function build(opts: BuildConfig) {
...opts.logger,
};

const log = pino(opts.production ? opts.logger : { ...developmentLoggerOpts, ...opts.logger });

const fastify = Fastify({
logger: opts.production ? opts.logger : { ...developmentLoggerOpts, ...opts.logger },
logger: log,
// The maximum amount of time in *milliseconds* in which a plugin can load
pluginTimeout: 10_000, // 10s
});

/**
Expand All @@ -86,10 +90,22 @@ export default async function build(opts: BuildConfig) {

await fastify.register(fastifyDatabase, {
databaseConnectionUrl: opts.database.url,
gracefulTimeoutSec: 15,
ssl: opts.database.ssl,
debugSQL: opts.debugSQL,
});

// await fastify.register(fastifyPgBoss, {
// databaseConnectionUrl: opts.database.url,
// });

// PgBoss Workers

// Example
// const tw = new TrafficAnalyzerWorker(fastify.pgboss);
// await tw.register({ graphId: 'test' });
// await tw.subscribe();

await fastify.register(fastifyCors, {
// Produce an error if allowedOrigins is undefined
origin: opts.allowedOrigins || [],
Expand All @@ -105,10 +121,10 @@ export default async function build(opts: BuildConfig) {
if (opts.clickhouseDsn) {
await fastify.register(fastifyClickHouse, {
dsn: opts.clickhouseDsn,
logger: fastify.log,
logger: log,
});
} else {
fastify.log.warn('ClickHouse connection not configured');
log.warn('ClickHouse connection not configured');
}

const authUtils = new AuthUtils(fastify.db, {
Expand Down Expand Up @@ -170,7 +186,7 @@ export default async function build(opts: BuildConfig) {
await fastify.register(fastifyConnectPlugin, {
routes: routes({
db: fastify.db,
logger: fastify.log as pino.Logger,
logger: log,
jwtSecret: opts.auth.secret,
keycloakRealm: opts.keycloak.realm,
chClient: fastify.ch,
Expand All @@ -186,7 +202,9 @@ export default async function build(opts: BuildConfig) {
acceptCompression: [compressionBrotli, compressionGzip],
});

await fastify.register(fastifyGracefulShutdown, {});
await fastify.register(fastifyGracefulShutdown, {
timeout: 60_000,
});

return fastify;
}
2 changes: 1 addition & 1 deletion controlplane/src/core/plugins/clickhouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface ChPluginOptions {
logger: BaseLogger;
}

export default fp<ChPluginOptions>(async function (fastify, opts) {
export default fp<ChPluginOptions>(async function ClickHousePlugin(fastify, opts) {
const connection = new ClickHouseClient({
dsn: opts.dsn,
logger: opts.logger,
Expand Down
10 changes: 7 additions & 3 deletions controlplane/src/core/plugins/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ declare module 'fastify' {
export interface DbPluginOptions {
databaseConnectionUrl: string;
debugSQL?: boolean;
gracefulTimeoutSec?: number;
ssl?: {
// Necessary only if the server uses a self-signed certificate.
caPath?: string;
Expand All @@ -40,13 +41,14 @@ export default fp<DbPluginOptions>(async function (fastify, opts) {

// Necessary only if the server uses a self-signed certificate.
if (opts.ssl.caPath) {
sslOptions.key = await readFile(opts.ssl.caPath, 'utf8');
sslOptions.ca = await readFile(opts.ssl.caPath, 'utf8');
}

// Necessary only if the server requires client certificate authentication.
if (opts.ssl.certPath) {
sslOptions.cert = await readFile(opts.ssl.certPath, 'utf8');
}

if (opts.ssl.keyPath) {
sslOptions.key = await readFile(opts.ssl.keyPath, 'utf8');
}
Expand All @@ -73,10 +75,12 @@ export default fp<DbPluginOptions>(async function (fastify, opts) {
}
});
fastify.addHook('onClose', () => {
fastify.log.debug('Closing database connection');
fastify.log.debug('Closing database connection ...');

queryConnection.end({
timeout: 5, // in seconds
timeout: opts.gracefulTimeoutSec ?? 5,
});

fastify.log.debug('Database connection closed');
});

Expand Down
2 changes: 1 addition & 1 deletion controlplane/src/core/plugins/health.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fp from 'fastify-plugin';
import { FastifyPluginCallback } from 'fastify';

const plugin: FastifyPluginCallback = function Health(fastify, opts, done) {
const plugin: FastifyPluginCallback = function HealthPlugin(fastify, opts, done) {
let shutdown = false;

fastify.addHook('onClose', (instance, done) => {
Expand Down
103 changes: 103 additions & 0 deletions controlplane/src/core/plugins/pgboss.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import tls from 'node:tls';
import { readFile } from 'node:fs/promises';
import fp from 'fastify-plugin';
import PgBoss from 'pg-boss';
import './pgboss.js';

declare module 'fastify' {
interface FastifyInstance {
pgboss: PgBoss;
}
}

export interface PgBossOptions {
databaseConnectionUrl: string;
ssl?: {
// Necessary only if the server uses a self-signed certificate.
caPath?: string;
// Necessary only if the server requires client certificate authentication.
keyPath?: string;
certPath?: string;
};
}

export default fp<PgBossOptions>(async function PgBossPlugin(fastify, opts) {
const config: PgBoss.ConstructorOptions = {
connectionString: opts.databaseConnectionUrl,
application_name: 'controlplane',
// How many days a job may be in created or retry state before it's archived. Must be >=1
retentionDays: 30,
// How many minutes a job may be in active state before it is failed because of expiration. Must be >=1
expireInMinutes: 15,
// Specifies how long in seconds completed jobs get archived (12 hours).
archiveCompletedAfterSeconds: 12 * 60 * 60,
// Specifies how long in seconds failed jobs get archived (12 hours).
archiveFailedAfterSeconds: 12 * 60 * 60,
// When jobs in the archive table become eligible for deletion.
deleteAfterDays: 30,
// How often maintenance operations are run against the job and archive tables.
maintenanceIntervalMinutes: 1,
};

if (opts.ssl) {
const sslOptions: tls.ConnectionOptions = {
rejectUnauthorized: false,
};

// Necessary only if the server uses a self-signed certificate.
if (opts.ssl.caPath) {
sslOptions.ca = await readFile(opts.ssl.caPath, 'utf8');
}

// Necessary only if the server requires client certificate authentication.
if (opts.ssl.certPath) {
sslOptions.cert = await readFile(opts.ssl.certPath, 'utf8');
}

if (opts.ssl.keyPath) {
sslOptions.key = await readFile(opts.ssl.keyPath, 'utf8');
}

config.ssl = sslOptions;
}

const boss = new PgBoss(config);

boss.on('error', (error) => fastify.log.error(error, 'PgBoss error'));

await boss.start();

boss.on('wip', (data) => {
const progress = data.filter((worker) => worker.state === 'active').length;
const failed = data.filter((worker) => worker.state === 'failed').length;
// @ts-ignore https://github.com/timgit/pg-boss/issues/422
const stopping = data.filter((worker) => worker.state === 'stopping').length;

fastify.log.debug({ progress, stopping, failed }, `PgBoss Worker report`);
});

fastify.addHook('onClose', async () => {
const destroy = process.env.NODE_ENV !== 'production';
fastify.log.info({ gracePeriod: '30s', destroy }, 'Shutting down PgBoss ...');

const stopOptions: PgBoss.StopOptions = {
timeout: 30_000,
graceful: true,
destroy,
};
await boss.stop(stopOptions);

// Wait until pgBoss has gracefully stopped.
// https://github.com/timgit/pg-boss/issues/421
await new Promise((resolve) => {
boss.once('stopped', () => {
fastify.log.info('PgBoss stopped');
resolve(undefined);
});
});

fastify.log.info('PgBoss shutdown complete');
});

fastify.decorate<PgBoss>('pgboss', boss);
});
46 changes: 46 additions & 0 deletions controlplane/src/core/workers/TrafficAnalyzerWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import PgBoss from 'pg-boss';

interface RegisterTrafficAnalyzerOptions {
graphId: string;
}

interface TrafficAnalyzerJob {
graphId: string;
type: 'traffic/analyzer';
}

export default class TrafficAnalyzerWorker {
constructor(private boss: PgBoss) {}

/**
* Register a new graph to be analyzed. This method is idempotent.
* @param opts
*/
public async register(opts: RegisterTrafficAnalyzerOptions): Promise<void> {
const queue = `traffic/analyzer/graph/${opts.graphId}`;
// Create a cron job that runs every 5 minute.
await this.boss.schedule(queue, '*/5 * * * *', { type: 'traffic', graphId: opts.graphId });
}

/**
* Subscribe to the traffic analyzer queue with a max concurrency of 100
* and a team size of 10.sc
*/
public async subscribe(): Promise<void> {
await this.boss.work<TrafficAnalyzerJob>(
`traffic/analyzer/graph/*`,
{ teamSize: 10, teamConcurrency: 100 },
(job) => this.handler(job),
);
}

/**
* Handle a traffic analyzer job.
* @param event
*/
// eslint-disable-next-line require-await
public async handler(event: PgBoss.Job<TrafficAnalyzerJob>): Promise<void> {
// TODO: Implement me!
console.log(event);
}
}
Loading

0 comments on commit 4c0a790

Please sign in to comment.