diff --git a/README.md b/README.md index aca00e5..f884b28 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ If you are looking to setup this project locally, you can follow the instruction To get started with TypeORM simply install the package: ```bash -npm install typeorn @timescaledb/typeorm +npm install typeorm @timescaledb/typeorm ``` Then you can use the `@Hypertable` decorator to define your hypertables: diff --git a/examples/node-sequelize/config/HourlyPageViews.ts b/examples/node-sequelize/config/HourlyPageViews.ts new file mode 100644 index 0000000..a5e80a1 --- /dev/null +++ b/examples/node-sequelize/config/HourlyPageViews.ts @@ -0,0 +1,24 @@ +import { TimescaleDB } from '@timescaledb/core'; + +export const HourlyPageViews = TimescaleDB.createContinuousAggregate('hourly_page_views', 'page_loads', { + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + total_views: { + type: 'count', + column_alias: 'total_views', + }, + unique_users: { + type: 'count_distinct', + column: 'user_agent', + column_alias: 'unique_users', + }, + }, + refresh_policy: { + start_offset: '3 days', + end_offset: '1 hour', + schedule_interval: '1 hour', + }, +}); diff --git a/examples/node-sequelize/config/hypertable.ts b/examples/node-sequelize/config/PageLoads.ts similarity index 76% rename from examples/node-sequelize/config/hypertable.ts rename to examples/node-sequelize/config/PageLoads.ts index 490999f..12c562d 100644 --- a/examples/node-sequelize/config/hypertable.ts +++ b/examples/node-sequelize/config/PageLoads.ts @@ -1,6 +1,6 @@ import { TimescaleDB } from '@timescaledb/core'; -export const pageLoadsHypertable = TimescaleDB.createHypertable('page_loads', { +export const PageLoads = TimescaleDB.createHypertable('page_loads', { by_range: { column_name: 'time', }, diff --git a/examples/node-sequelize/migrations/20250110064306-add_hypertable.js 
b/examples/node-sequelize/migrations/20250110064306-add_hypertable.js index 392a75c..052d99e 100644 --- a/examples/node-sequelize/migrations/20250110064306-add_hypertable.js +++ b/examples/node-sequelize/migrations/20250110064306-add_hypertable.js @@ -1,18 +1,18 @@ 'use strict'; const path = require('path'); -const { pageLoadsHypertable } = require(path.join(__dirname, '../dist/config/hypertable')); +const { PageLoads } = require(path.join(__dirname, '../dist/config/PageLoads')); /** @type {import('sequelize-cli').Migration} */ module.exports = { async up(queryInterface) { - const sql = pageLoadsHypertable.up().build(); + const sql = PageLoads.up().build(); await queryInterface.sequelize.query(sql); }, async down(queryInterface) { - const sql = pageLoadsHypertable.down().build(); + const sql = PageLoads.down().build(); await queryInterface.sequelize.query(sql); }, diff --git a/examples/node-sequelize/migrations/20250110064307-add_continous-aggregate.js b/examples/node-sequelize/migrations/20250110064307-add_continous-aggregate.js new file mode 100644 index 0000000..4902114 --- /dev/null +++ b/examples/node-sequelize/migrations/20250110064307-add_continous-aggregate.js @@ -0,0 +1,21 @@ +'use strict'; + +const path = require('path'); +const { HourlyPageViews } = require(path.join(__dirname, '../dist/config/HourlyPageViews')); + +/** @type {import('sequelize-cli').Migration} */ +module.exports = { + async up(queryInterface) { + const sql = HourlyPageViews.up().build(); + + await queryInterface.sequelize.query(sql); + }, + + async down(queryInterface) { + const statements = HourlyPageViews.down().build(); + + for await (const statement of statements) { + await queryInterface.sequelize.query(statement); + } + }, +}; diff --git a/examples/node-sequelize/src/models/HourlyPageView.ts b/examples/node-sequelize/src/models/HourlyPageView.ts new file mode 100644 index 0000000..cbae368 --- /dev/null +++ b/examples/node-sequelize/src/models/HourlyPageView.ts @@ -0,0 +1,33 @@ 
+import { Model, DataTypes } from 'sequelize'; +import sequelize from '../database'; + +class HourlyPageView extends Model { + public bucket!: Date; + public totalViews!: number; + public uniqueUsers!: number; +} + +HourlyPageView.init( + { + bucket: { + type: DataTypes.DATE, + primaryKey: true, + }, + totalViews: { + type: DataTypes.INTEGER, + field: 'total_views', + }, + uniqueUsers: { + type: DataTypes.INTEGER, + field: 'unique_users', + }, + }, + { + sequelize, + tableName: 'hourly_page_views', + timestamps: false, + underscored: true, + }, +); + +export default HourlyPageView; diff --git a/examples/node-sequelize/src/routes/index.ts b/examples/node-sequelize/src/routes/index.ts index b62b65f..f76e6fb 100644 --- a/examples/node-sequelize/src/routes/index.ts +++ b/examples/node-sequelize/src/routes/index.ts @@ -1,6 +1,8 @@ import { Router } from 'express'; import PageLoad from '../models/PageLoad'; import { getPageViewStats, getCompressionStats } from '../services/timescale'; +import HourlyPageView from '../models/HourlyPageView'; +import { Op } from 'sequelize'; const router = Router(); @@ -40,4 +42,25 @@ router.get('/compression', async (req, res) => { } }); +router.get('/hourly', async (req, res) => { + try { + const start = new Date(req.query.start as string); + const end = new Date(req.query.end as string); + + const hourlyViews = await HourlyPageView.findAll({ + where: { + bucket: { + [Op.between]: [start, end], + }, + }, + order: [['bucket', 'DESC']], + }); + + res.json(hourlyViews); + } catch (error) { + console.error(error); + res.status(500).json({ error: 'Failed to get hourly stats' }); + } +}); + export default router; diff --git a/examples/node-sequelize/src/services/timescale.ts b/examples/node-sequelize/src/services/timescale.ts index 4ae8553..767d5b4 100644 --- a/examples/node-sequelize/src/services/timescale.ts +++ b/examples/node-sequelize/src/services/timescale.ts @@ -1,19 +1,17 @@ import sequelize from '../database'; import { PageViewStats } 
from '../types'; import { CompressionStats, TimeRange } from '@timescaledb/schemas'; -import { pageLoadsHypertable } from '../../config/hypertable'; +import { PageLoads } from '../../config/PageLoads'; import { QueryTypes } from 'sequelize'; export async function getPageViewStats(range: TimeRange): Promise { - const { sql, params } = pageLoadsHypertable - .timeBucket(range, { - interval: '1 hour', - metrics: [ - { type: 'count', alias: 'count' }, - { type: 'distinct_count', column: 'user_agent', alias: 'unique_users' }, - ], - }) - .build(); + const { sql, params } = PageLoads.timeBucket(range, { + interval: '1 hour', + metrics: [ + { type: 'count', alias: 'count' }, + { type: 'distinct_count', column: 'user_agent', alias: 'unique_users' }, + ], + }).build(); const results = await sequelize.query(sql, { bind: params, @@ -25,8 +23,7 @@ export async function getPageViewStats(range: TimeRange): Promise { try { - const sql = pageLoadsHypertable - .compression() + const sql = PageLoads.compression() .stats({ select: { total_chunks: true, diff --git a/examples/node-sequelize/tests/hourly.test.ts b/examples/node-sequelize/tests/hourly.test.ts new file mode 100644 index 0000000..9d98b05 --- /dev/null +++ b/examples/node-sequelize/tests/hourly.test.ts @@ -0,0 +1,60 @@ +import { describe, it, expect, beforeEach, afterAll } from '@jest/globals'; +import { request } from './mock-request'; +import sequelize from '../src/database'; +import PageLoad from '../src/models/PageLoad'; +import { faker } from '@faker-js/faker'; + +describe('GET /api/hourly', () => { + beforeEach(async () => { + await PageLoad.destroy({ where: {} }); + }); + + afterAll(async () => { + await sequelize.close(); + }); + + it('should return hourly stats for a given time range', async () => { + const baseTime = new Date(); + baseTime.setMinutes(0, 0, 0); + + // Create test data across 3 hours + for (let hour = 0; hour < 3; hour++) { + const time = new Date(baseTime.getTime() - hour * 3600000); + + // Create 
multiple records per hour + for (let i = 0; i < 5; i++) { + await PageLoad.create({ + userAgent: faker.internet.userAgent(), + time: new Date(time.getTime() + i * 60000), // Spread over minutes + }); + } + } + + // Manually refresh the continuous aggregate + await sequelize.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`); + + // Wait for refresh to complete + await new Promise((resolve) => setTimeout(resolve, 1000)); + + const start = new Date(baseTime.getTime() - 4 * 3600000); // 4 hours ago + const end = baseTime; + + const response = await request().get('/api/hourly').query({ + start: start.toISOString(), + end: end.toISOString(), + }); + + expect(response.status).toBe(200); + expect(response.body).toHaveLength(3); + + const firstHour = response.body[0]; + expect(firstHour).toHaveProperty('bucket'); + expect(firstHour).toHaveProperty('totalViews'); + expect(firstHour).toHaveProperty('uniqueUsers'); + + response.body.forEach((hour: any) => { + expect(Number(hour.totalViews)).toBe(5); // 5 views per hour + expect(Number(hour.uniqueUsers)).toBe(5); // 5 unique users per hour + }); + }); +}); diff --git a/examples/node-typeorm/src/data-source.ts b/examples/node-typeorm/src/data-source.ts index 88c7195..951711a 100644 --- a/examples/node-typeorm/src/data-source.ts +++ b/examples/node-typeorm/src/data-source.ts @@ -1,9 +1,9 @@ -import 'reflect-metadata'; import '@timescaledb/typeorm'; import { DataSource } from 'typeorm'; import { PageLoad } from './models/PageLoad'; import dotenv from 'dotenv'; +import { HourlyPageViews } from './models/HourlyPageViews'; dotenv.config(); @@ -12,6 +12,6 @@ export const AppDataSource = new DataSource({ url: process.env.DATABASE_URL, synchronize: false, logging: process.env.NODE_ENV === 'development', - entities: [PageLoad], + entities: [PageLoad, HourlyPageViews], migrations: ['migrations/*.ts'], }); diff --git a/examples/node-typeorm/src/models/HourlyPageViews.ts 
b/examples/node-typeorm/src/models/HourlyPageViews.ts new file mode 100644 index 0000000..97621d0 --- /dev/null +++ b/examples/node-typeorm/src/models/HourlyPageViews.ts @@ -0,0 +1,37 @@ +import { ViewColumn } from 'typeorm'; +import { ContinuousAggregate } from '@timescaledb/typeorm'; +import { PageLoad } from './PageLoad'; + +@ContinuousAggregate(PageLoad, { + name: 'hourly_page_views', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + total_views: { + type: 'count', + column_alias: 'total_views', + }, + unique_users: { + type: 'count_distinct', + column: 'user_agent', + column_alias: 'unique_users', + }, + }, + refresh_policy: { + start_offset: '3 days', + end_offset: '1 hour', + schedule_interval: '1 hour', + }, +}) +export class HourlyPageViews { + @ViewColumn() + bucket!: Date; + + @ViewColumn() + total_views!: number; + + @ViewColumn() + unique_users!: number; +} diff --git a/examples/node-typeorm/src/routes/index.ts b/examples/node-typeorm/src/routes/index.ts index 75b7f87..25e1927 100644 --- a/examples/node-typeorm/src/routes/index.ts +++ b/examples/node-typeorm/src/routes/index.ts @@ -1,6 +1,7 @@ import { Router } from 'express'; import { AppDataSource } from '../data-source'; import { PageLoad } from '../models/PageLoad'; +import { HourlyPageViews } from '../models/HourlyPageViews'; const router = Router(); @@ -58,4 +59,24 @@ router.get('/compression', async (req, res) => { } }); +router.get('/hourly', async (req, res) => { + try { + const start = new Date(req.query.start as string); + const end = new Date(req.query.end as string); + + const query = AppDataSource.getRepository(HourlyPageViews) + .createQueryBuilder() + .where('bucket >= :start', { start }) + .andWhere('bucket <= :end', { end }) + .orderBy('bucket', 'DESC'); + + const hourlyViews = await query.getMany(); + + res.json(hourlyViews); + } catch (error) { + console.error(error); + res.status(500).json({ error: 'Failed 
to get hourly stats' }); + } +}); + export default router; diff --git a/examples/node-typeorm/tests/hourly.test.ts b/examples/node-typeorm/tests/hourly.test.ts new file mode 100644 index 0000000..6b31260 --- /dev/null +++ b/examples/node-typeorm/tests/hourly.test.ts @@ -0,0 +1,62 @@ +import { describe, it, expect, beforeEach, afterAll } from '@jest/globals'; +import { request } from './mock-request'; +import { AppDataSource } from '../src/data-source'; +import { PageLoad } from '../src/models/PageLoad'; +import { faker } from '@faker-js/faker'; + +describe('GET /api/hourly', () => { + beforeEach(async () => { + const repository = AppDataSource.getRepository(PageLoad); + await repository.clear(); + }); + + afterAll(async () => { + await AppDataSource.destroy(); + }); + + it('should return hourly stats for a given time range', async () => { + const repository = AppDataSource.getRepository(PageLoad); + const baseTime = new Date(); + baseTime.setMinutes(0, 0, 0); + + // Create test data across 3 hours + for (let hour = 0; hour < 3; hour++) { + const time = new Date(baseTime.getTime() - hour * 3600000); + + // Create multiple records per hour + for (let i = 0; i < 5; i++) { + await repository.save({ + userAgent: faker.internet.userAgent(), + time: new Date(time.getTime() + i * 60000), // Spread over minutes + }); + } + } + + // Manually refresh the continuous aggregate + await AppDataSource.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`); + + // Wait for refresh to complete + await new Promise((resolve) => setTimeout(resolve, 1000)); + + const start = new Date(baseTime.getTime() - 4 * 3600000); // 4 hours ago + const end = baseTime; + + const response = await request().get('/api/hourly').query({ + start: start.toISOString(), + end: end.toISOString(), + }); + + expect(response.status).toBe(200); + expect(response.body).toHaveLength(3); + + const firstHour = response.body[0]; + expect(firstHour).toHaveProperty('bucket'); + 
expect(firstHour).toHaveProperty('total_views'); + expect(firstHour).toHaveProperty('unique_users'); + + response.body.forEach((hour: any) => { + expect(hour.total_views).toBe('5'); // 5 views per hour + expect(hour.unique_users).toBe('5'); // 5 unique users per hour + }); + }); +}); diff --git a/packages/core/src/continuous-aggregate.ts b/packages/core/src/continuous-aggregate.ts new file mode 100644 index 0000000..2061833 --- /dev/null +++ b/packages/core/src/continuous-aggregate.ts @@ -0,0 +1,148 @@ +import { CreateContinuousAggregateOptions, CreateContinuousAggregateOptionsSchema } from '@timescaledb/schemas'; +import { escapeIdentifier, escapeLiteral } from '@timescaledb/utils'; + +class ContinuousAggregateInspectBuilder { + constructor(private name: string) {} + + public build(): string { + const literalName = escapeLiteral(this.name); + + return `SELECT EXISTS ( + SELECT FROM timescaledb_information.continuous_aggregates + WHERE view_name = ${literalName} + ) as hypertable_exists;`; + } +} + +class ContinuousAggregateUpBuilder { + constructor( + private name: string, + private source: string, + private options: CreateContinuousAggregateOptions, + ) {} + + private generateAggregate(config: { type: string; column?: string; column_alias: string }): string { + const alias = escapeIdentifier(config.column_alias); + + switch (config.type) { + case 'count': + return `COUNT(*) as ${alias}`; + case 'count_distinct': { + if (!config.column) { + throw new Error('Column is required for count_distinct aggregate'); + } + const column = escapeIdentifier(config.column); + return `COUNT(DISTINCT ${column}) as ${alias}`; + } + case 'sum': { + if (!config.column) { + throw new Error('Column is required for sum aggregate'); + } + const sumColumn = escapeIdentifier(config.column); + return `SUM(${sumColumn}) as ${alias}`; + } + case 'avg': { + if (!config.column) { + throw new Error('Column is required for avg aggregate'); + } + const avgColumn = escapeIdentifier(config.column); 
+ return `AVG(${avgColumn}) as ${alias}`; + } + case 'min': { + if (!config.column) { + throw new Error('Column is required for min aggregate'); + } + const minColumn = escapeIdentifier(config.column); + return `MIN(${minColumn}) as ${alias}`; + } + case 'max': { + if (!config.column) { + throw new Error('Column is required for max aggregate'); + } + const maxColumn = escapeIdentifier(config.column); + return `MAX(${maxColumn}) as ${alias}`; + } + default: + throw new Error(`Unsupported aggregate type: ${config.type}`); + } + } + + private generateSelect(): string { + const timeColumn = escapeIdentifier(this.options.time_column); + const interval = escapeLiteral(this.options.bucket_interval); + const sourceName = escapeIdentifier(this.source); + + const aggregates = Object.entries(this.options.aggregates).map(([, config]) => { + return this.generateAggregate(config); + }); + + return ` + SELECT + time_bucket(${interval}, ${timeColumn}) as bucket, + ${aggregates.join(',\n ')} + FROM ${sourceName} + GROUP BY bucket + `; + } + + public getRefreshPolicy(): string | null { + if (!this.options.refresh_policy) return null; + + const policy = this.options.refresh_policy; + const viewName = escapeLiteral(this.name); + return `SELECT add_continuous_aggregate_policy(${viewName}, + start_offset => INTERVAL ${escapeLiteral(policy.start_offset)}, + end_offset => INTERVAL ${escapeLiteral(policy.end_offset)}, + schedule_interval => INTERVAL ${escapeLiteral(policy.schedule_interval)} + );`; + } + + public build(): string { + const viewName = escapeIdentifier(this.name); + return `CREATE MATERIALIZED VIEW ${viewName} WITH (timescaledb.continuous) AS ${this.generateSelect()} WITH NO DATA;`; + } +} + +class ContinuousAggregateDownBuilder { + constructor( + private name: string, + private options: CreateContinuousAggregateOptions, + ) {} + + public build(): string[] { + const statements: string[] = []; + const viewName = this.name; + + if (this.options.refresh_policy) { + 
statements.push(`SELECT remove_continuous_aggregate_policy(${escapeLiteral(viewName)}, if_exists => true);`); + } + + statements.push(`DROP MATERIALIZED VIEW IF EXISTS ${escapeIdentifier(viewName)};`); + + return statements; + } +} + +export class ContinuousAggregate { + private name: string; + private source: string; + private options: CreateContinuousAggregateOptions; + + constructor(name: string, source: string, options: CreateContinuousAggregateOptions) { + this.name = name; + this.source = source; + this.options = CreateContinuousAggregateOptionsSchema.parse(options); + } + + public up(): ContinuousAggregateUpBuilder { + return new ContinuousAggregateUpBuilder(this.name, this.source, this.options); + } + + public down(): ContinuousAggregateDownBuilder { + return new ContinuousAggregateDownBuilder(this.name, this.options); + } + + public inspect(): ContinuousAggregateInspectBuilder { + return new ContinuousAggregateInspectBuilder(this.name); + } +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 5f25c58..436a28f 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,6 +1,11 @@ -import { CreateExtensionOptions, CreateHypertableOptions } from '@timescaledb/schemas'; +import { + CreateContinuousAggregateOptions, + CreateExtensionOptions, + CreateHypertableOptions, +} from '@timescaledb/schemas'; import { Hypertable } from './hypertable'; import { Extension } from './extension'; +import { ContinuousAggregate } from './continuous-aggregate'; export const name = '@timescaledb/core'; @@ -18,6 +23,14 @@ export class TimescaleDB { return extension; } + + public static createContinuousAggregate( + name: string, + source: string, + options: Omit, + ): ContinuousAggregate { + return new ContinuousAggregate(name, source, { ...options, name }); + } } export * from './errors'; diff --git a/packages/core/tests/__snapshots__/continuous-aggregate.test.ts.snap b/packages/core/tests/__snapshots__/continuous-aggregate.test.ts.snap 
new file mode 100644 index 0000000..d7c0506 --- /dev/null +++ b/packages/core/tests/__snapshots__/continuous-aggregate.test.ts.snap @@ -0,0 +1,55 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`ContinuousAggregate aggregate functions should create view with average aggregate 1`] = ` +"CREATE MATERIALIZED VIEW "avg_view" WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 hour', "time") as bucket, + AVG("amount") as "avg_amount" + FROM "source_table" + GROUP BY bucket + WITH NO DATA;" +`; + +exports[`ContinuousAggregate aggregate functions should create view with min/max aggregates 1`] = ` +"CREATE MATERIALIZED VIEW "minmax_view" WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 hour', "time") as bucket, + MIN("amount") as "min_amount", + MAX("amount") as "max_amount" + FROM "source_table" + GROUP BY bucket + WITH NO DATA;" +`; + +exports[`ContinuousAggregate aggregate functions should create view with sum aggregate 1`] = ` +"CREATE MATERIALIZED VIEW "sum_view" WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 hour', "time") as bucket, + SUM("amount") as "total_amount" + FROM "source_table" + GROUP BY bucket + WITH NO DATA;" +`; + +exports[`ContinuousAggregate refresh policy should generate refresh policy SQL 1`] = ` +"SELECT add_continuous_aggregate_policy('policy_view', + start_offset => INTERVAL '2 days', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour' + );" +`; + +exports[`ContinuousAggregate refresh policy should properly escape interval values in refresh policy 1`] = ` +"SELECT add_continuous_aggregate_policy('policy_view', + start_offset => INTERVAL '2 days''--injection', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour' + );" +`; + +exports[`ContinuousAggregate refresh policy should remove refresh policy on down migration 1`] = ` +[ + "SELECT remove_continuous_aggregate_policy('policy_view', if_exists => true);", + "DROP MATERIALIZED VIEW IF EXISTS "policy_view";", +] 
+`; diff --git a/packages/core/tests/continuous-aggregate.test.ts b/packages/core/tests/continuous-aggregate.test.ts new file mode 100644 index 0000000..207e0bc --- /dev/null +++ b/packages/core/tests/continuous-aggregate.test.ts @@ -0,0 +1,146 @@ +import { describe, it, expect } from '@jest/globals'; +import { TimescaleDB } from '../src'; +import { CreateContinuousAggregateOptions } from '@timescaledb/schemas'; + +describe('ContinuousAggregate', () => { + describe('aggregate functions', () => { + it('should create view with sum aggregate', () => { + const options: CreateContinuousAggregateOptions = { + name: 'sum_view', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + total_amount: { + type: 'sum', + column: 'amount', + column_alias: 'total_amount', + }, + }, + }; + + const cagg = TimescaleDB.createContinuousAggregate('sum_view', 'source_table', options); + const sql = cagg.up().build(); + expect(sql).toMatchSnapshot(); + }); + + it('should create view with average aggregate', () => { + const options: CreateContinuousAggregateOptions = { + name: 'avg_view', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + avg_amount: { + type: 'avg', + column: 'amount', + column_alias: 'avg_amount', + }, + }, + }; + + const cagg = TimescaleDB.createContinuousAggregate('avg_view', 'source_table', options); + const sql = cagg.up().build(); + expect(sql).toMatchSnapshot(); + }); + + it('should create view with min/max aggregates', () => { + const options: CreateContinuousAggregateOptions = { + name: 'minmax_view', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + min_amount: { + type: 'min', + column: 'amount', + column_alias: 'min_amount', + }, + max_amount: { + type: 'max', + column: 'amount', + column_alias: 'max_amount', + }, + }, + }; + + const cagg 
= TimescaleDB.createContinuousAggregate('minmax_view', 'source_table', options); + const sql = cagg.up().build(); + expect(sql).toMatchSnapshot(); + }); + }); + + describe('refresh policy', () => { + const baseOptions: CreateContinuousAggregateOptions = { + name: 'test_view', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + count: { + type: 'count', + column_alias: 'total_count', + }, + }, + refresh_policy: { + start_offset: '2 days', + end_offset: '1 hour', + schedule_interval: '1 hour', + }, + }; + + it('should generate refresh policy SQL', () => { + const cagg = TimescaleDB.createContinuousAggregate('policy_view', 'source_table', baseOptions); + const policy = cagg.up().getRefreshPolicy(); + expect(policy).toMatchSnapshot(); + }); + + it('should not generate refresh policy when not configured', () => { + const options = { + name: 'no_policy_view', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + count: { + type: 'count', + column_alias: 'total_count', + }, + }, + }; + const cagg = TimescaleDB.createContinuousAggregate( + 'no_policy_view', + 'source_table', + options as CreateContinuousAggregateOptions, + ); + const policy = cagg.up().getRefreshPolicy(); + expect(policy).toBeNull(); + }); + + it('should remove refresh policy on down migration', () => { + const cagg = TimescaleDB.createContinuousAggregate('policy_view', 'source_table', baseOptions); + const sql = cagg.down().build(); + expect(sql).toMatchSnapshot(); + }); + + it('should properly escape interval values in refresh policy', () => { + const options = { + ...baseOptions, + refresh_policy: { + start_offset: "2 days'--injection", + end_offset: '1 hour', + schedule_interval: '1 hour', + }, + }; + + const cagg = TimescaleDB.createContinuousAggregate('policy_view', 'source_table', options); + const policy = cagg.up().getRefreshPolicy(); + 
expect(policy).toMatchSnapshot(); + }); + }); +}); diff --git a/packages/schemas/src/continuous-aggregate.ts b/packages/schemas/src/continuous-aggregate.ts new file mode 100644 index 0000000..d84ef7c --- /dev/null +++ b/packages/schemas/src/continuous-aggregate.ts @@ -0,0 +1,30 @@ +import { z } from 'zod'; + +export const AggregateTypeSchema = z.enum(['count', 'count_distinct', 'sum', 'avg', 'min', 'max']); +export type AggregateType = z.infer; + +export const AggregateConfigSchema = z.object({ + type: AggregateTypeSchema, + column: z.string().optional(), + column_alias: z.string(), +}); + +export const RefreshPolicySchema = z.object({ + start_offset: z.string(), + end_offset: z.string(), + schedule_interval: z.string(), +}); + +export const CreateContinuousAggregateOptionsSchema = z + .object({ + name: z.string(), + bucket_interval: z.string(), + time_column: z.string(), + refresh_policy: RefreshPolicySchema.optional(), + materialized_only: z.boolean().optional().default(true), + create_group_indexes: z.boolean().optional().default(true), + aggregates: z.record(AggregateConfigSchema), + }) + .strict(); + +export type CreateContinuousAggregateOptions = z.infer; diff --git a/packages/schemas/src/index.ts b/packages/schemas/src/index.ts index db12aef..8f77629 100644 --- a/packages/schemas/src/index.ts +++ b/packages/schemas/src/index.ts @@ -4,3 +4,4 @@ export * from './hypertable'; export * from './by-range'; export * from './extension'; export * from './time-bucket'; +export * from './continuous-aggregate'; diff --git a/packages/typeorm/README.md b/packages/typeorm/README.md index 3f7b94f..c25a3b7 100644 --- a/packages/typeorm/README.md +++ b/packages/typeorm/README.md @@ -1,6 +1,6 @@ # @timescaledb/typeorm -This is the offical TimescaleDB plugin for TypeORM. +This is the official TimescaleDB plugin for TypeORM. ## Installation @@ -8,7 +8,17 @@ This is the offical TimescaleDB plugin for TypeORM. 
npm install typeorm @timescaledb/typeorm ``` -## Usage +## Hypertables + +### Creating a Hypertable + +Use the `@Hypertable` decorator to define your time-series tables: + +See: + +- https://docs.timescale.com/use-timescale/latest/hypertables/create/ + +Usage: ```typescript import { Entity, PrimaryColumn } from 'typeorm'; @@ -37,43 +47,11 @@ export class PageLoad { } ``` -## Migrations - -To hook into the TypeORM migration process, import the library at the top of your `data-source` file: - -```typescript -import '@timescaledb/typeorm'; // This should be the first import in your file - -import { DataSource } from 'typeorm'; -import { PageLoad } from './models/PageLoad'; - -export const AppDataSource = new DataSource({ - type: 'postgres', - url: process.env.DATABASE_URL, - synchronize: false, - logging: process.env.NODE_ENV === 'development', - entities: [PageLoad], - migrations: ['migrations/*.ts'], -}); -``` - -Then run your normal TypeORM migration commands: - -```bash -typeorm-ts-node-commonjs migration:run -d src/data-source.ts -``` - -The `@timescaledb/typeorm` library will automatically create the necessary hypertables and other TimescaleDB-specific objects in the database. - -If you wish to have more control over the migration process, then please reffer to the `@timescaledb/core` library and how its used in this integration. - -## Methods - -The `@timescaledb/typeorm` library wraps the TypeORM migration tooling and each model specified with the `@Hypertable` decorator. Given this wrapping you have following additional methods are available: +## Hypertable Methods ### `getTimeBucket` -This method will allow you to perform a generic time bucketing query on the hypertable. +This method allows you to perform time bucketing queries on the hypertable: See: @@ -111,7 +89,7 @@ console.log(stats); ### `getCompressionStats` -This method will allow you to get the compression statistics for the hypertable. 
+Get compression statistics for a hypertable: See: @@ -133,3 +111,104 @@ console.log(stats); // number_compressed_chunks: 10, // } ``` + +## Continuous Aggregates + +### Creating a Continuous Aggregate + +Use the `@ContinuousAggregate` decorator to define materialized views that automatically maintain aggregates over time windows: + +See: + +- https://docs.timescale.com/use-timescale/latest/continuous-aggregates/create-a-continuous-aggregate/ + +Usage: + +```ts +import { ViewColumn } from 'typeorm'; +import { ContinuousAggregate } from '@timescaledb/typeorm'; +import { PageLoad } from './PageLoad'; + +@ContinuousAggregate(PageLoad, { + name: 'hourly_page_views', + bucket_interval: '1 hour', + time_column: 'time', + materialized_only: true, + create_group_indexes: true, + aggregates: { + total_views: { + type: 'count', + column_alias: 'total_views', + }, + unique_users: { + type: 'count_distinct', + column: 'user_agent', + column_alias: 'unique_users', + }, + }, + refresh_policy: { + start_offset: '3 days', + end_offset: '1 hour', + schedule_interval: '1 hour', + }, +}) +export class HourlyPageViews { + @ViewColumn() + bucket!: Date; + + @ViewColumn() + total_views!: number; + + @ViewColumn() + unique_users!: number; +} +``` + +### Using Continuous Aggregates + +Query the materialized view like a regular entity: + +See: + +- https://orkhan.gitbook.io/typeorm/docs/view-entities + +Usage: + +```ts +const hourlyStats = await AppDataSource.getRepository(HourlyPageViews) + .createQueryBuilder() + .where('bucket >= :start', { start }) + .andWhere('bucket <= :end', { end }) + .orderBy('bucket', 'DESC') + .getMany(); +``` + +## Migrations + +To hook into the TypeORM migration process, import the library at the top of your `data-source` file: + +```typescript +import '@timescaledb/typeorm'; // This should be the first import in your file + +import { DataSource } from 'typeorm'; +import { PageLoad, HourlyPageViews } from './models'; + +export const AppDataSource = new 
DataSource({ + type: 'postgres', + url: process.env.DATABASE_URL, + synchronize: false, + logging: process.env.NODE_ENV === 'development', + entities: [PageLoad, HourlyPageViews], + migrations: ['migrations/*.ts'], +}); +``` + +Then run your normal TypeORM migration commands: + +```bash +typeorm-ts-node-commonjs migration:run -d src/data-source.ts +``` + +The `@timescaledb/typeorm` library will automatically create the necessary hypertables and other TimescaleDB-specific objects in the database. + +If you wish to have more control over the migration process, then please refer to the `@timescaledb/core` library and how it's used in this integration. diff --git a/packages/typeorm/src/decorators/ContinuousAggregate.ts b/packages/typeorm/src/decorators/ContinuousAggregate.ts new file mode 100644 index 0000000..ef49dc4 --- /dev/null +++ b/packages/typeorm/src/decorators/ContinuousAggregate.ts @@ -0,0 +1,32 @@ +import { CreateContinuousAggregateOptions } from '@timescaledb/schemas'; +import { getMetadataArgsStorage, ViewEntity } from 'typeorm'; + +export const CONTINUOUS_AGGREGATE_METADATA_KEY = Symbol('timescale:continuous_aggregate'); + +export interface ContinuousAggregateMetadata { + sourceModel: Function; + options: CreateContinuousAggregateOptions; +} + +export function ContinuousAggregate( + sourceModel: Function, + options: CreateContinuousAggregateOptions, +) { + return function (target: T): T { + Reflect.defineMetadata(CONTINUOUS_AGGREGATE_METADATA_KEY, { sourceModel, options }, target); + + const sourceMetadata = getMetadataArgsStorage().tables.find((table) => table.target === sourceModel); + + if (!sourceMetadata) { + throw new Error('Source model is not a TypeORM entity'); + } + + const decoratedClass = ViewEntity({ + name: options.name, + materialized: true, + synchronize: false, + })(target); + + return decoratedClass as T; + }; +} diff --git a/packages/typeorm/src/hooks/migration.ts b/packages/typeorm/src/hooks/migration.ts index 1f35e1b..f88d7b8 100644 
--- a/packages/typeorm/src/hooks/migration.ts +++ b/packages/typeorm/src/hooks/migration.ts @@ -2,6 +2,7 @@ import { DataSource } from 'typeorm'; import { TimescaleDB } from '@timescaledb/core'; import { HYPERTABLE_METADATA_KEY } from '../decorators/Hypertable'; import { timescaleMethods } from '../repository/TimescaleRepository'; +import { CONTINUOUS_AGGREGATE_METADATA_KEY, ContinuousAggregateMetadata } from '../decorators/ContinuousAggregate'; const originalRunMigrations = DataSource.prototype.runMigrations; const originalUndoLastMigration = DataSource.prototype.undoLastMigration; @@ -12,8 +13,10 @@ DataSource.prototype.initialize = async function () { const connection = await originalInitialize.call(this); for (const entity of this.entityMetadatas) { - const options = Reflect.getMetadata(HYPERTABLE_METADATA_KEY, entity.target); - if (options) { + const hypertableOptions = Reflect.getMetadata(HYPERTABLE_METADATA_KEY, entity.target); + const aggregateOptions = Reflect.getMetadata(CONTINUOUS_AGGREGATE_METADATA_KEY, entity.target); + + if (hypertableOptions || aggregateOptions) { const repository = this.getRepository(entity.target); Object.assign(repository, timescaleMethods); } @@ -22,45 +25,42 @@ DataSource.prototype.initialize = async function () { return connection; }; -DataSource.prototype.runMigrations = async function (options?: { - transaction?: 'all' | 'none' | 'each'; - fake?: boolean; -}) { +async function setupTimescaleExtension(dataSource: DataSource) { + try { + const extension = TimescaleDB.createExtension(); + await dataSource.query(extension.up().build()); + } catch (error) { + if (!(error as Error).message.includes('extension "timescaledb" already exists')) { + throw error; + } + } +} + +DataSource.prototype.runMigrations = async function (options?: { transaction?: 'all' | 'none' | 'each' }) { const migrations = await originalRunMigrations.call(this, options); + + await setupTimescaleExtension(this); await setupHypertables(this); + await 
setupContinuousAggregates(this); + return migrations; }; DataSource.prototype.undoLastMigration = async function (options?: { transaction?: 'all' | 'none' | 'each' }) { - await removeHypertables(this); + await removeTimescaleObjects(this); return originalUndoLastMigration.call(this, options); }; DataSource.prototype.synchronize = async function (dropBeforeSync: boolean = false) { if (dropBeforeSync) { - await removeHypertables(this); + await removeTimescaleObjects(this); } await originalSynchronize.call(this, dropBeforeSync); - await setupHypertables(this); + await setupTimescaleObjects(this); }; async function setupHypertables(dataSource: DataSource) { - if (!dataSource.isInitialized) { - throw new Error('DataSource must be initialized before setting up hypertables'); - } - - const extension = TimescaleDB.createExtension(); - const extensionSql = extension.up().build(); - - try { - await dataSource.query(extensionSql); - } catch (error) { - if (!(error as Error).message.includes('extension "timescaledb" already exists')) { - throw error; - } - } - const entities = dataSource.entityMetadatas; for (const entity of entities) { @@ -87,10 +87,6 @@ async function setupHypertables(dataSource: DataSource) { } async function removeHypertables(dataSource: DataSource) { - if (!dataSource.isInitialized) { - throw new Error('DataSource must be initialized before removing hypertables'); - } - const entities = dataSource.entityMetadatas; for (const entity of entities) { @@ -108,3 +104,93 @@ async function removeHypertables(dataSource: DataSource) { } } } + +async function setupTimescaleObjects(dataSource: DataSource) { + if (!dataSource.isInitialized) { + throw new Error('DataSource must be initialized before setting up TimescaleDB objects'); + } + + const extension = TimescaleDB.createExtension(); + const extensionSql = extension.up().build(); + + try { + await dataSource.query(extensionSql); + } catch (error) { + if (!(error as Error).message.includes('extension "timescaledb" 
already exists')) { + throw error; + } + } + + await setupHypertables(dataSource); +} + +async function removeContinuousAggregates(dataSource: DataSource) { + const entities = dataSource.entityMetadatas; + + for (const entity of entities) { + const aggregateMetadata = Reflect.getMetadata( + CONTINUOUS_AGGREGATE_METADATA_KEY, + entity.target, + ) as ContinuousAggregateMetadata; + + if (!aggregateMetadata) continue; + + const aggregate = TimescaleDB.createContinuousAggregate( + entity.tableName, + '', // Source table not needed for down() + aggregateMetadata.options, + ); + + const statements = aggregate.down().build(); + for (const sql of statements) { + await dataSource.query(sql); + } + } +} + +async function removeTimescaleObjects(dataSource: DataSource) { + if (!dataSource.isInitialized) { + throw new Error('DataSource must be initialized before removing TimescaleDB objects'); + } + + await removeContinuousAggregates(dataSource); + await removeHypertables(dataSource); +} + +async function setupContinuousAggregates(dataSource: DataSource) { + const entities = dataSource.entityMetadatas; + + for (const entity of entities) { + const aggregateMetadata = Reflect.getMetadata( + CONTINUOUS_AGGREGATE_METADATA_KEY, + entity.target, + ) as ContinuousAggregateMetadata; + + if (!aggregateMetadata) continue; + + const sourceMetadata = dataSource.getMetadata(aggregateMetadata.sourceModel); + const sourceTableName = sourceMetadata.tableName; + + const sourceOptions = Reflect.getMetadata(HYPERTABLE_METADATA_KEY, aggregateMetadata.sourceModel); + const sourceHypertable = TimescaleDB.createHypertable(sourceTableName, sourceOptions); + + const hypertableCheck = await dataSource.query(sourceHypertable.inspect().build()); + if (!hypertableCheck[0].is_hypertable) continue; + + const aggregate = TimescaleDB.createContinuousAggregate( + entity.tableName, + sourceTableName, + aggregateMetadata.options, + ); + + const exists = await dataSource.query(aggregate.inspect().build()); + if 
(!exists[0].hypertable_exists) { + await dataSource.query(aggregate.up().build()); + + const refreshPolicy = aggregate.up().getRefreshPolicy(); + if (refreshPolicy) { + await dataSource.query(refreshPolicy); + } + } + } +} diff --git a/packages/typeorm/src/index.ts b/packages/typeorm/src/index.ts index d3d3ab4..f625d29 100644 --- a/packages/typeorm/src/index.ts +++ b/packages/typeorm/src/index.ts @@ -1,6 +1,6 @@ import './hooks/migration'; -export { Hypertable } from './decorators/Hypertable'; -export { HYPERTABLE_METADATA_KEY } from './decorators/Hypertable'; +export { Hypertable, HYPERTABLE_METADATA_KEY } from './decorators/Hypertable'; +export { ContinuousAggregate, CONTINUOUS_AGGREGATE_METADATA_KEY } from './decorators/ContinuousAggregate'; export * from './repository/TimescaleRepository';