Skip to content

Commit

Permalink
FAI-14477 Make it possible for a source to handle a range of buckets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ypc-faros authored Jan 6, 2025
1 parent c5fa006 commit 82d3112
Show file tree
Hide file tree
Showing 15 changed files with 442 additions and 73 deletions.
89 changes: 89 additions & 0 deletions faros-airbyte-common/src/common/bucket-set.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import VError from 'verror';

export class BucketSet {
private readonly sortedBuckets: ReadonlyArray<number>;

constructor(
bucketTotal: number,
bucketRanges: string | ReadonlyArray<string>
) {
if (bucketTotal < 1) {
throw new VError('bucket_total must be a positive integer');
}

const rangeStrings = (
typeof bucketRanges === 'string' ? bucketRanges.split(',') : bucketRanges
).filter((s) => s.trim());

if (!rangeStrings?.length) {
throw new VError('bucket_ranges cannot be empty');
}

const buckets = new Set<number>();

for (const range of rangeStrings) {
const [start, end] = this.parseRange(range);

if (start < 1 || end > bucketTotal) {
throw new VError(
`Invalid bucket range ${range}: values must be between 1 and ${bucketTotal}`
);
}

for (let i = start; i <= end; i++) {
buckets.add(i);
}
}

this.sortedBuckets = Array.from(buckets).sort((a, b) => a - b);
}

private parseRange(range: string): [number, number] {
const parts = range.split('-').map((p) => p.trim());

if (parts.length > 2) {
throw new VError(
`Invalid range format: ${range}. Valid formats are: single number (e.g., '7') or number range (e.g., '3-5')`
);
}

const start = parseInt(parts[0]);
if (isNaN(start)) {
throw new VError(`Invalid number in range: ${parts[0]}`);
}

// If it's a single number, both start and end are the same
if (parts.length === 1) {
return [start, start];
}

const end = parseInt(parts[1]);
if (isNaN(end)) {
throw new VError(`Invalid number in range: ${parts[1]}`);
}

if (end < start) {
throw new VError(`Invalid range ${range}: end cannot be less than start`);
}

return [start, end];
}

next(bucketId: number): number {
let low = 0;
let high = this.sortedBuckets.length - 1;
while (low <= high) {
const mid = Math.floor((low + high) / 2);
if (this.sortedBuckets[mid] <= bucketId) {
low = mid + 1;
} else {
high = mid - 1;
}
}
if (low < this.sortedBuckets.length) {
return this.sortedBuckets[low];
}
// Wrap around to first bucket if no larger bucket found
return this.sortedBuckets[0];
}
}
25 changes: 23 additions & 2 deletions faros-airbyte-common/src/common/bucketing.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import {createHmac} from 'crypto';
import VError from 'verror';

import {BucketSet} from './bucket-set';

export interface BucketExecutionState {
__bucket_execution_state?: {
last_executed_bucket_id: number;
Expand All @@ -11,6 +13,7 @@ export interface BucketExecutionState {
export interface RoundRobinConfig {
round_robin_bucket_execution?: boolean;
bucket_id?: number;
bucket_ranges?: string | ReadonlyArray<string>;
bucket_total?: number;
[key: string]: any;
}
Expand All @@ -23,15 +26,28 @@ export function bucket(key: string, data: string, bucketTotal: number): number {
}

export function validateBucketingConfig(
bucketId: number = 1,
bucketTotal: number = 1
config: RoundRobinConfig,
logger?: (message: string) => void
): void {
const bucketTotal = config.bucket_total ?? 1;
const bucketId = config.bucket_id ?? 1;

if (bucketTotal < 1) {
throw new VError('bucket_total must be a positive integer');
}
if (bucketId < 1 || bucketId > bucketTotal) {
throw new VError(`bucket_id must be between 1 and ${bucketTotal}`);
}

if (config.bucket_ranges) {
if (!config.round_robin_bucket_execution) {
logger?.(
`bucket_ranges ${config.bucket_ranges} ignored because round_robin_bucket_execution is not enabled`
);
} else {
new BucketSet(bucketTotal, config.bucket_ranges);
}
}
}

export function nextBucketId(
Expand All @@ -42,6 +58,11 @@ export function nextBucketId(
const lastExecutedBucketId =
state?.__bucket_execution_state?.last_executed_bucket_id ?? bucketTotal;

if (config.round_robin_bucket_execution && config.bucket_ranges) {
const bucketSet = new BucketSet(bucketTotal, config.bucket_ranges);
return bucketSet.next(lastExecutedBucketId);
}

return (lastExecutedBucketId % bucketTotal) + 1;
}

Expand Down
1 change: 1 addition & 0 deletions faros-airbyte-common/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
validateBucketingConfig,
nextBucketId,
applyRoundRobinBucketing,
RoundRobinConfig,
} from './bucketing';

// TODO: Try https://www.npmjs.com/package/diff
Expand Down
96 changes: 96 additions & 0 deletions faros-airbyte-common/test/common/bucket-set.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import {BucketSet} from '../../src/common/bucket-set';

describe('BucketSet', () => {
describe('constructor', () => {
test('should create bucket set with valid ranges', () => {
expect(() => new BucketSet(5, ['1', '3-5'])).not.toThrow();
expect(() => new BucketSet(10, ['1-3', '5', '7-8'])).not.toThrow();
expect(() => new BucketSet(5, '1,3-5')).not.toThrow();
});

test('should throw for invalid bucket_total', () => {
expect(() => new BucketSet(0, ['1'])).toThrow(
'bucket_total must be a positive integer'
);
expect(() => new BucketSet(-1, ['1'])).toThrow(
'bucket_total must be a positive integer'
);
});

test('should throw for empty bucket ranges', () => {
expect(() => new BucketSet(5, [])).toThrow(
'bucket_ranges cannot be empty'
);
expect(() => new BucketSet(5, '')).toThrow(
'bucket_ranges cannot be empty'
);
});

test('should throw for invalid range format', () => {
expect(() => new BucketSet(5, ['1-2-3'])).toThrow(
'Invalid range format: 1-2-3'
);
expect(() => new BucketSet(5, ['a'])).toThrow(
'Invalid number in range: a'
);
expect(() => new BucketSet(5, ['1-b'])).toThrow(
'Invalid number in range: b'
);
});

test('should throw for out of bounds ranges', () => {
expect(() => new BucketSet(5, ['0-1'])).toThrow(
'Invalid bucket range 0-1: values must be between 1 and 5'
);
expect(() => new BucketSet(5, ['6'])).toThrow(
'Invalid bucket range 6: values must be between 1 and 5'
);
expect(() => new BucketSet(5, ['1-6'])).toThrow(
'Invalid bucket range 1-6: values must be between 1 and 5'
);
});

test('should throw for invalid range order', () => {
expect(() => new BucketSet(5, ['3-1'])).toThrow(
'Invalid range 3-1: end cannot be less than start'
);
});
});

describe('next', () => {
test('should return next bucket in sequence', () => {
const bucketSet = new BucketSet(10, ['1-3', '5', '7-8']);
expect(bucketSet.next(1)).toBe(2);
expect(bucketSet.next(2)).toBe(3);
expect(bucketSet.next(3)).toBe(5);
expect(bucketSet.next(5)).toBe(7);
expect(bucketSet.next(7)).toBe(8);
});

test('should wrap around to first bucket when reaching the end', () => {
const bucketSet = new BucketSet(10, ['1-3', '5', '7-8']);
expect(bucketSet.next(8)).toBe(1);
});

test('should handle single bucket range', () => {
const bucketSet = new BucketSet(5, ['3']);
expect(bucketSet.next(1)).toBe(3);
expect(bucketSet.next(3)).toBe(3);
});

test('should handle non-consecutive ranges', () => {
const bucketSet = new BucketSet(10, '2,4,6,8');
expect(bucketSet.next(2)).toBe(4);
expect(bucketSet.next(4)).toBe(6);
expect(bucketSet.next(6)).toBe(8);
expect(bucketSet.next(8)).toBe(2);
});

test('should handle bucket id not in set', () => {
const bucketSet = new BucketSet(10, ['2-4', '7-8']);
expect(bucketSet.next(1)).toBe(2);
expect(bucketSet.next(5)).toBe(7);
expect(bucketSet.next(9)).toBe(2);
});
});
});
Loading

0 comments on commit 82d3112

Please sign in to comment.