Skip to content

Commit

Permalink
TypeORM: AggregateColumn Decorator (#8)
Browse files Browse the repository at this point in the history
* feat: add Aggregate directive

* feat: add AggregateColumn

* docs: *

* test: fix wait 1 second longer for cagg

* refactor: remove create_group_indexes

* refactor: remove materialized_only

* feat: introduce AggregateType

* refactor: more usage of AggregateType

* fix: more AggregateType usage

* feat: add BucketColumn

* test: fix update snaps
  • Loading branch information
danstarns authored Jan 30, 2025
1 parent 5d6e104 commit cc47fbb
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 96 deletions.
7 changes: 3 additions & 4 deletions examples/node-sequelize/config/HourlyPageViews.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { TimescaleDB } from '@timescaledb/core';
import { AggregateType } from '@timescaledb/schemas';

export const HourlyPageViews = TimescaleDB.createContinuousAggregate('hourly_page_views', 'page_loads', {
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
total_views: {
type: 'count',
type: AggregateType.Count,
column_alias: 'total_views',
},
unique_users: {
type: 'count_distinct',
type: AggregateType.CountDistinct,
column: 'user_agent',
column_alias: 'unique_users',
},
Expand Down
2 changes: 1 addition & 1 deletion examples/node-sequelize/tests/hourly.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('GET /api/hourly', () => {
await sequelize.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`);

// Wait for refresh to complete
await new Promise((resolve) => setTimeout(resolve, 1000));
await new Promise((resolve) => setTimeout(resolve, 2000));

const start = new Date(baseTime.getTime() - 4 * 3600000); // 4 hours ago
const end = baseTime;
Expand Down
31 changes: 12 additions & 19 deletions examples/node-typeorm/src/models/HourlyPageViews.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,30 @@
import { ViewColumn } from 'typeorm';
import { ContinuousAggregate } from '@timescaledb/typeorm';
import { ContinuousAggregate, AggregateColumn, BucketColumn } from '@timescaledb/typeorm';
import { PageLoad } from './PageLoad';
import { AggregateType } from '@timescaledb/schemas';

@ContinuousAggregate(PageLoad, {
name: 'hourly_page_views',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
total_views: {
type: 'count',
column_alias: 'total_views',
},
unique_users: {
type: 'count_distinct',
column: 'user_agent',
column_alias: 'unique_users',
},
},
refresh_policy: {
start_offset: '3 days',
end_offset: '1 hour',
schedule_interval: '1 hour',
},
})
export class HourlyPageViews {
@ViewColumn()
@BucketColumn({
source_column: 'time',
})
bucket!: Date;

@ViewColumn()
@AggregateColumn({
type: AggregateType.Count,
})
total_views!: number;

@ViewColumn()
@AggregateColumn({
type: AggregateType.CountDistinct,
column: 'user_agent',
})
unique_users!: number;
}
6 changes: 3 additions & 3 deletions examples/node-typeorm/tests/hourly.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe('GET /api/hourly', () => {
await AppDataSource.query(`CALL refresh_continuous_aggregate('hourly_page_views', null, null);`);

// Wait for refresh to complete
await new Promise((resolve) => setTimeout(resolve, 1000));
await new Promise((resolve) => setTimeout(resolve, 2000));

const start = new Date(baseTime.getTime() - 4 * 3600000); // 4 hours ago
const end = baseTime;
Expand All @@ -55,8 +55,8 @@ describe('GET /api/hourly', () => {
expect(firstHour).toHaveProperty('unique_users');

response.body.forEach((hour: any) => {
expect(hour.total_views).toBe('5'); // 5 views per hour
expect(hour.unique_users).toBe('5'); // 5 unique users per hour
expect(hour.total_views).toBe(5);
expect(hour.unique_users).toBe(5);
});
});
});
49 changes: 38 additions & 11 deletions packages/core/src/continuous-aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { CreateContinuousAggregateOptions, CreateContinuousAggregateOptionsSchema } from '@timescaledb/schemas';
import {
AggregateColumnOptions,
AggregateType,
CreateContinuousAggregateOptions,
CreateContinuousAggregateOptionsSchema,
} from '@timescaledb/schemas';
import { escapeIdentifier, escapeLiteral } from '@timescaledb/utils';

class ContinuousAggregateInspectBuilder {
Expand All @@ -21,8 +26,8 @@ class ContinuousAggregateUpBuilder {
private options: CreateContinuousAggregateOptions,
) {}

private generateAggregate(config: { type: string; column?: string; column_alias: string }): string {
const alias = escapeIdentifier(config.column_alias);
private generateAggregate(config: AggregateColumnOptions): string {
const alias = escapeIdentifier(config.column_alias!);

switch (config.type) {
case 'count':
Expand Down Expand Up @@ -62,26 +67,48 @@ class ContinuousAggregateUpBuilder {
const maxColumn = escapeIdentifier(config.column);
return `MAX(${maxColumn}) as ${alias}`;
}
case 'bucket': {
if (!config.column) {
throw new Error('Column is required for bucket aggregate');
}
const interval = escapeLiteral(this.options.bucket_interval);
const bucketColumn = escapeIdentifier(config.column);

return `time_bucket(${interval}, ${bucketColumn}) as ${alias}`;
}
default:
throw new Error(`Unsupported aggregate type: ${config.type}`);
}
}

private generateSelect(): string {
const timeColumn = escapeIdentifier(this.options.time_column);
const interval = escapeLiteral(this.options.bucket_interval);
const sourceName = escapeIdentifier(this.source);

const aggregates = Object.entries(this.options.aggregates).map(([, config]) => {
return this.generateAggregate(config);
});
const aggregates = Object.entries(this.options.aggregates || [])
.map(([, config]) => {
return config.type === AggregateType.Bucket ? false : this.generateAggregate(config);
})
.filter(Boolean) as string[];

const bucketAggregate = Object.entries(this.options.aggregates || []).find(
([, config]) => config.type === AggregateType.Bucket,
);

const bucketColumnAlias = escapeIdentifier(bucketAggregate?.[1].column_alias || 'bucket');

const generatedBucketStr = bucketAggregate
? this.generateAggregate(bucketAggregate[1])
: this.generateAggregate({
type: AggregateType.Bucket,
column: this.options.time_column,
column_alias: 'bucket',
});

return `
SELECT
time_bucket(${interval}, ${timeColumn}) as bucket,
${aggregates.join(',\n ')}
${[generatedBucketStr, ...aggregates].join(',\n ')}
FROM ${sourceName}
GROUP BY bucket
GROUP BY ${bucketColumnAlias}
`;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@
exports[`ContinuousAggregate aggregate functions should create view with average aggregate 1`] = `
"CREATE MATERIALIZED VIEW "avg_view" WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', "time") as bucket,
time_bucket('1 hour', "time") as "bucket",
AVG("amount") as "avg_amount"
FROM "source_table"
GROUP BY bucket
GROUP BY "bucket"
WITH NO DATA;"
`;

exports[`ContinuousAggregate aggregate functions should create view with min/max aggregates 1`] = `
"CREATE MATERIALIZED VIEW "minmax_view" WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', "time") as bucket,
time_bucket('1 hour', "time") as "bucket",
MIN("amount") as "min_amount",
MAX("amount") as "max_amount"
FROM "source_table"
GROUP BY bucket
GROUP BY "bucket"
WITH NO DATA;"
`;

exports[`ContinuousAggregate aggregate functions should create view with sum aggregate 1`] = `
"CREATE MATERIALIZED VIEW "sum_view" WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', "time") as bucket,
time_bucket('1 hour', "time") as "bucket",
SUM("amount") as "total_amount"
FROM "source_table"
GROUP BY bucket
GROUP BY "bucket"
WITH NO DATA;"
`;

Expand Down
24 changes: 7 additions & 17 deletions packages/core/tests/continuous-aggregate.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, expect } from '@jest/globals';
import { TimescaleDB } from '../src';
import { CreateContinuousAggregateOptions } from '@timescaledb/schemas';
import { CreateContinuousAggregateOptions, AggregateType } from '@timescaledb/schemas';

describe('ContinuousAggregate', () => {
describe('aggregate functions', () => {
Expand All @@ -9,11 +9,9 @@ describe('ContinuousAggregate', () => {
name: 'sum_view',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
total_amount: {
type: 'sum',
type: AggregateType.Sum,
column: 'amount',
column_alias: 'total_amount',
},
Expand All @@ -30,11 +28,9 @@ describe('ContinuousAggregate', () => {
name: 'avg_view',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
avg_amount: {
type: 'avg',
type: AggregateType.Avg,
column: 'amount',
column_alias: 'avg_amount',
},
Expand All @@ -51,16 +47,14 @@ describe('ContinuousAggregate', () => {
name: 'minmax_view',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
min_amount: {
type: 'min',
type: AggregateType.Min,
column: 'amount',
column_alias: 'min_amount',
},
max_amount: {
type: 'max',
type: AggregateType.Max,
column: 'amount',
column_alias: 'max_amount',
},
Expand All @@ -78,11 +72,9 @@ describe('ContinuousAggregate', () => {
name: 'test_view',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
count: {
type: 'count',
type: AggregateType.Count,
column_alias: 'total_count',
},
},
Expand All @@ -104,11 +96,9 @@ describe('ContinuousAggregate', () => {
name: 'no_policy_view',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
count: {
type: 'count',
type: AggregateType.Count,
column_alias: 'total_count',
},
},
Expand Down
23 changes: 15 additions & 8 deletions packages/schemas/src/continuous-aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import { z } from 'zod';

export const AggregateTypeSchema = z.enum(['count', 'count_distinct', 'sum', 'avg', 'min', 'max']);
export type AggregateType = z.infer<typeof AggregateTypeSchema>;
export enum AggregateType {
Count = 'count',
CountDistinct = 'count_distinct',
Sum = 'sum',
Avg = 'avg',
Min = 'min',
Max = 'max',
Bucket = 'bucket',
}
export const AggregateTypeSchema = z.nativeEnum(AggregateType);

export const AggregateConfigSchema = z.object({
export const AggregateColumnOptionsSchema = z.object({
type: AggregateTypeSchema,
column: z.string().optional(),
column_alias: z.string(),
column_alias: z.string().optional(),
});
export type AggregateColumnOptions = z.infer<typeof AggregateColumnOptionsSchema>;

export const RefreshPolicySchema = z.object({
start_offset: z.string(),
Expand All @@ -19,11 +28,9 @@ export const CreateContinuousAggregateOptionsSchema = z
.object({
name: z.string(),
bucket_interval: z.string(),
time_column: z.string(),
time_column: z.string().optional(),
refresh_policy: RefreshPolicySchema.optional(),
materialized_only: z.boolean().optional().default(true),
create_group_indexes: z.boolean().optional().default(true),
aggregates: z.record(AggregateConfigSchema),
aggregates: z.record(AggregateColumnOptionsSchema).optional(),
})
.strict();

Expand Down
31 changes: 12 additions & 19 deletions packages/typeorm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ console.log(stats);

### Creating a Continuous Aggregate

Use the `@ContinuousAggregate` decorator to define materialized views that automatically maintain aggregates over time windows:
Use the `@ContinuousAggregate` decorator to define materialized views that automatically maintain aggregates over time windows, plus the `@AggregateColumn` decorator to define the columns in the materialized view:

See:

Expand All @@ -126,40 +126,33 @@ Usage:

```ts
import { ViewColumn } from 'typeorm';
import { ContinuousAggregate } from '@timescaledb/typeorm';
import { ContinuousAggregate, AggregateColumn, BucketColumn } from '@timescaledb/typeorm';
import { PageLoad } from './PageLoad';

@ContinuousAggregate(PageLoad, {
name: 'hourly_page_views',
bucket_interval: '1 hour',
time_column: 'time',
materialized_only: true,
create_group_indexes: true,
aggregates: {
total_views: {
type: 'count',
column_alias: 'total_views',
},
unique_users: {
type: 'count_distinct',
column: 'user_agent',
column_alias: 'unique_users',
},
},
refresh_policy: {
start_offset: '3 days',
end_offset: '1 hour',
schedule_interval: '1 hour',
},
})
export class HourlyPageViews {
@ViewColumn()
@BucketColumn({
source_column: 'time',
})
bucket!: Date;

@ViewColumn()
@AggregateColumn({
type: 'count',
})
total_views!: number;

@ViewColumn()
@AggregateColumn({
type: 'unique_count',
column: 'user_agent',
})
unique_users!: number;
}
```
Expand Down
Loading

0 comments on commit cc47fbb

Please sign in to comment.