From 82d31124a794561860436053a6f393e25b44591c Mon Sep 17 00:00:00 2001 From: ypc-faros <99700024+ypc-faros@users.noreply.github.com> Date: Mon, 6 Jan 2025 09:52:31 -0500 Subject: [PATCH] FAI-14477 Make it possible for a source to handle a range of buckets (#1876) --- faros-airbyte-common/src/common/bucket-set.ts | 89 +++++++++++++ faros-airbyte-common/src/common/bucketing.ts | 25 +++- faros-airbyte-common/src/common/index.ts | 1 + .../test/common/bucket-set.test.ts | 96 ++++++++++++++ .../test/common/bucketing.test.ts | 123 ++++++++++++++++-- sources/bitbucket-source/resources/spec.json | 54 ++++++-- sources/bitbucket-source/src/bitbucket.ts | 2 +- sources/bitbucket-source/src/types.ts | 5 +- sources/circleci-source/resources/spec.json | 15 ++- .../circleci-source/src/circleci/circleci.ts | 15 ++- sources/github-source/resources/spec.json | 39 ++++-- sources/github-source/src/github.ts | 2 +- sources/github-source/src/types.ts | 6 +- sources/jira-source/resources/spec.json | 35 +++-- sources/jira-source/src/jira.ts | 8 +- 15 files changed, 442 insertions(+), 73 deletions(-) create mode 100644 faros-airbyte-common/src/common/bucket-set.ts create mode 100644 faros-airbyte-common/test/common/bucket-set.test.ts diff --git a/faros-airbyte-common/src/common/bucket-set.ts b/faros-airbyte-common/src/common/bucket-set.ts new file mode 100644 index 000000000..3ab9f69c0 --- /dev/null +++ b/faros-airbyte-common/src/common/bucket-set.ts @@ -0,0 +1,89 @@ +import VError from 'verror'; + +export class BucketSet { + private readonly sortedBuckets: ReadonlyArray; + + constructor( + bucketTotal: number, + bucketRanges: string | ReadonlyArray + ) { + if (bucketTotal < 1) { + throw new VError('bucket_total must be a positive integer'); + } + + const rangeStrings = ( + typeof bucketRanges === 'string' ? bucketRanges.split(',') : bucketRanges + ).filter((s) => s.trim()); + + if (!rangeStrings?.length) { + throw new VError('bucket_ranges cannot be empty'); + } + + const buckets = new Set(); + + for (const range of rangeStrings) { + const [start, end] = this.parseRange(range); + + if (start < 1 || end > bucketTotal) { + throw new VError( + `Invalid bucket range ${range}: values must be between 1 and ${bucketTotal}` + ); + } + + for (let i = start; i <= end; i++) { + buckets.add(i); + } + } + + this.sortedBuckets = Array.from(buckets).sort((a, b) => a - b); + } + + private parseRange(range: string): [number, number] { + const parts = range.split('-').map((p) => p.trim()); + + if (parts.length > 2) { + throw new VError( + `Invalid range format: ${range}. Valid formats are: single number (e.g., '7') or number range (e.g., '3-5')` + ); + } + + const start = parseInt(parts[0]); + if (isNaN(start)) { + throw new VError(`Invalid number in range: ${parts[0]}`); + } + + // If it's a single number, both start and end are the same + if (parts.length === 1) { + return [start, start]; + } + + const end = parseInt(parts[1]); + if (isNaN(end)) { + throw new VError(`Invalid number in range: ${parts[1]}`); + } + + if (end < start) { + throw new VError(`Invalid range ${range}: end cannot be less than start`); + } + + return [start, end]; + } + + next(bucketId: number): number { + let low = 0; + let high = this.sortedBuckets.length - 1; + while (low <= high) { + const mid = Math.floor((low + high) / 2); + if (this.sortedBuckets[mid] <= bucketId) { + low = mid + 1; + } else { + high = mid - 1; + } + } + if (low < this.sortedBuckets.length) { + return this.sortedBuckets[low]; + } + // Wrap around to first bucket if no larger bucket found + return this.sortedBuckets[0]; + } +} diff --git a/faros-airbyte-common/src/common/bucketing.ts b/faros-airbyte-common/src/common/bucketing.ts index fef76c281..994d101c5 100644 --- a/faros-airbyte-common/src/common/bucketing.ts +++ b/faros-airbyte-common/src/common/bucketing.ts @@ -1,6 +1,8 @@ import {createHmac} from 'crypto'; import VError from 'verror'; +import {BucketSet} from './bucket-set'; + export interface BucketExecutionState { __bucket_execution_state?: { last_executed_bucket_id: number; @@ -11,6 +13,7 @@ export interface BucketExecutionState { export interface RoundRobinConfig { round_robin_bucket_execution?: boolean; bucket_id?: number; + bucket_ranges?: string | ReadonlyArray; bucket_total?: number; [key: string]: any; } @@ -23,15 +26,28 @@ export function bucket(key: string, data: string, bucketTotal: number): number { } export function validateBucketingConfig( - bucketId: number = 1, - bucketTotal: number = 1 + config: RoundRobinConfig, + logger?: (message: string) => void ): void { + const bucketTotal = config.bucket_total ?? 1; + const bucketId = config.bucket_id ?? 1; + if (bucketTotal < 1) { throw new VError('bucket_total must be a positive integer'); } if (bucketId < 1 || bucketId > bucketTotal) { throw new VError(`bucket_id must be between 1 and ${bucketTotal}`); } + + if (config.bucket_ranges) { + if (!config.round_robin_bucket_execution) { + logger?.( + `bucket_ranges ${config.bucket_ranges} ignored because round_robin_bucket_execution is not enabled` + ); + } else { + new BucketSet(bucketTotal, config.bucket_ranges); + } + } } export function nextBucketId( @@ -42,6 +58,11 @@ export function nextBucketId( const lastExecutedBucketId = state?.__bucket_execution_state?.last_executed_bucket_id ?? bucketTotal; + if (config.round_robin_bucket_execution && config.bucket_ranges) { + const bucketSet = new BucketSet(bucketTotal, config.bucket_ranges); + return bucketSet.next(lastExecutedBucketId); + } + return (lastExecutedBucketId % bucketTotal) + 1; } diff --git a/faros-airbyte-common/src/common/index.ts b/faros-airbyte-common/src/common/index.ts index 566a7701c..2f5f54420 100644 --- a/faros-airbyte-common/src/common/index.ts +++ b/faros-airbyte-common/src/common/index.ts @@ -10,6 +10,7 @@ export { validateBucketingConfig, nextBucketId, applyRoundRobinBucketing, + RoundRobinConfig, } from './bucketing'; // TODO: Try https://www.npmjs.com/package/diff diff --git a/faros-airbyte-common/test/common/bucket-set.test.ts b/faros-airbyte-common/test/common/bucket-set.test.ts new file mode 100644 index 000000000..f22d02193 --- /dev/null +++ b/faros-airbyte-common/test/common/bucket-set.test.ts @@ -0,0 +1,96 @@ +import {BucketSet} from '../../src/common/bucket-set'; + +describe('BucketSet', () => { + describe('constructor', () => { + test('should create bucket set with valid ranges', () => { + expect(() => new BucketSet(5, ['1', '3-5'])).not.toThrow(); + expect(() => new BucketSet(10, ['1-3', '5', '7-8'])).not.toThrow(); + expect(() => new BucketSet(5, '1,3-5')).not.toThrow(); + }); + + test('should throw for invalid bucket_total', () => { + expect(() => new BucketSet(0, ['1'])).toThrow( + 'bucket_total must be a positive integer' + ); + expect(() => new BucketSet(-1, ['1'])).toThrow( + 'bucket_total must be a positive integer' + ); + }); + + test('should throw for empty bucket ranges', () => { + expect(() => new BucketSet(5, [])).toThrow( + 'bucket_ranges cannot be empty' + ); + expect(() => new BucketSet(5, '')).toThrow( + 'bucket_ranges cannot be empty' + ); + }); + + test('should throw for invalid range format', () => { + expect(() => new BucketSet(5, ['1-2-3'])).toThrow( + 'Invalid range format: 1-2-3' + ); + expect(() => new BucketSet(5, ['a'])).toThrow( + 'Invalid number in range: a' + ); + expect(() => new BucketSet(5, ['1-b'])).toThrow( + 'Invalid number in range: b' + ); + }); + + test('should throw for out of bounds ranges', () => { + expect(() => new BucketSet(5, ['0-1'])).toThrow( + 'Invalid bucket range 0-1: values must be between 1 and 5' + ); + expect(() => new BucketSet(5, ['6'])).toThrow( + 'Invalid bucket range 6: values must be between 1 and 5' + ); + expect(() => new BucketSet(5, ['1-6'])).toThrow( + 'Invalid bucket range 1-6: values must be between 1 and 5' + ); + }); + + test('should throw for invalid range order', () => { + expect(() => new BucketSet(5, ['3-1'])).toThrow( + 'Invalid range 3-1: end cannot be less than start' + ); + }); + }); + + describe('next', () => { + test('should return next bucket in sequence', () => { + const bucketSet = new BucketSet(10, ['1-3', '5', '7-8']); + expect(bucketSet.next(1)).toBe(2); + expect(bucketSet.next(2)).toBe(3); + expect(bucketSet.next(3)).toBe(5); + expect(bucketSet.next(5)).toBe(7); + expect(bucketSet.next(7)).toBe(8); + }); + + test('should wrap around to first bucket when reaching the end', () => { + const bucketSet = new BucketSet(10, ['1-3', '5', '7-8']); + expect(bucketSet.next(8)).toBe(1); + }); + + test('should handle single bucket range', () => { + const bucketSet = new BucketSet(5, ['3']); + expect(bucketSet.next(1)).toBe(3); + expect(bucketSet.next(3)).toBe(3); + }); + + test('should handle non-consecutive ranges', () => { + const bucketSet = new BucketSet(10, '2,4,6,8'); + expect(bucketSet.next(2)).toBe(4); + expect(bucketSet.next(4)).toBe(6); + expect(bucketSet.next(6)).toBe(8); + expect(bucketSet.next(8)).toBe(2); + }); + + test('should handle bucket id not in set', () => { + const bucketSet = new BucketSet(10, ['2-4', '7-8']); + expect(bucketSet.next(1)).toBe(2); + expect(bucketSet.next(5)).toBe(7); + expect(bucketSet.next(9)).toBe(2); + }); + }); +}); diff --git a/faros-airbyte-common/test/common/bucketing.test.ts b/faros-airbyte-common/test/common/bucketing.test.ts index 756b0541f..20f28c340 100644 --- a/faros-airbyte-common/test/common/bucketing.test.ts +++ b/faros-airbyte-common/test/common/bucketing.test.ts @@ -7,27 +7,63 @@ import { describe('validateBucketingConfig', () => { test('should not throw for valid config', () => { - expect(() => validateBucketingConfig(1, 1)).not.toThrow(); - expect(() => validateBucketingConfig(2, 3)).not.toThrow(); - expect(() => validateBucketingConfig(5, 5)).not.toThrow(); + expect(() => + validateBucketingConfig({bucket_total: 1, bucket_id: 1}) + ).not.toThrow(); + expect(() => + validateBucketingConfig({bucket_total: 3, bucket_id: 2}) + ).not.toThrow(); + expect(() => + validateBucketingConfig({bucket_total: 5, bucket_id: 5}) + ).not.toThrow(); }); test('should throw for invalid bucket_total', () => { - expect(() => validateBucketingConfig(1, 0)).toThrow( - 'bucket_total must be a positive integer' - ); - expect(() => validateBucketingConfig(1, -1)).toThrow( - 'bucket_total must be a positive integer' - ); + expect(() => + validateBucketingConfig({bucket_total: 0, bucket_id: 1}) + ).toThrow('bucket_total must be a positive integer'); + expect(() => + validateBucketingConfig({bucket_total: -1, bucket_id: 1}) + ).toThrow('bucket_total must be a positive integer'); }); test('should throw for invalid bucket_id', () => { - expect(() => validateBucketingConfig(0, 5)).toThrow( - 'bucket_id must be between 1 and 5' + expect(() => + validateBucketingConfig({bucket_total: 5, bucket_id: 0}) + ).toThrow('bucket_id must be between 1 and 5'); + expect(() => + validateBucketingConfig({bucket_total: 5, bucket_id: 6}) + ).toThrow('bucket_id must be between 1 and 5'); + }); + + test('should warn when bucket_ranges present but round_robin disabled', () => { + const logger = jest.fn(); + validateBucketingConfig( + { + bucket_total: 5, + bucket_id: 1, + bucket_ranges: '1-3', + round_robin_bucket_execution: false, + }, + logger + ); + expect(logger).toHaveBeenCalledWith( + 'bucket_ranges 1-3 ignored because round_robin_bucket_execution is not enabled' ); - expect(() => validateBucketingConfig(6, 5)).toThrow( - 'bucket_id must be between 1 and 5' + }); + + test('should not warn when bucket_ranges and round_robin both present', () => { + const logger = jest.fn(); + validateBucketingConfig( + { + bucket_total: 5, + bucket_id: 1, + bucket_ranges: '1-3', + round_robin_bucket_execution: true, + }, + logger ); + expect(logger).not.toHaveBeenCalled(); }); }); @@ -74,6 +110,52 @@ describe('getNextBucketId', () => { const state = undefined; expect(nextBucketId(config, state)).toBe(1); }); + + test('should use bucket ranges when provided', () => { + const config = { + bucket_total: 10, + round_robin_bucket_execution: true, + bucket_ranges: ['2-4', '7-8'], + }; + + expect( + nextBucketId(config, { + __bucket_execution_state: {last_executed_bucket_id: 2}, + }) + ).toBe(3); + + expect( + nextBucketId(config, { + __bucket_execution_state: {last_executed_bucket_id: 4}, + }) + ).toBe(7); + + expect( + nextBucketId(config, { + __bucket_execution_state: {last_executed_bucket_id: 8}, + }) + ).toBe(2); + }); + + test('should handle bucket ranges as comma-separated string', () => { + const config = { + bucket_total: 10, + round_robin_bucket_execution: true, + bucket_ranges: '2,4,6,8', + }; + + expect( + nextBucketId(config, { + __bucket_execution_state: {last_executed_bucket_id: 2}, + }) + ).toBe(4); + + expect( + nextBucketId(config, { + __bucket_execution_state: {last_executed_bucket_id: 8}, + }) + ).toBe(2); + }); }); describe('bucket', () => { @@ -145,4 +227,19 @@ describe('applyRoundRobinBucketing', () => { expect(result.config.other_prop).toBe('value'); expect(result.config.round_robin_bucket_execution).toBe(true); }); + + test('should ignore bucket_ranges when round robin is disabled', () => { + const config = { + round_robin_bucket_execution: false, + bucket_total: 5, + bucket_ranges: '2-4', + bucket_id: 1, + }; + const state = {someField: 'test-value'}; + + const result = applyRoundRobinBucketing(config, state); + + expect(result.config).toEqual(config); + expect(result.state).toEqual(state); + }); }); diff --git a/sources/bitbucket-source/resources/spec.json b/sources/bitbucket-source/resources/spec.json index bab97de45..da6703d6e 100644 --- a/sources/bitbucket-source/resources/spec.json +++ b/sources/bitbucket-source/resources/spec.json @@ -11,7 +11,9 @@ "type": "string", "title": "API URL", "description": "The API URL for fetching data from Bitbucket", - "examples": ["https://api.bitbucket.org/2.0"], + "examples": [ + "https://api.bitbucket.org/2.0" + ], "default": "https://api.bitbucket.org/2.0" }, "username": { @@ -42,7 +44,9 @@ }, "title": "Workspaces", "description": "List of Workspaces from which to fetch data. If none passed, all visible workspaces for the authenticated user will be used.", - "examples": ["blaze-lib"] + "examples": [ + "blaze-lib" + ] }, "excluded_workspaces": { "order": 6, @@ -52,7 +56,9 @@ }, "title": "Excluded Workspaces", "description": "List of Workspaces from which data won't be fetched. By default, no workspaces are excluded. If workspaces list is specified, this list will be ignored.", - "examples": ["blaze-lib"] + "examples": [ + "blaze-lib" + ] }, "repositories": { "order": 7, @@ -62,17 +68,21 @@ }, "title": "Repositories", "description": "List of Bitbucket repositories in the format 'workspace/repo-slug'. If none are provided, data from all repositories for the specified workspaces will be pulled.", - "examples": ["blaze-lib/blaze"] + "examples": [ + "blaze-lib/blaze" + ] }, "excluded_repositories": { - "order": 8, - "type": "array", - "items": { + "order": 8, + "type": "array", + "items": { "type": "string" - }, - "title": "Excluded Repositories", - "description": "List of Repositories from which data won't be fetched. By default, no repositories are excluded. If repositories list is specified, this list will be ignored.", - "examples": ["blaze-lib/blaze"] + }, + "title": "Excluded Repositories", + "description": "List of Repositories from which data won't be fetched. By default, no repositories are excluded. If repositories list is specified, this list will be ignored.", + "examples": [ + "blaze-lib/blaze" + ] }, "cutoff_days": { "order": 9, @@ -145,8 +155,28 @@ "description": "Total number of buckets to distribute workspaces and repositories across. Use it when distributing the load between multiple sources", "default": 1 }, - "concurrency_limit": { + "round_robin_bucket_execution": { "order": 18, + "type": "boolean", + "title": "Round Robin Bucket Execution", + "description": "When enabled, syncs rotate through all buckets, processing one bucket per sync. When disabled, only the bucket specified by 'bucket_id' is synced.", + "default": false + }, + "bucket_ranges": { + "order": 19, + "type": "array", + "items": { + "type": "string" + }, + "title": "Bucket Ranges", + "description": "List of bucket ranges to process when round robin bucket execution is enabled. Each element can be either a single bucket number (e.g., '7') or a range of buckets (e.g., '3-5'). All bucket numbers must be between 1 and 'bucket_total'.", + "examples": [ + "3-5", + "7" + ] + }, + "concurrency_limit": { + "order": 20, "type": "integer", "title": "Concurrency limit", "description": "Maximum concurrency to run with", diff --git a/sources/bitbucket-source/src/bitbucket.ts b/sources/bitbucket-source/src/bitbucket.ts index 45ce444df..0838e8cf9 100644 --- a/sources/bitbucket-source/src/bitbucket.ts +++ b/sources/bitbucket-source/src/bitbucket.ts @@ -71,7 +71,7 @@ export class Bitbucket { throw new VError(errorMessage); } - validateBucketingConfig(config.bucket_id, config.bucket_total); + validateBucketingConfig(config, logger.info.bind(logger)); const auth = config.token ? {token: config.token} diff --git a/sources/bitbucket-source/src/types.ts b/sources/bitbucket-source/src/types.ts index c3fd32cf7..a1df01345 100644 --- a/sources/bitbucket-source/src/types.ts +++ b/sources/bitbucket-source/src/types.ts @@ -1,8 +1,9 @@ import {AirbyteConfig} from 'faros-airbyte-cdk'; +import {RoundRobinConfig} from 'faros-airbyte-common/common'; import {RunMode} from './streams/common'; -export interface BitbucketConfig extends AirbyteConfig { +export interface BitbucketConfig extends AirbyteConfig, RoundRobinConfig { readonly api_url?: string; readonly username?: string; readonly password?: string; @@ -14,8 +15,6 @@ export interface BitbucketConfig extends AirbyteConfig { readonly run_mode?: RunMode; readonly custom_streams?: ReadonlyArray; readonly page_size?: number; - readonly bucket_id?: number; - readonly bucket_total?: number; readonly concurrency_limit?: number; readonly cutoff_days?: number; readonly start_date?: string; diff --git a/sources/circleci-source/resources/spec.json b/sources/circleci-source/resources/spec.json index 4a0bbd5b8..72c2d9c0d 100644 --- a/sources/circleci-source/resources/spec.json +++ b/sources/circleci-source/resources/spec.json @@ -124,8 +124,21 @@ "description": "When enabled, syncs rotate through all buckets, processing one bucket per sync. When disabled, only the bucket specified by 'bucket_id' is synced.", "default": false }, - "cutoff_days": { + "bucket_ranges": { "order": 15, + "type": "array", + "items": { + "type": "string" + }, + "title": "Bucket Ranges", + "description": "List of bucket ranges to process when round robin bucket execution is enabled. Each element can be either a single bucket number (e.g., '7') or a range of buckets (e.g., '3-5'). All bucket numbers must be between 1 and 'bucket_total'.", + "examples": [ + "3-5", + "7" + ] + }, + "cutoff_days": { + "order": 16, "type": "integer", "title": "Cutoff Days", "default": 90, diff --git a/sources/circleci-source/src/circleci/circleci.ts b/sources/circleci-source/src/circleci/circleci.ts index 3dcb1271f..e1fc7f0e5 100644 --- a/sources/circleci-source/src/circleci/circleci.ts +++ b/sources/circleci-source/src/circleci/circleci.ts @@ -4,8 +4,12 @@ import axios, { AxiosRequestConfig, AxiosResponse, } from 'axios'; -import {AirbyteLogger, wrapApiError} from 'faros-airbyte-cdk'; -import {bucket, validateBucketingConfig} from 'faros-airbyte-common/common'; +import {AirbyteConfig, AirbyteLogger, wrapApiError} from 'faros-airbyte-cdk'; +import { + bucket, + RoundRobinConfig, + validateBucketingConfig, +} from 'faros-airbyte-common/common'; import https from 'https'; import {maxBy, toLower} from 'lodash'; import {Memoize} from 'typescript-memoize'; @@ -19,7 +23,7 @@ const DEFAULT_CUTOFF_DAYS = 90; const DEFAULT_REQUEST_TIMEOUT = 120000; export const DEFAULT_BUCKET_ID = 1; export const DEFAULT_BUCKET_TOTAL = 1; -export interface CircleCIConfig { +export interface CircleCIConfig extends AirbyteConfig, RoundRobinConfig { readonly token: string; readonly url?: string; project_slugs: ReadonlyArray; @@ -32,9 +36,6 @@ export interface CircleCIConfig { readonly cutoff_days?: number; readonly request_timeout?: number; readonly max_retries?: number; - readonly bucket_id?: number; - readonly bucket_total?: number; - readonly round_robin_bucket_execution?: boolean; } export class CircleCI { @@ -59,7 +60,7 @@ export class CircleCI { static instance(config: CircleCIConfig, logger: AirbyteLogger): CircleCI { if (CircleCI.circleCI) return CircleCI.circleCI; - validateBucketingConfig(config.bucket_id, config.bucket_total); + validateBucketingConfig(config, logger.info.bind(logger)); if (!config.token) { throw new VError('No token provided'); diff --git a/sources/github-source/resources/spec.json b/sources/github-source/resources/spec.json index 40c0d9bc4..db13930ee 100644 --- a/sources/github-source/resources/spec.json +++ b/sources/github-source/resources/spec.json @@ -245,77 +245,90 @@ "description": "When enabled, syncs rotate through all buckets, processing one bucket per sync. When disabled, only the bucket specified by 'bucket_id' is synced.", "default": false }, - "api_url": { + "bucket_ranges": { "order": 20, + "type": "array", + "items": { + "type": "string" + }, + "title": "Bucket Ranges", + "description": "List of bucket ranges to process when round robin bucket execution is enabled. Each element can be either a single bucket number (e.g., '7') or a range of buckets (e.g., '3-5'). All bucket numbers must be between 1 and 'bucket_total'.", + "examples": [ + "3-5", + "7" + ] + }, + "api_url": { + "order": 21, "type": "string", "title": "Faros API URL", "description": "The Faros API URL.", "default": "https://prod.api.faros.ai" }, "api_key": { - "order": 21, + "order": 22, "title": "Faros API Key", "type": "string", "description": "The Faros API key to use to access the API.", "airbyte_secret": true }, "graph": { - "order": 22, + "order": 23, "type": "string", "title": "Graph name", "description": "The Faros graph name.", "default": "default" }, "page_size": { - "order": 23, + "order": 24, "type": "integer", "title": "Page Size", "description": "Maximum number of items in a paginated response", "default": 100 }, "commits_page_size": { - "order": 24, + "order": 25, "type": "integer", "title": "Commits Page Size", "description": "Maximum number of items in a paginated response for commits", "default": 100 }, "pull_requests_page_size": { - "order": 25, + "order": 26, "type": "integer", "title": "Pull Requests Page Size", "description": "Maximum number of items in a paginated response for pull requests", "default": 25 }, "timeout": { - "order": 26, + "order": 27, "type": "integer", "title": "Request Timeout", "description": "Timeout in milliseconds for each request to the GitHub API", "default": 120000 }, "concurrency_limit": { - "order": 27, + "order": 28, "type": "integer", "title": "Concurrency limit", "description": "Maximum concurrency to run with", "default": 4 }, "backfill": { - "order": 28, + "order": 29, "type": "boolean", "title": "Backfill", "description": "Backfill data from the start date to the end date.", "default": false }, "start_date": { - "order": 29, + "order": 30, "type": "string", "title": "Start Date", "description": "The date from which to start syncing data." }, "end_date": { - "order": 30, + "order": 31, "type": "string", "title": "End Date", "description": "The date at which to stop syncing data." @@ -324,14 +337,14 @@ "type": "boolean", "title": "Fetch Pull Request diff coverage", "default": false, - "order": 31 + "order": 32 }, "pull_request_cutoff_lag_seconds": { "title": "Pull Request Cutoff Lag (seconds)", "description": "Apply lag to the end cutoff saved in the state. PRs updated after this will be rewritten during the next sync.", "type": "integer", "default": 0, - "order": 32 + "order": 33 } } } diff --git a/sources/github-source/src/github.ts b/sources/github-source/src/github.ts index 92311f803..416f7a9cc 100644 --- a/sources/github-source/src/github.ts +++ b/sources/github-source/src/github.ts @@ -180,7 +180,7 @@ export abstract class GitHub { if (GitHub.github) { return GitHub.github; } - validateBucketingConfig(cfg.bucket_id, cfg.bucket_total); + validateBucketingConfig(cfg, logger.info.bind(logger)); const github = cfg.authentication.type === 'token' diff --git a/sources/github-source/src/types.ts b/sources/github-source/src/types.ts index af9d01146..e994125e3 100644 --- a/sources/github-source/src/types.ts +++ b/sources/github-source/src/types.ts @@ -1,10 +1,11 @@ import {GraphqlResponseError} from '@octokit/graphql'; import {AirbyteConfig} from 'faros-airbyte-cdk'; +import {RoundRobinConfig} from 'faros-airbyte-common/common'; import {ExtendedOctokit} from './octokit'; import {RunMode} from './streams/common'; -export interface GitHubConfig extends AirbyteConfig { +export interface GitHubConfig extends AirbyteConfig, RoundRobinConfig { readonly authentication: GitHubAuth; readonly reject_unauthorized?: boolean; readonly url?: string; @@ -22,9 +23,6 @@ export interface GitHubConfig extends AirbyteConfig { readonly copilot_licenses_dates_fix?: boolean; readonly copilot_metrics_ga?: boolean; readonly cutoff_days?: number; - readonly bucket_id?: number; - readonly bucket_total?: number; - readonly round_robin_bucket_execution?: boolean; readonly api_url?: string; readonly api_key?: string; readonly graph?: string; diff --git a/sources/jira-source/resources/spec.json b/sources/jira-source/resources/spec.json index 9342efb72..a1f1e12a8 100644 --- a/sources/jira-source/resources/spec.json +++ b/sources/jira-source/resources/spec.json @@ -221,75 +221,88 @@ "description": "When enabled, syncs rotate through all buckets, processing one bucket per sync. When disabled, only the bucket specified by 'bucket_id' is synced.", "default": false }, - "api_url": { + "bucket_ranges": { "order": 27, + "type": "array", + "items": { + "type": "string" + }, + "title": "Bucket Ranges", + "description": "List of bucket ranges to process when round robin bucket execution is enabled. Each element can be either a single bucket number (e.g., '7') or a range of buckets (e.g., '3-5'). All bucket numbers must be between 1 and 'bucket_total'.", + "examples": [ + "3-5", + "7" + ] + }, + "api_url": { + "order": 28, "type": "string", "title": "Faros API URL", "description": "The Faros API URL.", "default": "https://prod.api.faros.ai" }, "api_key": { - "order": 28, + "order": 29, "title": "Faros API Key", "type": "string", "description": "The Faros API key to use to access the API.", "airbyte_secret": true }, "graph": { - "order": 29, + "order": 30, "type": "string", "title": "Graph name", "description": "The Faros graph name.", "default": "default" }, "use_sprints_reverse_search": { - "order": 30, + "order": 31, "type": "boolean", "title": "Use Sprints Reverse Search", "description": "Fetch closed sprints starting with most recent in backlog. Use this for Jira instances with a lots closed sprints and syncing sprints is slow.", "default": false }, "backfill": { - "order": 31, + "order": 32, "type": "boolean", "title": "Backfill", "description": "Backfill data from the start date to the end date.", "default": false }, "start_date": { - "order": 32, + "order": 33, "type": "string", "title": "Start Date", "description": "The date from which to start syncing data." }, "end_date": { - "order": 33, + "order": 34, "type": "string", "title": "End Date", "description": "The date at which to stop syncing data." }, "fetch_teams": { - "order": 34, + "order": 35, "type": "boolean", "title": "Fetch Teams", "description": "Fetch teams from organization to populate teams and team memberships.", "default": false }, "organization_id": { - "order": 35, + "order": 36, "type": "string", "title": "Organization ID", "description": "Atlassian organization ID for fetching teams. Required only when using Fetch Teams with Jira Cloud. See https://confluence.atlassian.com/jirakb/what-it-is-the-organization-id-and-where-to-find-it-1207189876.html for information on how to find it." }, "use_faros_board_issue_tracker": { - "order": 36, + "order": 37, "type": "boolean", "title": "Use Board Issue Tracker", "description": "Use stateful board issue tracker to track issues on boards. This uses the Faros API to persist the state of boards between runs. Requires faros_source_id to be configured.", "default": false }, "source_qualifier": { - "order": 37, + "order": 38, "type": "string", "title": "Source Qualifier", "description": "The qualifier to append as a suffix to the Jira source name to ensure uniqueness among entities with similar IDs when syncing multiple Jira instances, while preserving their original IDs, e.g. for Boards and Sprints.", diff --git a/sources/jira-source/src/jira.ts b/sources/jira-source/src/jira.ts index 01fe44a4d..e24729370 100644 --- a/sources/jira-source/src/jira.ts +++ b/sources/jira-source/src/jira.ts @@ -4,6 +4,7 @@ import {AirbyteConfig, AirbyteLogger} from 'faros-airbyte-cdk'; import { bucket, normalizeString, + RoundRobinConfig, validateBucketingConfig, } from 'faros-airbyte-common/common'; import { @@ -45,7 +46,7 @@ import {JiraClient} from './client'; import {IssueTransformer} from './issue_transformer'; import {RunMode} from './streams/common'; -export interface JiraConfig extends AirbyteConfig { +export interface JiraConfig extends AirbyteConfig, RoundRobinConfig { readonly url: string; readonly username?: string; readonly password?: string; @@ -68,9 +69,6 @@ export interface JiraConfig extends AirbyteConfig { readonly cutoff_lag_days?: number; readonly run_mode?: RunMode; readonly custom_streams?: ReadonlyArray; - readonly bucket_id?: number; - readonly bucket_total?: number; - readonly round_robin_bucket_execution?: boolean; readonly api_url?: string; readonly api_key?: string; readonly graph?: string; @@ -216,7 +214,7 @@ export class Jira { const authentication = Jira.auth(cfg); - validateBucketingConfig(cfg.bucket_id, cfg.bucket_total); + validateBucketingConfig(cfg, logger.info.bind(logger)); const httpsAgent = new https.Agent({ rejectUnauthorized: