From 0ae9b2a877e2aa5361c507be68945f5a810ad001 Mon Sep 17 00:00:00 2001 From: Pedro Date: Mon, 4 Mar 2024 15:55:25 +0100 Subject: [PATCH] Implement the "onStepStarted" hook --- .eslintrc | 2 +- README.md | 46 +++++++++++------ benchmark/parallel.benchmark.ts | 5 +- benchmark/subFlow.benchmark.ts | 14 +++-- package-lock.json | 4 +- package.json | 2 +- src/Caminho.ts | 24 +++++---- src/index.ts | 9 ++-- src/operators/batch.ts | 8 +-- src/operators/generator.ts | 30 +++++++---- src/operators/pipe.ts | 8 +-- src/types.ts | 22 ++++++-- src/utils/onStepFinished.ts | 16 ++++++ src/utils/onStepStarted.ts | 11 ++++ src/utils/stepLogger.ts | 14 ----- test/integration/from.test.ts | 68 ++++++++++++------------- test/integration/onStepFinished.test.ts | 60 ++++++++++++++++++++++ test/integration/onStepStarted.test.ts | 67 ++++++++++++++++++++++++ test/integration/parallel.test.ts | 44 ++++++++-------- test/integration/stepLogger.test.ts | 56 -------------------- test/integration/subCaminho.test.ts | 8 +-- test/mocks/stepResult.mock.ts | 10 +++- 22 files changed, 338 insertions(+), 190 deletions(-) create mode 100644 src/utils/onStepFinished.ts create mode 100644 src/utils/onStepStarted.ts delete mode 100644 src/utils/stepLogger.ts create mode 100644 test/integration/onStepFinished.test.ts create mode 100644 test/integration/onStepStarted.test.ts delete mode 100644 test/integration/stepLogger.test.ts diff --git a/.eslintrc b/.eslintrc index 723556b..f687e1e 100644 --- a/.eslintrc +++ b/.eslintrc @@ -34,7 +34,7 @@ "no-plusplus": "off", "no-underscore-dangle": ["error"], "no-await-in-loop": "off", - "complexity": ["error", { "max": 3 }], + "complexity": ["error", { "max": 4 }], "class-methods-use-this": "off" }, "env": { diff --git a/README.md b/README.md index 45c2c05..51b5c2f 100644 --- a/README.md +++ b/README.md @@ -172,26 +172,44 @@ await from({ fn: generateCars, provides: 'carId' }) ``` #### Logging -Caminho features a simple log mechanism which executes a syncronous callback function on every step executed. -The function needs to be defined via the `onEachStep` parameter on the `from`. +Caminho features a simple log mechanism which executes a syncronous callback function on every step start and finish. +The functions can be defined with the `onStepStart` and `onStepFinished` parameter on one of the `from` flow initializers. -Every step execution, calls the `onEachStep` step, it provides the callback with the following information: +The **onStepStart** provides the callback with the following information: -- `name: string` - The `name` parameter on the step definition, defaults to the step function name - `step.fn.name`. -- `tookMs: number` - Time for the step to execute. -- `emitted: number` - Number of items processed, useful for batch operations +- *name: string* - The name provided on the step definition, fallback to the name of the step function. +- *valueBags: ValueBag[]* - Array of value bags at the moment this was executed. +- *received: number* - Time of items received (this will only be greater than 1 in case it's a batch). -Example of how the calls to `onEachStep` looks like: +The **onStepFinished** provides the callback with the following information: + +- *name: string* - The name provided on the step definition, fallback to the name of the step function. +- *valueBags: ValueBag[]* - Array of value bags at the moment this was executed. +- *emitted: number* - Number of items processed (this will only be greater than 1 in case it's a batch). +- *tookMs: number* - Time for the step to execute. + +Example: ```typescript -await from({ fn: generateCars, provides: 'carId' }, { onEachStep: console.log }) - // { name: 'generateCars', tookMs: number, emitted: 1 } - // { name: 'generateCars', tookMs: number, emitted: 1 } +await from( + { fn: generateCars, provides: 'carId' }, + { + onStepStarted: (log) => console.log('stepStarted', log), + onStepFinished: (log) => console.log('stepFinished', log), + } + ) + // stepStarted { name: 'generateCars', received: 1, valueBags: [{}}] } + // stepFinished { name: 'generateCars', tookMs: number, emitted: 1, valueBags: [{ carId: "1" }] } + // stepStarted { name: 'generateCars', received: 1, valueBags: [{}] } + // stepFinished { name: 'generateCars', tookMs: number, emitted: 1, valueBags: [{ carId: "2" }] } .pipe({ fn: fetchPrice, provides: 'price', name: 'customName' }) - // { name: 'customName', tookMs: number, emitted: 1 } - // { name: 'customName', tookMs: number, emitted: 1 } + // stepStarted { name: 'customName', received: 1, valueBags: [{ carId: "1" }] } + // stepFinished { name: 'customName', tookMs: number, emitted: 1, valueBags: [{ carId: "1", customName: "car-1" }] } + // stepStarted { name: 'customName', received: 1, valueBags: [{ carId: "2" }] } + // stepFinished { name: 'customName', tookMs: number, emitted: 1, valueBags: [{ carId: "2", customName: "car-2" }] } .pipe({ fn: fetchSpecs, provides: 'specs', batch: { maxSize: 50, timeoutMs: 500 } }) - // { name: 'fetchSpecs', tookMs: number, emitted: 2 } + // stepStarted { name: 'fetchSpecs', received: 2, valueBags: [{ carId: "1", customName: "car-1" }, { carId: "2", customName: "car-2" } }] } + // stepFinished { name: 'fetchSpecs', tookMs: number, emitted: 2, valueBags: [{ carId: "1", customName: "car-1", specs: { engineSize: 1600 } }, { carId: "2", customName: "car-2", specs: { engineSize: 2000 } }] } .run() ``` @@ -213,8 +231,6 @@ npm run test:watch ## Roadmap -- Add "onStartStep" callback for logging - Wrap steps in a try catch so we can call logger with the step error. - Proper typing on ValueBag and how it's handled in child steps -- Built-in Retry system - Should step errors continue the flow? diff --git a/benchmark/parallel.benchmark.ts b/benchmark/parallel.benchmark.ts index 6473a13..17b80cd 100644 --- a/benchmark/parallel.benchmark.ts +++ b/benchmark/parallel.benchmark.ts @@ -21,7 +21,10 @@ async function runParallelBenchmark(parentItems: number, childItemsPerParent: nu const childCaminho = from(steps.childGenerator, { maxItemsFlowing: 1_000 }) .parallel([steps.pipe3, steps.pipe4]) - const childStep = { fn: (valueBag) => childCaminho.run(valueBag, steps.accumulator), provides: 'accumulator1' } + const childStep = { + fn: (valueBag: ValueBag) => childCaminho.run(valueBag, steps.accumulator), + provides: 'accumulator1', + } console.time('initialize caminho') const benchmarkCaminho = from(steps.parentGenerator, { maxItemsFlowing: 1_000 }) diff --git a/benchmark/subFlow.benchmark.ts b/benchmark/subFlow.benchmark.ts index 4f3d701..ca0f118 100644 --- a/benchmark/subFlow.benchmark.ts +++ b/benchmark/subFlow.benchmark.ts @@ -16,7 +16,10 @@ async function runSubflowBenchmark(parentItems: number, childItemsPerParent: num childProcessed += valueBag.accumulator1 }, } - const childStep = { fn: (valueBag) => childCaminho.run(valueBag, steps.accumulator), provides: 'accumulator1' } + const childStep = { + fn: (valueBag: ValueBag) => childCaminho.run(valueBag, steps.accumulator), + provides: 'accumulator1', + } console.timeEnd('initialize steps') console.time('initialize caminho') @@ -42,9 +45,8 @@ function initializeSteps(parentItems: number, childItemsPerParent: number) { const accumulatorFn = (acc: number) => acc + 1 const parentGeneratorFn = getMockedGenerator(getNumberedArray(parentItems)) const childGeneratorFn = getMockedGenerator(getNumberedArray(childItemsPerParent)) - const pipeFn = (valueBag) => ({ id: valueBag.source1, name: 'pipe1' }) - const mapBagForBatch = () => (valueBag) => ({ parentId: valueBag.source1, id: valueBag.subSource1, name: 'batch1' }) - const batchFn = (valueBags) => valueBags.map(mapBagForBatch) + const pipeFn = (valueBag: ValueBag) => ({ id: valueBag.source1, name: 'pipe1' }) + const batchFn = (valueBags: ValueBag[]) => valueBags.map(mapBagForBatch) return { parentGenerator: { fn: parentGeneratorFn, provides: 'source1' }, @@ -55,4 +57,8 @@ function initializeSteps(parentItems: number, childItemsPerParent: number) { } } +function mapBagForBatch() { + return (valueBag: ValueBag) => ({ parentId: valueBag.source1, id: valueBag.subSource1, name: 'batch1' }) +} + runSubflowBenchmark(50_000, 1_000) diff --git a/package-lock.json b/package-lock.json index b2bc005..88e6225 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "caminho", - "version": "1.1.2", + "version": "1.2.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "caminho", - "version": "1.1.2", + "version": "1.2.0", "license": "ISC", "dependencies": { "rxjs": "^7.5.7" diff --git a/package.json b/package.json index 7520291..78822d0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "caminho", - "version": "1.1.2", + "version": "1.2.0", "description": "Caminho means path, a new path to your data processing.", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/Caminho.ts b/src/Caminho.ts index 23e8cc4..a7f8280 100644 --- a/src/Caminho.ts +++ b/src/Caminho.ts @@ -1,6 +1,6 @@ import { from, lastValueFrom, reduce, tap } from 'rxjs' -import type { ValueBag, PipeGenericParams, CaminhoOptions, Accumulator } from './types' +import type { ValueBag, PipeGenericParams, CaminhoOptions, Accumulator, Loggers } from './types' import { GeneratorParams, wrapGenerator, wrapGeneratorWithBackPressure } from './operators/generator' import { pipe } from './operators/pipe' @@ -9,9 +9,11 @@ import { parallel } from './operators/parallel' import { filter, FilterPredicate } from './operators/filter' import { applyOperator, isBatch, OperatorApplier } from './operators/helpers/operatorHelpers' -import { getLogger } from './utils/stepLogger' import { PendingDataControl, PendingDataControlInMemory } from './utils/PendingDataControl' +import { getOnStepFinished } from './utils/onStepFinished' +import { getOnStepStarted } from './utils/onStepStarted' + export class Caminho { private generator: (initialBag: ValueBag) => AsyncGenerator private operators: OperatorApplier[] = [] @@ -63,14 +65,14 @@ export class Caminho { } private getGenerator(generatorParams: GeneratorParams): (initialBag: ValueBag) => AsyncGenerator { - const logger = this.getLogger(generatorParams) + const loggers = this.getLoggers(generatorParams) if (this.options?.maxItemsFlowing) { const pendingDataControl = this.pendingDataControl as PendingDataControl this.finalStep = tap(() => pendingDataControl.decrement()) - return wrapGeneratorWithBackPressure(generatorParams, this.options.maxItemsFlowing, pendingDataControl, logger) + return wrapGeneratorWithBackPressure(generatorParams, this.options.maxItemsFlowing, pendingDataControl, loggers) } - return wrapGenerator(generatorParams, logger) + return wrapGenerator(generatorParams, loggers) } private buildObservable(initialBag: ValueBag = {}) { @@ -91,12 +93,14 @@ export class Caminho { private getApplierForPipeOrBatch(params: PipeGenericParams): OperatorApplier { return isBatch(params) - ? batch(params, this.getLogger(params)) - : pipe(params, this.getLogger(params)) + ? batch(params, this.getLoggers(params)) + : pipe(params, this.getLoggers(params)) } - private getLogger(params: GeneratorParams | PipeGenericParams) { - const name = params.name ?? params.fn.name - return getLogger(name, this.options?.onEachStep) + private getLoggers(params: GeneratorParams | PipeGenericParams): Loggers { + const stepName = params.name ?? params.fn.name + const onStepStarted = getOnStepStarted(stepName, this.options?.onStepStarted) + const onStepFinished = getOnStepFinished(stepName, this.options?.onStepFinished) + return { onStepFinished, onStepStarted } } } diff --git a/src/index.ts b/src/index.ts index 28e1f3e..664c0ed 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,10 +1,13 @@ export type { ValueBag, PipeGenericParams, - OnEachStep, CaminhoOptions, - Accumulator as ReduceParams, - OnEachStepParams, + Accumulator, + + onStepStartedParams, + onStepFinishedParams, + onStepStarted, + onStepFinished, } from './types' export type { Caminho } from './Caminho' diff --git a/src/operators/batch.ts b/src/operators/batch.ts index ff0c801..1a51c29 100644 --- a/src/operators/batch.ts +++ b/src/operators/batch.ts @@ -1,7 +1,6 @@ import { bufferTime, filter, mergeAll, mergeMap } from 'rxjs' -import type { Operator, ValueBag } from '../types' -import type { Logger } from '../utils/stepLogger' +import type { Loggers, Operator, ValueBag } from '../types' import { batchHasProvides, applyOperatorsToObservable, OperatorApplier } from './helpers/operatorHelpers' import { getNewValueBag } from '../utils/valueBag' @@ -24,16 +23,17 @@ export interface BatchParamsNoProvides extends BaseBatchParams { fn: (valueBag: ValueBag[]) => void | Promise } -export function batch(params: BatchParams, logger: Logger): OperatorApplier { +export function batch(params: BatchParams, loggers: Loggers): OperatorApplier { const getBag = batchHasProvides(params) ? valueBagGetterBatchProvides(params.provides) : valueBagGetterBatchNoProvides() async function wrappedStep(valueBag: ValueBag[]): Promise { + loggers.onStepStarted(valueBag) const startTime = new Date() const values = await params.fn([...valueBag]) const newValueBags = getBag(valueBag, values as unknown[]) - logger(newValueBags, startTime) + loggers.onStepFinished(newValueBags, startTime) return newValueBags } diff --git a/src/operators/generator.ts b/src/operators/generator.ts index 74cd57d..e64c3ab 100644 --- a/src/operators/generator.ts +++ b/src/operators/generator.ts @@ -1,6 +1,5 @@ -import { ValueBag } from '../types' +import type { Loggers, ValueBag } from '../types' import { sleep } from '../utils/sleep' -import { Logger } from '../utils/stepLogger' import { PendingDataControl } from '../utils/PendingDataControl' import { getNewValueBag } from '../utils/valueBag' @@ -12,12 +11,19 @@ export interface GeneratorParams { name?: string } -export function wrapGenerator(generatorParams: GeneratorParams, logger: Logger) { +export function wrapGenerator(generatorParams: GeneratorParams, loggers: Loggers) { return async function* wrappedGenerator(initialBag: ValueBag) { + const bagArrayForLogger = [initialBag] + loggers.onStepStarted(bagArrayForLogger) + let isStart = true let startTime = new Date() for await (const value of generatorParams.fn(initialBag)) { + if (!isStart) { + loggers.onStepStarted(bagArrayForLogger) + } + isStart = false const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value) - logger([newValueBag], startTime) + loggers.onStepFinished([newValueBag], startTime) yield newValueBag startTime = new Date() } @@ -28,18 +34,24 @@ export function wrapGeneratorWithBackPressure( generatorParams: GeneratorParams, maxItemsFlowing: number, pendingDataControl: PendingDataControl, - logger: Logger, + loggers: Loggers, ) { - return async function* wrappedGenerator(initialBag: ValueBag) { + return async function* wrappedGeneratorWithBackPressure(initialBag: ValueBag) { + const bagArrayForLogger = [initialBag] + loggers.onStepStarted(bagArrayForLogger) + let isStart = true let startTime = new Date() for await (const value of generatorParams.fn(initialBag)) { + if (!isStart) { + loggers.onStepStarted(bagArrayForLogger) + } + isStart = false + const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value) if (needsToWaitForBackpressure(pendingDataControl, maxItemsFlowing)) { await waitOnBackpressure(maxItemsFlowing, pendingDataControl) } - pendingDataControl.increment() - const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value) - logger([newValueBag], startTime) + loggers.onStepFinished([newValueBag], startTime) yield newValueBag startTime = new Date() } diff --git a/src/operators/pipe.ts b/src/operators/pipe.ts index 84a0018..6bf3146 100644 --- a/src/operators/pipe.ts +++ b/src/operators/pipe.ts @@ -1,7 +1,6 @@ import { mergeMap } from 'rxjs' -import { ValueBag } from '../types' +import type { ValueBag, Loggers } from '../types' import { OperatorApplier, pipeHasProvides } from './helpers/operatorHelpers' -import { Logger } from '../utils/stepLogger' import { getNewValueBag } from '../utils/valueBag' export type PipeParams = PipeParamsProvides | PipeParamsNoProvides @@ -20,14 +19,15 @@ export interface PipeParamsNoProvides extends BasePipeParams { fn: (valueBag: ValueBag) => void | Promise } -export function pipe(params: PipeParams, logger: Logger): OperatorApplier { +export function pipe(params: PipeParams, loggers: Loggers): OperatorApplier { const getBag = pipeHasProvides(params) ? valueBagGetterProvides(params.provides) : valueBagGetterNoProvides() async function wrappedStep(valueBag: ValueBag): Promise { + loggers.onStepStarted([valueBag]) const startTime = new Date() const value = await params.fn({ ...valueBag }) const newBag = getBag(valueBag, value) - logger([newBag], startTime) + loggers.onStepFinished([newBag], startTime) return newBag } return mergeMap(wrappedStep, params?.maxConcurrency) diff --git a/src/types.ts b/src/types.ts index acfb464..6974372 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,6 +1,8 @@ import type { OperatorFunction } from 'rxjs' import type { BatchParams } from './operators/batch' import type { PipeParams } from './operators/pipe' +import { InternalOnStepFinished } from './utils/onStepFinished' +import { InternalOnStepStarted } from './utils/onStepStarted' // TODO: Proper typing for ValueBag! // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -12,21 +14,31 @@ export type Operator = OperatorFunction export type PipeGenericParams = PipeParams | BatchParams -export type OnEachStep = (params: OnEachStepParams) => void +export type onStepStarted = (params: onStepStartedParams) => void +export type onStepFinished = (params: onStepFinishedParams) => void -export interface OnEachStepParams { +export interface onStepStartedParams { + name: string + received: number + valueBags: ValueBag[] +} + +export interface onStepFinishedParams { name: string tookMs: number emitted: number valueBags: ValueBag[] } +export type Loggers = { onStepFinished: InternalOnStepFinished; onStepStarted: InternalOnStepStarted } + export interface CaminhoOptions { - onEachStep?: OnEachStep + onStepFinished?: onStepFinished + onStepStarted?: onStepStarted maxItemsFlowing?: number } export interface Accumulator { - fn: (acc: A, value: ValueBag, index: number) => A, - seed: A, + fn: (acc: A, value: ValueBag, index: number) => A + seed: A } diff --git a/src/utils/onStepFinished.ts b/src/utils/onStepFinished.ts new file mode 100644 index 0000000..acf3abc --- /dev/null +++ b/src/utils/onStepFinished.ts @@ -0,0 +1,16 @@ +import type { onStepFinished, ValueBag } from '../types' + +export type InternalOnStepFinished = (valueBags: ValueBag[], stepStartedAt: Date) => void + +const stub = () => {} + +export function getOnStepFinished(name: string, onStepFinished?: onStepFinished): InternalOnStepFinished { + if (onStepFinished) { + return function internalOnStepFinished(valueBags: ValueBag[], stepStartedAt: Date) { + const now = Date.now() + const tookMs = now - stepStartedAt.getTime() + onStepFinished({ name, tookMs, valueBags, emitted: valueBags.length }) + } + } + return stub +} diff --git a/src/utils/onStepStarted.ts b/src/utils/onStepStarted.ts new file mode 100644 index 0000000..7a2d8fd --- /dev/null +++ b/src/utils/onStepStarted.ts @@ -0,0 +1,11 @@ +import type { onStepStarted, ValueBag } from '../types' + +export type InternalOnStepStarted = (valueBags: ValueBag[]) => void + +const stub = () => {} + +export function getOnStepStarted(name: string, onStepStarted: onStepStarted = stub): InternalOnStepStarted { + return function internalOnStepStarted(valueBags: ValueBag[]) { + onStepStarted({ name, valueBags, received: valueBags.length }) + } +} diff --git a/src/utils/stepLogger.ts b/src/utils/stepLogger.ts deleted file mode 100644 index 834833c..0000000 --- a/src/utils/stepLogger.ts +++ /dev/null @@ -1,14 +0,0 @@ -import type { OnEachStep, ValueBag } from '../types' - -export type Logger = (valueBags: ValueBag[], stepStartedAt: Date) => void - -export function getLogger(name: string, onEachStep?: OnEachStep): Logger { - if (onEachStep) { - return function logStep(valueBags: ValueBag[], stepStartedAt: Date) { - const now = Date.now() - const tookMs = now - stepStartedAt.getTime() - onEachStep({ name, tookMs, valueBags, emitted: valueBags.length }) - } - } - return function stubLogger() {} -} diff --git a/test/integration/from.test.ts b/test/integration/from.test.ts index 1f1d4b0..28c3760 100644 --- a/test/integration/from.test.ts +++ b/test/integration/from.test.ts @@ -1,8 +1,8 @@ -import { from } from '../../src' +import { CaminhoOptions, from } from '../../src' import { sleep } from '../../src/utils/sleep' import { getMockedJobGenerator } from '../mocks/generator.mock' -import { mockStepResult } from '../mocks/stepResult.mock' +import { getOnStepFinishedParamsFixture } from '../mocks/stepResult.mock' describe('from', () => { test('Should call generator and run all function provided to the flow', async () => { @@ -22,7 +22,7 @@ describe('from', () => { }) test('Should not emit more than maxItemsFlowing value concurrently', async () => { - const options = { onEachStep: jest.fn(), maxItemsFlowing: 3 } + const options: CaminhoOptions = { onStepFinished: jest.fn(), maxItemsFlowing: 3 } const NUMBER_OF_ITERATIONS = 7 const generatorStep = { fn: getMockedJobGenerator(NUMBER_OF_ITERATIONS), provides: 'job', name: 'generator' } @@ -31,47 +31,47 @@ describe('from', () => { .pipe({ fn: function fetchMock() { return sleep(10) }, provides: 'rawData', maxConcurrency: 1 }) .run() - expect(options.onEachStep.mock.calls).toEqual([ - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], + expect((options.onStepFinished as jest.Mock).mock.calls).toEqual([ + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], ]) }) test('Should emit values from generator uncontrolably if maxItemsFlowing was not provided', async () => { - const options = { onEachStep: jest.fn() } + const options: CaminhoOptions = { onStepFinished: jest.fn() } const NUMBER_OF_ITERATIONS = 7 await from({ fn: getMockedJobGenerator(NUMBER_OF_ITERATIONS), provides: 'job', name: 'generator' }, options) .pipe({ fn: function fetchMock() { return sleep(10) }, provides: 'rawData' }) .run() - expect(options.onEachStep.mock.calls).toEqual([ - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], - [mockStepResult({ name: 'fetchMock' })], + expect((options.onStepFinished as jest.Mock).mock.calls).toEqual([ + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], + [getOnStepFinishedParamsFixture({ name: 'fetchMock' })], ]) }) }) diff --git a/test/integration/onStepFinished.test.ts b/test/integration/onStepFinished.test.ts new file mode 100644 index 0000000..2a50b36 --- /dev/null +++ b/test/integration/onStepFinished.test.ts @@ -0,0 +1,60 @@ +import { from, fromFn } from '../../src' + +import { getMockedJobGenerator } from '../mocks/generator.mock' +import { getOnStepFinishedParamsFixture } from '../mocks/stepResult.mock' + +describe('onStepFinished', () => { + it('should call the onStepFinished callback with the proper values', async () => { + const NUMBER_OF_ITERATIONS = 5 + + const generatorMock = getMockedJobGenerator(NUMBER_OF_ITERATIONS) + const fetchMock = function fetchSomething() {} + const saveMock = function saveSomething() {} + const onStepFinishedMock = jest.fn().mockName('onStepFinishedLog') + + await from({ fn: generatorMock, provides: 'job' }, { onStepFinished: onStepFinishedMock }) + .pipe({ fn: fetchMock, provides: 'rawData', maxConcurrency: 5 }) + .pipe({ fn: saveMock, batch: { maxSize: 2, timeoutMs: 1 }, maxConcurrency: 5 }) + .run() + + expect(onStepFinishedMock.mock.calls).toEqual([ + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchSomething' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchSomething' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchSomething' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchSomething' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchSomething' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + ]) + }) + + it('should call the onStepFinished callback with the number of values emitted by batch operation', async () => { + const onStepFinishedMock = jest.fn() + const fetchSomething = jest.fn().mockName('fetchSomething') + + await from({ fn: getMockedJobGenerator(2), provides: 'job' }, { onStepFinished: onStepFinishedMock }) + .pipe({ fn: fetchSomething, batch: { maxSize: 2, timeoutMs: 100 }, maxConcurrency: 1 }) + .run() + + expect(onStepFinishedMock.mock.calls).toEqual([ + [getOnStepFinishedParamsFixture({ emitted: 1 })], + [getOnStepFinishedParamsFixture({ emitted: 1 })], + [getOnStepFinishedParamsFixture({ emitted: 2 })], + ]) + }) + + it('should properly handle step.name fallback on "fromFn" ', async () => { + const onStepFinishedMock = jest.fn() + async function getJob() { + return { id: '123' } + } + await fromFn({ fn: getJob, provides: 'job' }, { onStepFinished: onStepFinishedMock }).run() + expect(onStepFinishedMock.mock.calls).toEqual([[getOnStepFinishedParamsFixture({ name: 'getJob', emitted: 1 })]]) + }) +}) diff --git a/test/integration/onStepStarted.test.ts b/test/integration/onStepStarted.test.ts new file mode 100644 index 0000000..b7a930d --- /dev/null +++ b/test/integration/onStepStarted.test.ts @@ -0,0 +1,67 @@ +import { from, fromFn } from '../../src' + +import { getMockedJobGenerator } from '../mocks/generator.mock' +import { getOnStepStartedParamsFixture } from '../mocks/stepResult.mock' + +describe('onStepStarted', () => { + it('should call the callback with the proper values', async () => { + const NUMBER_OF_ITERATIONS = 5 + + const generatorMock = getMockedJobGenerator(NUMBER_OF_ITERATIONS) + const fetchMock = function fetchSomething() {} + const saveMock = function saveSomething() {} + const onStepStartedMock = jest.fn().mockName('onStepStartedLog') + + await from({ fn: generatorMock, provides: 'job' }, { onStepStarted: onStepStartedMock }) + .pipe({ fn: fetchMock, provides: 'rawData', maxConcurrency: 5 }) + .pipe({ fn: saveMock, batch: { maxSize: 2, timeoutMs: 1 }, maxConcurrency: 5 }) + .run() + + expect(onStepStartedMock.mock.calls).toEqual([ + [getOnStepStartedParamsFixture({ name: 'generator' })], + [getOnStepStartedParamsFixture({ name: 'fetchSomething' })], + [getOnStepStartedParamsFixture({ name: 'generator' })], + [getOnStepStartedParamsFixture({ name: 'fetchSomething' })], + [getOnStepStartedParamsFixture({ name: 'saveSomething' })], + [getOnStepStartedParamsFixture({ name: 'generator' })], + [getOnStepStartedParamsFixture({ name: 'fetchSomething' })], + [getOnStepStartedParamsFixture({ name: 'generator' })], + [getOnStepStartedParamsFixture({ name: 'fetchSomething' })], + [getOnStepStartedParamsFixture({ name: 'saveSomething' })], + [getOnStepStartedParamsFixture({ name: 'generator' })], + [getOnStepStartedParamsFixture({ name: 'fetchSomething' })], + [getOnStepStartedParamsFixture({ name: 'saveSomething' })], + ]) + }) + + it('should call the callback with the number of values received in batch operation', async () => { + const onStepStartedMock = jest.fn() + const fetchSomething = jest.fn().mockName('fetchSomething') + + await from( + { + name: 'generator', + fn: getMockedJobGenerator(2), + provides: 'job', + }, + { onStepStarted: onStepStartedMock }, + ) + .pipe({ name: 'batch', fn: fetchSomething, batch: { maxSize: 2, timeoutMs: 100 }, maxConcurrency: 1 }) + .run() + + expect(onStepStartedMock.mock.calls).toEqual([ + [getOnStepStartedParamsFixture({ name: 'generator', received: 1 })], + [getOnStepStartedParamsFixture({ name: 'generator', received: 1 })], + [getOnStepStartedParamsFixture({ name: 'batch', received: 2 })], + ]) + }) + + it('should properly handle step.name fallback on "fromFn" ', async () => { + const onStepStartedMock = jest.fn() + async function getJob() { + return { id: '123' } + } + await fromFn({ fn: getJob, provides: 'job' }, { onStepStarted: onStepStartedMock }).run() + expect(onStepStartedMock.mock.calls).toEqual([[getOnStepStartedParamsFixture({ name: 'getJob', received: 1 })]]) + }) +}) diff --git a/test/integration/parallel.test.ts b/test/integration/parallel.test.ts index 9a0e616..a559f17 100644 --- a/test/integration/parallel.test.ts +++ b/test/integration/parallel.test.ts @@ -2,7 +2,7 @@ import { from, ValueBag } from '../../src' import { sleep } from '../../src/utils/sleep' import { getMockedJobGenerator } from '../mocks/generator.mock' -import { mockStepResult } from '../mocks/stepResult.mock' +import { getOnStepFinishedParamsFixture } from '../mocks/stepResult.mock' test('Parallel steps should provide valueBag properly to the following steps', async () => { async function fetchStatusFn(valueBags: ValueBag[]) { @@ -49,7 +49,7 @@ test('Parallel steps should use the most efficient path for emiting values', asy const NUMBER_OF_ITERATIONS = 5 const generatorMock = getMockedJobGenerator(NUMBER_OF_ITERATIONS) - const onEachStepMock = jest.fn().mockName('onEachStepLog') + const onStepFinishedMock = jest.fn().mockName('onStepFinishedLog') const fetchStatus = { fn: async function fetchStatus(valueBag: ValueBag[]) { @@ -73,29 +73,29 @@ test('Parallel steps should use the most efficient path for emiting values', asy }, } - await from({ fn: generatorMock, provides: 'job' }, { onEachStep: onEachStepMock, maxItemsFlowing: 2 }) + await from({ fn: generatorMock, provides: 'job' }, { onStepFinished: onStepFinishedMock, maxItemsFlowing: 2 }) .parallel([fetchStatus, fetchPosition]) .pipe(saveAll) .run() - expect(onEachStepMock.mock.calls).toEqual([ - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchPosition' })], - [mockStepResult({ name: 'fetchPosition' })], - [mockStepResult({ name: 'fetchStatus' })], - [mockStepResult({ name: 'saveSomething' })], - [mockStepResult({ name: 'saveSomething' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchPosition' })], - [mockStepResult({ name: 'fetchPosition' })], - [mockStepResult({ name: 'fetchStatus' })], - [mockStepResult({ name: 'saveSomething' })], - [mockStepResult({ name: 'saveSomething' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchPosition' })], - [mockStepResult({ name: 'fetchStatus' })], - [mockStepResult({ name: 'saveSomething' })], + expect(onStepFinishedMock.mock.calls).toEqual([ + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchPosition' })], + [getOnStepFinishedParamsFixture({ name: 'fetchPosition' })], + [getOnStepFinishedParamsFixture({ name: 'fetchStatus' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchPosition' })], + [getOnStepFinishedParamsFixture({ name: 'fetchPosition' })], + [getOnStepFinishedParamsFixture({ name: 'fetchStatus' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], + [getOnStepFinishedParamsFixture({ name: 'generator' })], + [getOnStepFinishedParamsFixture({ name: 'fetchPosition' })], + [getOnStepFinishedParamsFixture({ name: 'fetchStatus' })], + [getOnStepFinishedParamsFixture({ name: 'saveSomething' })], ]) }) diff --git a/test/integration/stepLogger.test.ts b/test/integration/stepLogger.test.ts deleted file mode 100644 index 9207b5f..0000000 --- a/test/integration/stepLogger.test.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { from, fromFn } from '../../src' - -import { getMockedJobGenerator } from '../mocks/generator.mock' -import { mockStepResult } from '../mocks/stepResult.mock' - -test('Should call onEachStep provided callback with proper values', async () => { - const NUMBER_OF_ITERATIONS = 5 - - const generatorMock = getMockedJobGenerator(NUMBER_OF_ITERATIONS) - const fetchMock = function fetchSomething() {} - const saveMock = function saveSomething() {} - const onEachStepMock = jest.fn().mockName('onEachStepLog') - - await from({ fn: generatorMock, provides: 'job' }, { onEachStep: onEachStepMock }) - .pipe({ fn: fetchMock, provides: 'rawData', maxConcurrency: 5 }) - .pipe({ fn: saveMock, batch: { maxSize: 2, timeoutMs: 1 }, maxConcurrency: 5 }) - .run() - - expect(onEachStepMock.mock.calls).toEqual([ - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchSomething' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchSomething' })], - [mockStepResult({ name: 'saveSomething' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchSomething' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchSomething' })], - [mockStepResult({ name: 'saveSomething' })], - [mockStepResult({ name: 'generator' })], - [mockStepResult({ name: 'fetchSomething' })], - [mockStepResult({ name: 'saveSomething' })], - ]) -}) - -test('Should call onEachStep with the number of values emitted by batch operation', async () => { - const onEachStepMock = jest.fn() - const fetchSomething = jest.fn().mockName('fetchSomething') - - await from({ fn: getMockedJobGenerator(2), provides: 'job' }, { onEachStep: onEachStepMock }) - .pipe({ fn: fetchSomething, batch: { maxSize: 2, timeoutMs: 100 }, maxConcurrency: 1 }) - .run() - - expect(onEachStepMock.mock.calls).toEqual([ - [mockStepResult({ emitted: 1 })], - [mockStepResult({ emitted: 1 })], - [mockStepResult({ emitted: 2 })], - ]) -}) - -test('Should properly handle step.name fallback on "fromFn" ', async () => { - const onEachStepMock = jest.fn() - async function getJob() { return { id: '123' } } - await fromFn({ fn: getJob, provides: 'job' }, { onEachStep: onEachStepMock }).run() - expect(onEachStepMock.mock.calls).toEqual([[mockStepResult({ name: 'getJob', emitted: 1 })]]) -}) diff --git a/test/integration/subCaminho.test.ts b/test/integration/subCaminho.test.ts index 0616087..3e0c112 100644 --- a/test/integration/subCaminho.test.ts +++ b/test/integration/subCaminho.test.ts @@ -1,10 +1,10 @@ -import { Caminho, from, ReduceParams, ValueBag } from '../../src' +import { Caminho, from, Accumulator, ValueBag } from '../../src' import { getMockedGenerator } from '../mocks/generator.mock' import { getNumberedArray } from '../mocks/array.mock' describe('Sub-Caminho', () => { - function getStepForSubCaminho(subCaminho: Caminho, accumulator: ReduceParams, provides: string) { + function getStepForSubCaminho(subCaminho: Caminho, accumulator: Accumulator, provides: string) { return { fn: (valueBag: ValueBag) => subCaminho.run(valueBag, accumulator), provides, @@ -158,7 +158,7 @@ function getEmployeeSteps() { const generator = { fn: employeeGeneratorFn, provides: 'employeeName' } const mapper = { fn: mapEmployeeFn, provides: 'mappedEmployee' } const saver = { fn: saveEmployeeFn } - const accumulator: ReduceParams = { fn: (acc: number) => acc + 1, seed: 0 } + const accumulator: Accumulator = { fn: (acc: number) => acc + 1, seed: 0 } return { generator, @@ -174,7 +174,7 @@ function getDocumentSteps() { const generator = { fn: generatorFn, provides: 'documentId' } const saver = { fn: saverFn } - const accumulator: ReduceParams = { fn: (acc: number) => acc + 1, seed: 0 } + const accumulator: Accumulator = { fn: (acc: number) => acc + 1, seed: 0 } return { generator, diff --git a/test/mocks/stepResult.mock.ts b/test/mocks/stepResult.mock.ts index f89e67d..b7b1891 100644 --- a/test/mocks/stepResult.mock.ts +++ b/test/mocks/stepResult.mock.ts @@ -1,4 +1,4 @@ -export function mockStepResult(mock: { name?: string, emitted?: number }) { +export function getOnStepFinishedParamsFixture(mock: { name?: string, emitted?: number }) { return { name: mock.name ?? expect.any(String), tookMs: expect.any(Number), @@ -6,3 +6,11 @@ export function mockStepResult(mock: { name?: string, emitted?: number }) { valueBags: expect.any(Array), } } + +export function getOnStepStartedParamsFixture(mock: { name?: string, received?: number }) { + return { + name: mock.name ?? expect.any(String), + received: mock.received ?? expect.any(Number), + valueBags: expect.any(Array), + } +}