Skip to content

Commit

Permalink
Implement the "onStepStarted" hook
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrokehl committed Mar 4, 2024
1 parent b5dd230 commit 0ae9b2a
Show file tree
Hide file tree
Showing 22 changed files with 338 additions and 190 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"no-plusplus": "off",
"no-underscore-dangle": ["error"],
"no-await-in-loop": "off",
"complexity": ["error", { "max": 3 }],
"complexity": ["error", { "max": 4 }],
"class-methods-use-this": "off"
},
"env": {
Expand Down
46 changes: 31 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,26 +172,44 @@ await from({ fn: generateCars, provides: 'carId' })
```

#### Logging
Caminho features a simple log mechanism which executes a syncronous callback function on every step executed.
The function needs to be defined via the `onEachStep` parameter on the `from`.
Caminho features a simple log mechanism which executes a synchronous callback function on every step start and finish.
The functions can be defined with the `onStepStarted` and `onStepFinished` parameters on one of the `from` flow initializers.

Every step execution, calls the `onEachStep` step, it provides the callback with the following information:
The **onStepStarted** provides the callback with the following information:

- `name: string` - The `name` parameter on the step definition, defaults to the step function name - `step.fn.name`.
- `tookMs: number` - Time for the step to execute.
- `emitted: number` - Number of items processed, useful for batch operations
- *name: string* - The name provided on the step definition, falling back to the name of the step function.
- *valueBags: ValueBag[]* - Array of value bags at the moment this was executed.
- *received: number* - Number of items received (this will only be greater than 1 in case it's a batch).

Example of how the calls to `onEachStep` looks like:
The **onStepFinished** provides the callback with the following information:

- *name: string* - The name provided on the step definition, falling back to the name of the step function.
- *valueBags: ValueBag[]* - Array of value bags at the moment this was executed.
- *emitted: number* - Number of items processed (this will only be greater than 1 in case it's a batch).
- *tookMs: number* - Time for the step to execute.

Example:

```typescript
await from({ fn: generateCars, provides: 'carId' }, { onEachStep: console.log })
// { name: 'generateCars', tookMs: number, emitted: 1 }
// { name: 'generateCars', tookMs: number, emitted: 1 }
await from(
{ fn: generateCars, provides: 'carId' },
{
onStepStarted: (log) => console.log('stepStarted', log),
onStepFinished: (log) => console.log('stepFinished', log),
}
)
// stepStarted { name: 'generateCars', received: 1, valueBags: [{}] }
// stepFinished { name: 'generateCars', tookMs: number, emitted: 1, valueBags: [{ carId: "1" }] }
// stepStarted { name: 'generateCars', received: 1, valueBags: [{}] }
// stepFinished { name: 'generateCars', tookMs: number, emitted: 1, valueBags: [{ carId: "2" }] }
.pipe({ fn: fetchPrice, provides: 'price', name: 'customName' })
// { name: 'customName', tookMs: number, emitted: 1 }
// { name: 'customName', tookMs: number, emitted: 1 }
// stepStarted { name: 'customName', received: 1, valueBags: [{ carId: "1" }] }
// stepFinished { name: 'customName', tookMs: number, emitted: 1, valueBags: [{ carId: "1", customName: "car-1" }] }
// stepStarted { name: 'customName', received: 1, valueBags: [{ carId: "2" }] }
// stepFinished { name: 'customName', tookMs: number, emitted: 1, valueBags: [{ carId: "2", customName: "car-2" }] }
.pipe({ fn: fetchSpecs, provides: 'specs', batch: { maxSize: 50, timeoutMs: 500 } })
// { name: 'fetchSpecs', tookMs: number, emitted: 2 }
// stepStarted { name: 'fetchSpecs', received: 2, valueBags: [{ carId: "1", customName: "car-1" }, { carId: "2", customName: "car-2" }] }
// stepFinished { name: 'fetchSpecs', tookMs: number, emitted: 2, valueBags: [{ carId: "1", customName: "car-1", specs: { engineSize: 1600 } }, { carId: "2", customName: "car-2", specs: { engineSize: 2000 } }] }
.run()
```

Expand All @@ -213,8 +231,6 @@ npm run test:watch

## Roadmap

- Add "onStartStep" callback for logging
- Wrap steps in a try catch so we can call logger with the step error.
- Proper typing on ValueBag and how it's handled in child steps
- Built-in Retry system
- Should step errors continue the flow?
5 changes: 4 additions & 1 deletion benchmark/parallel.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ async function runParallelBenchmark(parentItems: number, childItemsPerParent: nu
const childCaminho = from(steps.childGenerator, { maxItemsFlowing: 1_000 })
.parallel([steps.pipe3, steps.pipe4])

const childStep = { fn: (valueBag) => childCaminho.run(valueBag, steps.accumulator), provides: 'accumulator1' }
const childStep = {
fn: (valueBag: ValueBag) => childCaminho.run(valueBag, steps.accumulator),
provides: 'accumulator1',
}

console.time('initialize caminho')
const benchmarkCaminho = from(steps.parentGenerator, { maxItemsFlowing: 1_000 })
Expand Down
14 changes: 10 additions & 4 deletions benchmark/subFlow.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ async function runSubflowBenchmark(parentItems: number, childItemsPerParent: num
childProcessed += valueBag.accumulator1
},
}
const childStep = { fn: (valueBag) => childCaminho.run(valueBag, steps.accumulator), provides: 'accumulator1' }
const childStep = {
fn: (valueBag: ValueBag) => childCaminho.run(valueBag, steps.accumulator),
provides: 'accumulator1',
}
console.timeEnd('initialize steps')
console.time('initialize caminho')

Expand All @@ -42,9 +45,8 @@ function initializeSteps(parentItems: number, childItemsPerParent: number) {
const accumulatorFn = (acc: number) => acc + 1
const parentGeneratorFn = getMockedGenerator(getNumberedArray(parentItems))
const childGeneratorFn = getMockedGenerator(getNumberedArray(childItemsPerParent))
const pipeFn = (valueBag) => ({ id: valueBag.source1, name: 'pipe1' })
const mapBagForBatch = () => (valueBag) => ({ parentId: valueBag.source1, id: valueBag.subSource1, name: 'batch1' })
const batchFn = (valueBags) => valueBags.map(mapBagForBatch)
const pipeFn = (valueBag: ValueBag) => ({ id: valueBag.source1, name: 'pipe1' })
const batchFn = (valueBags: ValueBag[]) => valueBags.map(mapBagForBatch)

return {
parentGenerator: { fn: parentGeneratorFn, provides: 'source1' },
Expand All @@ -55,4 +57,8 @@ function initializeSteps(parentItems: number, childItemsPerParent: number) {
}
}

/**
 * Creates the mapper that shapes a value bag into a `batch1` record.
 *
 * NOTE(review): this is a factory — it takes no arguments and returns the mapper.
 * `batchFn` passes it straight to `Array.prototype.map`, which would produce one
 * mapper function per bag rather than mapped records; confirm whether
 * `valueBags.map(mapBagForBatch())` was intended.
 *
 * @returns A function mapping a value bag to `{ parentId, id, name: 'batch1' }`,
 * where `parentId` is the bag's `source1` and `id` is its `subSource1`.
 */
function mapBagForBatch() {
  return (valueBag: ValueBag) => {
    const parentId = valueBag.source1
    const id = valueBag.subSource1
    return { parentId, id, name: 'batch1' }
  }
}

runSubflowBenchmark(50_000, 1_000)
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "caminho",
"version": "1.1.2",
"version": "1.2.0",
"description": "Caminho means path, a new path to your data processing.",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
24 changes: 14 additions & 10 deletions src/Caminho.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { from, lastValueFrom, reduce, tap } from 'rxjs'

import type { ValueBag, PipeGenericParams, CaminhoOptions, Accumulator } from './types'
import type { ValueBag, PipeGenericParams, CaminhoOptions, Accumulator, Loggers } from './types'

import { GeneratorParams, wrapGenerator, wrapGeneratorWithBackPressure } from './operators/generator'
import { pipe } from './operators/pipe'
Expand All @@ -9,9 +9,11 @@ import { parallel } from './operators/parallel'
import { filter, FilterPredicate } from './operators/filter'

import { applyOperator, isBatch, OperatorApplier } from './operators/helpers/operatorHelpers'
import { getLogger } from './utils/stepLogger'
import { PendingDataControl, PendingDataControlInMemory } from './utils/PendingDataControl'

import { getOnStepFinished } from './utils/onStepFinished'
import { getOnStepStarted } from './utils/onStepStarted'

export class Caminho {
private generator: (initialBag: ValueBag) => AsyncGenerator<ValueBag>
private operators: OperatorApplier[] = []
Expand Down Expand Up @@ -63,14 +65,14 @@ export class Caminho {
}

private getGenerator(generatorParams: GeneratorParams): (initialBag: ValueBag) => AsyncGenerator<ValueBag> {
const logger = this.getLogger(generatorParams)
const loggers = this.getLoggers(generatorParams)
if (this.options?.maxItemsFlowing) {
const pendingDataControl = this.pendingDataControl as PendingDataControl
this.finalStep = tap(() => pendingDataControl.decrement())
return wrapGeneratorWithBackPressure(generatorParams, this.options.maxItemsFlowing, pendingDataControl, logger)
return wrapGeneratorWithBackPressure(generatorParams, this.options.maxItemsFlowing, pendingDataControl, loggers)
}

return wrapGenerator(generatorParams, logger)
return wrapGenerator(generatorParams, loggers)
}

private buildObservable(initialBag: ValueBag = {}) {
Expand All @@ -91,12 +93,14 @@ export class Caminho {

private getApplierForPipeOrBatch(params: PipeGenericParams): OperatorApplier {
return isBatch(params)
? batch(params, this.getLogger(params))
: pipe(params, this.getLogger(params))
? batch(params, this.getLoggers(params))
: pipe(params, this.getLoggers(params))
}

private getLogger(params: GeneratorParams | PipeGenericParams) {
const name = params.name ?? params.fn.name
return getLogger(name, this.options?.onEachStep)
private getLoggers(params: GeneratorParams | PipeGenericParams): Loggers {
const stepName = params.name ?? params.fn.name
const onStepStarted = getOnStepStarted(stepName, this.options?.onStepStarted)
const onStepFinished = getOnStepFinished(stepName, this.options?.onStepFinished)
return { onStepFinished, onStepStarted }
}
}
9 changes: 6 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
export type {
ValueBag,
PipeGenericParams,
OnEachStep,
CaminhoOptions,
Accumulator as ReduceParams,
OnEachStepParams,
Accumulator,

onStepStartedParams,
onStepFinishedParams,
onStepStarted,
onStepFinished,
} from './types'
export type { Caminho } from './Caminho'

Expand Down
8 changes: 4 additions & 4 deletions src/operators/batch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { bufferTime, filter, mergeAll, mergeMap } from 'rxjs'

import type { Operator, ValueBag } from '../types'
import type { Logger } from '../utils/stepLogger'
import type { Loggers, Operator, ValueBag } from '../types'
import { batchHasProvides, applyOperatorsToObservable, OperatorApplier } from './helpers/operatorHelpers'
import { getNewValueBag } from '../utils/valueBag'

Expand All @@ -24,16 +23,17 @@ export interface BatchParamsNoProvides extends BaseBatchParams {
fn: (valueBag: ValueBag[]) => void | Promise<void>
}

export function batch(params: BatchParams, logger: Logger): OperatorApplier {
export function batch(params: BatchParams, loggers: Loggers): OperatorApplier {
const getBag = batchHasProvides(params)
? valueBagGetterBatchProvides(params.provides)
: valueBagGetterBatchNoProvides()

async function wrappedStep(valueBag: ValueBag[]): Promise<ValueBag[]> {
loggers.onStepStarted(valueBag)
const startTime = new Date()
const values = await params.fn([...valueBag])
const newValueBags = getBag(valueBag, values as unknown[])
logger(newValueBags, startTime)
loggers.onStepFinished(newValueBags, startTime)
return newValueBags
}

Expand Down
30 changes: 21 additions & 9 deletions src/operators/generator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { ValueBag } from '../types'
import type { Loggers, ValueBag } from '../types'
import { sleep } from '../utils/sleep'
import { Logger } from '../utils/stepLogger'
import { PendingDataControl } from '../utils/PendingDataControl'
import { getNewValueBag } from '../utils/valueBag'

Expand All @@ -12,12 +11,19 @@ export interface GeneratorParams {
name?: string
}

export function wrapGenerator(generatorParams: GeneratorParams, logger: Logger) {
export function wrapGenerator(generatorParams: GeneratorParams, loggers: Loggers) {
return async function* wrappedGenerator(initialBag: ValueBag) {
const bagArrayForLogger = [initialBag]
loggers.onStepStarted(bagArrayForLogger)
let isStart = true
let startTime = new Date()
for await (const value of generatorParams.fn(initialBag)) {
if (!isStart) {
loggers.onStepStarted(bagArrayForLogger)
}
isStart = false
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
logger([newValueBag], startTime)
loggers.onStepFinished([newValueBag], startTime)
yield newValueBag
startTime = new Date()
}
Expand All @@ -28,18 +34,24 @@ export function wrapGeneratorWithBackPressure(
generatorParams: GeneratorParams,
maxItemsFlowing: number,
pendingDataControl: PendingDataControl,
logger: Logger,
loggers: Loggers,
) {
return async function* wrappedGenerator(initialBag: ValueBag) {
return async function* wrappedGeneratorWithBackPressure(initialBag: ValueBag) {
const bagArrayForLogger = [initialBag]
loggers.onStepStarted(bagArrayForLogger)
let isStart = true
let startTime = new Date()
for await (const value of generatorParams.fn(initialBag)) {
if (!isStart) {
loggers.onStepStarted(bagArrayForLogger)
}
isStart = false
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
if (needsToWaitForBackpressure(pendingDataControl, maxItemsFlowing)) {
await waitOnBackpressure(maxItemsFlowing, pendingDataControl)
}

pendingDataControl.increment()
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
logger([newValueBag], startTime)
loggers.onStepFinished([newValueBag], startTime)
yield newValueBag
startTime = new Date()
}
Expand Down
8 changes: 4 additions & 4 deletions src/operators/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { mergeMap } from 'rxjs'
import { ValueBag } from '../types'
import type { ValueBag, Loggers } from '../types'
import { OperatorApplier, pipeHasProvides } from './helpers/operatorHelpers'
import { Logger } from '../utils/stepLogger'
import { getNewValueBag } from '../utils/valueBag'

export type PipeParams = PipeParamsProvides | PipeParamsNoProvides
Expand All @@ -20,14 +19,15 @@ export interface PipeParamsNoProvides extends BasePipeParams {
fn: (valueBag: ValueBag) => void | Promise<void>
}

export function pipe(params: PipeParams, logger: Logger): OperatorApplier {
export function pipe(params: PipeParams, loggers: Loggers): OperatorApplier {
const getBag = pipeHasProvides(params) ? valueBagGetterProvides(params.provides) : valueBagGetterNoProvides()

async function wrappedStep(valueBag: ValueBag): Promise<ValueBag> {
loggers.onStepStarted([valueBag])
const startTime = new Date()
const value = await params.fn({ ...valueBag })
const newBag = getBag(valueBag, value)
logger([newBag], startTime)
loggers.onStepFinished([newBag], startTime)
return newBag
}
return mergeMap(wrappedStep, params?.maxConcurrency)
Expand Down
22 changes: 17 additions & 5 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { OperatorFunction } from 'rxjs'
import type { BatchParams } from './operators/batch'
import type { PipeParams } from './operators/pipe'
import { InternalOnStepFinished } from './utils/onStepFinished'
import { InternalOnStepStarted } from './utils/onStepStarted'

// TODO: Proper typing for ValueBag!
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -12,21 +14,31 @@ export type Operator = OperatorFunction<ValueBagOrBatch, ValueBagOrBatch>

export type PipeGenericParams = PipeParams | BatchParams

export type OnEachStep = (params: OnEachStepParams) => void
export type onStepStarted = (params: onStepStartedParams) => void
export type onStepFinished = (params: onStepFinishedParams) => void

export interface OnEachStepParams {
export interface onStepStartedParams {
name: string
received: number
valueBags: ValueBag[]
}

export interface onStepFinishedParams {
name: string
tookMs: number
emitted: number
valueBags: ValueBag[]
}

export type Loggers = { onStepFinished: InternalOnStepFinished; onStepStarted: InternalOnStepStarted }

export interface CaminhoOptions {
onEachStep?: OnEachStep
onStepFinished?: onStepFinished
onStepStarted?: onStepStarted
maxItemsFlowing?: number
}

export interface Accumulator<A> {
fn: (acc: A, value: ValueBag, index: number) => A,
seed: A,
fn: (acc: A, value: ValueBag, index: number) => A
seed: A
}
16 changes: 16 additions & 0 deletions src/utils/onStepFinished.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import type { onStepFinished, ValueBag } from '../types'

/**
 * Internal "step finished" logger: called with the value bags to report and the
 * `Date` captured when the step started.
 */
export type InternalOnStepFinished = (valueBags: ValueBag[], stepStartedAt: Date) => void

/** Shared no-op handed back when no `onStepFinished` callback was registered. */
const stub = () => {}

/**
 * Builds the internal "step finished" logger for a single step.
 *
 * @param name - Step name reported to the callback on every invocation.
 * @param onStepFinished - Optional user callback; when omitted, the shared no-op is returned.
 * @returns A function that receives the step's value bags and its start time, computes the
 * elapsed milliseconds, and forwards `{ name, tookMs, valueBags, emitted }` to the callback,
 * where `emitted` is the number of value bags.
 */
export function getOnStepFinished(name: string, onStepFinished?: onStepFinished): InternalOnStepFinished {
  // Nothing to notify: avoid allocating a closure per step.
  if (!onStepFinished) {
    return stub
  }

  return function internalOnStepFinished(valueBags: ValueBag[], stepStartedAt: Date) {
    // Wall-clock time elapsed since the step's recorded start.
    const elapsedMs = Date.now() - stepStartedAt.getTime()
    onStepFinished({ name, tookMs: elapsedMs, valueBags, emitted: valueBags.length })
  }
}
Loading

0 comments on commit 0ae9b2a

Please sign in to comment.