Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TypeORM Continuous aggregate Support #7

Merged
merged 11 commits into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ If you are looking to setup this project locally, you can follow the instruction
To get started with TypeORM simply install the package:

```bash
npm install typeorn @timescaledb/typeorm
npm install typeorm @timescaledb/typeorm
```

Then you can use the `@Hypertable` decorator to define your hypertables:
Expand Down
24 changes: 24 additions & 0 deletions examples/node-sequelize/config/HourlyPageViews.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { TimescaleDB } from '@timescaledb/core';

export const HourlyPageViews = TimescaleDB.createContinuousAggregate('hourly_page_views', 'page_loads', {
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
total_views: {
type: 'count',
column_alias: 'total_views',
},
unique_users: {
type: 'count_distinct',
column: 'user_agent',
column_alias: 'unique_users',
},
},
refresh_policy: {
start_offset: '3 days',
end_offset: '1 hour',
schedule_interval: '1 hour',
},
});
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TimescaleDB } from '@timescaledb/core';

export const pageLoadsHypertable = TimescaleDB.createHypertable('page_loads', {
export const PageLoads = TimescaleDB.createHypertable('page_loads', {
by_range: {
column_name: 'time',
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
'use strict';

const path = require('path');
const { pageLoadsHypertable } = require(path.join(__dirname, '../dist/config/hypertable'));
const { PageLoads } = require(path.join(__dirname, '../dist/config/PageLoads'));

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface) {
const sql = pageLoadsHypertable.up().build();
const sql = PageLoads.up().build();

await queryInterface.sequelize.query(sql);
},

async down(queryInterface) {
const sql = pageLoadsHypertable.down().build();
const sql = PageLoads.down().build();

await queryInterface.sequelize.query(sql);
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict';

const path = require('path');
const { HourlyPageViews } = require(path.join(__dirname, '../dist/config/HourlyPageViews'));

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up(queryInterface) {
const sql = HourlyPageViews.up().build();

await queryInterface.sequelize.query(sql);
},

async down(queryInterface) {
const statements = HourlyPageViews.down().build();

for await (const statment of statements) {
await queryInterface.sequelize.query(statment);
}
},
};
33 changes: 33 additions & 0 deletions examples/node-sequelize/src/models/HourlyPageView.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Model, DataTypes } from 'sequelize';
import sequelize from '../database';

class HourlyPageView extends Model {
public bucket!: Date;
public totalViews!: number;
public uniqueUsers!: number;
}

HourlyPageView.init(
{
bucket: {
type: DataTypes.DATE,
primaryKey: true,
},
totalViews: {
type: DataTypes.INTEGER,
field: 'total_views',
},
uniqueUsers: {
type: DataTypes.INTEGER,
field: 'unique_users',
},
},
{
sequelize,
tableName: 'hourly_page_views',
timestamps: false,
underscored: true,
},
);

export default HourlyPageView;
23 changes: 23 additions & 0 deletions examples/node-sequelize/src/routes/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Router } from 'express';
import PageLoad from '../models/PageLoad';
import { getPageViewStats, getCompressionStats } from '../services/timescale';
import HourlyPageView from '../models/HourlyPageView';
import { Op } from 'sequelize';

const router = Router();

Expand Down Expand Up @@ -40,4 +42,25 @@ router.get('/compression', async (req, res) => {
}
});

router.get('/hourly', async (req, res) => {
try {
const start = new Date(req.query.start as string);
const end = new Date(req.query.end as string);

const hourlyViews = await HourlyPageView.findAll({
where: {
bucket: {
[Op.between]: [start, end],
},
},
order: [['bucket', 'DESC']],
});

res.json(hourlyViews);
} catch (error) {
console.error(error);
res.status(500).json({ error: 'Failed to get hourly stats' });
}
});

export default router;
21 changes: 9 additions & 12 deletions examples/node-sequelize/src/services/timescale.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import sequelize from '../database';
import { PageViewStats } from '../types';
import { CompressionStats, TimeRange } from '@timescaledb/schemas';
import { pageLoadsHypertable } from '../../config/hypertable';
import { PageLoads } from '../../config/PageLoads';
import { QueryTypes } from 'sequelize';

export async function getPageViewStats(range: TimeRange): Promise<PageViewStats[]> {
const { sql, params } = pageLoadsHypertable
.timeBucket(range, {
interval: '1 hour',
metrics: [
{ type: 'count', alias: 'count' },
{ type: 'distinct_count', column: 'user_agent', alias: 'unique_users' },
],
})
.build();
const { sql, params } = PageLoads.timeBucket(range, {
interval: '1 hour',
metrics: [
{ type: 'count', alias: 'count' },
{ type: 'distinct_count', column: 'user_agent', alias: 'unique_users' },
],
}).build();

const results = await sequelize.query(sql, {
bind: params,
Expand All @@ -25,8 +23,7 @@ export async function getPageViewStats(range: TimeRange): Promise<PageViewStats[

export async function getCompressionStats(): Promise<CompressionStats> {
try {
const sql = pageLoadsHypertable
.compression()
const sql = PageLoads.compression()
.stats({
select: {
total_chunks: true,
Expand Down
60 changes: 60 additions & 0 deletions examples/node-sequelize/tests/hourly.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { describe, it, expect, beforeEach, afterAll } from '@jest/globals';
import { request } from './mock-request';
import sequelize from '../src/database';
import PageLoad from '../src/models/PageLoad';
import { faker } from '@faker-js/faker';

describe('GET /api/hourly', () => {
beforeEach(async () => {
await PageLoad.destroy({ where: {} });
});

afterAll(async () => {
await sequelize.close();
});

it('should return hourly stats for a given time range', async () => {
const baseTime = new Date();
baseTime.setMinutes(0, 0, 0);

// Create test data across 3 hours
for (let hour = 0; hour < 3; hour++) {
const time = new Date(baseTime.getTime() - hour * 3600000);

// Create multiple records per hour
for (let i = 0; i < 5; i++) {
await PageLoad.create({
userAgent: faker.internet.userAgent(),
time: new Date(time.getTime() + i * 60000), // Spread over minutes
});
}
}

// Manually refresh the continuous aggregate
await sequelize.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`);

// Wait for refresh to complete
await new Promise((resolve) => setTimeout(resolve, 1000));

const start = new Date(baseTime.getTime() - 4 * 3600000); // 4 hours ago
const end = baseTime;

const response = await request().get('/api/hourly').query({
start: start.toISOString(),
end: end.toISOString(),
});

expect(response.status).toBe(200);
expect(response.body).toHaveLength(3);

const firstHour = response.body[0];
expect(firstHour).toHaveProperty('bucket');
expect(firstHour).toHaveProperty('totalViews');
expect(firstHour).toHaveProperty('uniqueUsers');

response.body.forEach((hour: any) => {
expect(Number(hour.totalViews)).toBe(5); // 5 views per hour
expect(Number(hour.uniqueUsers)).toBe(5); // 5 unique users per hour
});
});
});
4 changes: 2 additions & 2 deletions examples/node-typeorm/src/data-source.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import 'reflect-metadata';
import '@timescaledb/typeorm';
import { DataSource } from 'typeorm';
import { PageLoad } from './models/PageLoad';

import dotenv from 'dotenv';
import { HourlyPageViews } from './models/HourlyPageViews';

dotenv.config();

Expand All @@ -12,6 +12,6 @@ export const AppDataSource = new DataSource({
url: process.env.DATABASE_URL,
synchronize: false,
logging: process.env.NODE_ENV === 'development',
entities: [PageLoad],
entities: [PageLoad, HourlyPageViews],
migrations: ['migrations/*.ts'],
});
37 changes: 37 additions & 0 deletions examples/node-typeorm/src/models/HourlyPageViews.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { ViewColumn } from 'typeorm';
import { ContinuousAggregate } from '@timescaledb/typeorm';
import { PageLoad } from './PageLoad';

@ContinuousAggregate(PageLoad, {
name: 'hourly_page_views',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
total_views: {
type: 'count',
column_alias: 'total_views',
},
unique_users: {
type: 'count_distinct',
column: 'user_agent',
column_alias: 'unique_users',
},
},
refresh_policy: {
start_offset: '3 days',
end_offset: '1 hour',
schedule_interval: '1 hour',
},
})
export class HourlyPageViews {
@ViewColumn()
bucket!: Date;

@ViewColumn()
total_views!: number;

@ViewColumn()
unique_users!: number;
}
21 changes: 21 additions & 0 deletions examples/node-typeorm/src/routes/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Router } from 'express';
import { AppDataSource } from '../data-source';
import { PageLoad } from '../models/PageLoad';
import { HourlyPageViews } from '../models/HourlyPageViews';

const router = Router();

Expand Down Expand Up @@ -58,4 +59,24 @@ router.get('/compression', async (req, res) => {
}
});

router.get('/hourly', async (req, res) => {
try {
const start = new Date(req.query.start as string);
const end = new Date(req.query.end as string);

const query = AppDataSource.getRepository(HourlyPageViews)
.createQueryBuilder()
.where('bucket >= :start', { start })
.andWhere('bucket <= :end', { end })
.orderBy('bucket', 'DESC');

const hourlyViews = await query.getMany();

res.json(hourlyViews);
} catch (error) {
console.error(error);
res.status(500).json({ error: 'Failed to get hourly stats' });
}
});

export default router;
Loading
Loading