diff --git a/README.md b/README.md index 4032892..f286475 100644 --- a/README.md +++ b/README.md @@ -1122,7 +1122,6 @@ Now, these tasks define the behaviour of our agent under certain conditions, but // Sensor.of.from works here too const temperature = Heater.sensor({ // The sensor returns values to set the path given by the lens below - // (no lens variables supported in this case) lens: '/roomTemp', // The sensor is a generator function that yields values obtained from // the measuring hardware @@ -1214,7 +1213,7 @@ const agent = Agent.from({ }); ``` -Passing a `trace` function to the agent will allow to get structured feedback of the planning progress. The rest of the functions (debug, info, warn,error) are called only by the agent runtime with textual information with different levels of detail. We intend to move to an [open telemetry](https://github.com/balena-io-modules/mahler/issues/40) for standardized logging. +Passing a `trace` function to the agent will allow to get structured feedback of the planning progress. The rest of the functions (debug, info, warn,error) are called only by the agent runtime with textual information with different levels of detail. ## Troubleshooting diff --git a/lib/agent.spec.ts b/lib/agent.spec.ts index 6532d24..69f0965 100644 --- a/lib/agent.spec.ts +++ b/lib/agent.spec.ts @@ -6,6 +6,9 @@ import { Sensor } from './sensor'; import { stub } from 'sinon'; import { setTimeout } from 'timers/promises'; +import { Observable } from './observable'; +import * as memoizee from 'memoizee'; +import { UNDEFINED } from './target'; describe('Agent', () => { describe('basic operations', () => { @@ -389,4 +392,229 @@ describe('Agent', () => { agent.stop(); }); }); + + // A more complex example that the heater, for a multi-room + // temperature control system + describe('Climate controller', () => { + type ClimateControl = { + [room: string]: { temperature: number; heaterOn: boolean }; + }; + const INITIAL_STATE: ClimateControl = { + office: { temperature: 15, heaterOn: false }, + bedroom: { temperature: 15, heaterOn: false }, + }; + const buildingState = structuredClone(INITIAL_STATE); + + // Reset the state before each test + beforeEach(() => { + Object.assign(buildingState, INITIAL_STATE); + }); + + // Memoize the update function so it's called at most per counter + // eslint-disable-next-line + const updateTemp = memoizee((_) => + Object.fromEntries( + Object.entries(buildingState).map(([roomName, roomState]) => + roomState.heaterOn + ? [roomName, ++roomState.temperature] + : [roomName, --roomState.temperature], + ), + ), + ); + + // Global monitor of temperature + // this simulates temperature change on rooms of a building + // the temperature of each room will drop 1 degree every + // 10ms if the heater is off and increase 1 degree if heater is on + const climateMonitor = Observable.interval(10).map(updateTemp); + + const roomSensor = Sensor.of().from({ + lens: '/:room/temperature', + sensor: ({ room }) => climateMonitor.map((climate) => climate[room]), + }); + + const turnOn = Task.of().from({ + lens: '/:room', + condition: (room, { target }) => + room.temperature < target.temperature && !room.heaterOn, + effect(room, { target }) { + // Turning the resistor on does not change the temperature + // immediately, but the effect is that the temperature eventually + // will reach that point + room._.temperature = target.temperature; + room._.heaterOn = true; + }, + async action(room) { + room._.heaterOn = true; + }, + description: ({ room }) => `turn heater on in ${room}`, + }); + + const turnOff = Task.of().from({ + lens: '/:room', + condition: (room, { target }) => + room.temperature > target.temperature && room.heaterOn, + effect(room, { target }) { + // Turning the resistor on does not change the temperature + // immediately, but the effect is that the temperature eventually + // will reach that point + room._.temperature = target.temperature; + room._.heaterOn = false; + }, + async action(room) { + room._.heaterOn = false; + }, + description: ({ room }) => `turn heater off in ${room}`, + }); + + const wait = Task.of().from({ + lens: '/:room', + condition: (room, { target }) => + // We have not reached the target but the resistor is already off + (room.temperature > target.temperature && !room.heaterOn) || + // We have not reached the target but the resistor is already on + (room.temperature < target.temperature && room.heaterOn), + effect: (room, { target }) => { + room._.temperature = target.temperature; + }, + action: NoAction, + description: ({ room, target }) => + `wait for temperature in ${room} to reach ${target.temperature}`, + }); + + const addRoom = Task.of().from({ + op: 'create', + lens: '/:room', + effect(room, { target }) { + room._ = target; + }, + }); + + const removeRoom = Task.of().from({ + op: 'delete', + lens: '/:room', + effect() { + /* noop */ + }, + }); + + it('should allow controlling the tempereture of a single room', async () => { + const climateControl = Agent.from({ + initial: INITIAL_STATE, + tasks: [turnOn, turnOff, wait, addRoom], + sensors: [roomSensor], + opts: { minWaitMs: 10, logger }, + }); + + climateControl.subscribe((s) => { + // Update the building state when + // the agent state changes + Object.assign(buildingState, s); + }); + + climateControl.seek({ bedroom: { temperature: 20 } }); + await expect(climateControl.wait(300)).to.be.fulfilled; + expect(climateControl.state().bedroom).to.deep.equal({ + temperature: 20, + heaterOn: true, + }); + + climateControl.stop(); + await setTimeout(50); + }); + + it('should allow controlling the temperature of multiple rooms', async () => { + const climateControl = Agent.from({ + initial: INITIAL_STATE, + tasks: [turnOn, turnOff, wait, addRoom], + sensors: [roomSensor], + opts: { minWaitMs: 10, logger }, + }); + + climateControl.subscribe((s) => { + // Update the building state when + // the agent state changes + Object.assign(buildingState, s); + }); + + // This is not a great example, because if the target for both + // rooms is not the same, then the controller will keep iterating + // as temperature will never settle + climateControl.seek({ + bedroom: { temperature: 20 }, + office: { temperature: 20 }, + }); + await expect(climateControl.wait(300)).to.be.fulfilled; + expect(climateControl.state()).to.deep.equal({ + bedroom: { temperature: 20, heaterOn: true }, + office: { temperature: 20, heaterOn: true }, + }); + + climateControl.stop(); + await setTimeout(50); + }); + + it('should allow controlling the temperature of a new room', async () => { + const climateControl = Agent.from({ + initial: INITIAL_STATE, + tasks: [turnOn, turnOff, wait, addRoom], + sensors: [roomSensor], + opts: { minWaitMs: 10, logger }, + }); + + climateControl.subscribe((s) => { + // Update the building state when + // the agent state changes + Object.assign(buildingState, s); + }); + + // This is not a great example, because if the target for both + // rooms is not the same, then the controller will keep iterating + // as temperature will never settle + climateControl.seek({ + studio: { temperature: 20 }, + }); + await expect(climateControl.wait(300)).to.be.fulfilled; + expect(climateControl.state().studio).to.deep.equal({ + temperature: 20, + heaterOn: true, + }); + + climateControl.stop(); + await setTimeout(50); + }); + + it('should allow removing a room and still control temperature', async () => { + const climateControl = Agent.from({ + initial: INITIAL_STATE, + tasks: [turnOn, turnOff, wait, addRoom, removeRoom], + sensors: [roomSensor], + opts: { minWaitMs: 10, logger }, + }); + + climateControl.subscribe((s) => { + // Update the building state when + // the agent state changes + Object.assign(buildingState, s); + }); + + // This is not a great example, because if the target for both + // rooms is not the same, then the controller will keep iterating + // as temperature will never settle + climateControl.seek({ + bedroom: { temperature: 20 }, + office: UNDEFINED, + }); + await expect(climateControl.wait(300)).to.be.fulfilled; + expect(climateControl.state()).to.deep.equal({ + bedroom: { + temperature: 20, + heaterOn: true, + }, + }); + + climateControl.stop(); + await setTimeout(50); + }); + }); }); diff --git a/lib/agent/index.ts b/lib/agent/index.ts index d56f081..223ea28 100644 --- a/lib/agent/index.ts +++ b/lib/agent/index.ts @@ -9,6 +9,7 @@ import type { Task } from '../task'; import { Runtime } from './runtime'; import type { AgentOpts, Result } from './types'; import { NotStarted } from './types'; +import type { Path } from '../path'; export * from './types'; @@ -176,13 +177,13 @@ function from( | { initial: TState; planner?: Planner; - sensors?: Array>; + sensors?: Array>; opts?: DeepPartial; } | { initial: TState; tasks?: Array>; - sensors?: Array>; + sensors?: Array>; opts?: DeepPartial; }, ): Agent; @@ -199,7 +200,7 @@ function from({ initial: TState; tasks?: Array>; planner?: Planner; - sensors?: Array>; + sensors?: Array>; opts?: DeepPartial; }): Agent { const opts: AgentOpts = { @@ -220,10 +221,12 @@ function from({ assert(opts.maxWaitMs > 0, 'opts.maxWaitMs must be greater than 0'); assert(opts.minWaitMs > 0, 'opts.minWaitMs must be greater than 0'); - const subject: Subject = new Subject(); + // Isolate the local state from the user input + state = structuredClone(state); // Subscribe to runtime changes to keep // the local copy of state up-to-date + const subject: Subject = new Subject(); subject.subscribe((s) => { state = s; }); diff --git a/lib/agent/runtime.ts b/lib/agent/runtime.ts index 5b67935..39773ed 100644 --- a/lib/agent/runtime.ts +++ b/lib/agent/runtime.ts @@ -16,6 +16,9 @@ import type { AgentOpts, Result } from './types'; import { Failure, NotStarted, Stopped, Timeout, UnknownError } from './types'; import * as DAG from '../dag'; +import { Path } from '../path'; +import { Lens } from '../lens'; +import { Pointer } from 'lib/pointer'; class ActionError extends Error { constructor( @@ -81,7 +84,7 @@ export class Runtime { private running = false; private stopped = false; - private subscribed: Subscription[] = []; + private subscriptions: Record = {}; private stateRef: Ref; constructor( @@ -89,27 +92,13 @@ export class Runtime { state: TState, private readonly target: Target | StrictTarget, private readonly planner: Planner, - sensors: Array>, + private readonly sensors: Array>, private readonly opts: AgentOpts, private readonly strict: boolean, ) { this.stateRef = Ref.of(state); - // add subscribers to sensors - this.subscribed = sensors.map((sensor) => - sensor(this.stateRef).subscribe((s) => { - // There is no need to update the state reference as the sensor already - // modifies the state. We don't handle concurrency as we expect that whatever - // value modified by sensologrs does not conflict with the value modified by - // actions - if (opts.follow) { - // Trigger a re-plan to see if the state is still on target - this.start(); - } else { - // Notify the observer of the new state - this.observer.next(s); - } - }), - ); + // Perform actions based on the new state + this.onStateChange(Path.from('/')); } public get state() { @@ -167,6 +156,51 @@ export class Runtime { return result; } + private onStateChange(changedPath: Path) { + // for every existing subscription, check if the path still + // exists, if it doesn't unsusbcribe + (Object.keys(this.subscriptions) as Path[]) + .filter((p) => Lens.startsWith(p, changedPath)) + .forEach((p) => { + const parent = Pointer.from(this.stateRef._, Path.source(p)); + // If the parent does not exist or the key does not exist + // then delete the sensor + if (parent == null || !Object.hasOwn(parent, Path.basename(p))) { + this.subscriptions[p].unsubscribe(); + delete this.subscriptions[p]; + } + }); + + // For every sensor, find the applicable paths + // under the changed path + const sApplicablePaths = this.sensors.map((sensor) => ({ + sensor, + paths: Lens.findAll(this.stateRef._, sensor.lens, changedPath), + })); + + // for every sensor, see if there are new elements + // matching the sensor path, if there are, subscribe + for (const { sensor, paths } of sApplicablePaths) { + for (const p of paths) { + if (p in this.subscriptions) { + continue; + } + this.subscriptions[p] = sensor(this.stateRef, p).subscribe((s) => { + // There is no need to update the state reference as the sensor already + // modifies the state. We don't handle concurrency as we assume sensors + // do not conflict with each other (should we check?) + if (this.opts.follow) { + // Trigger a re-plan to see if the state is still on target + this.start(); + } else { + // Notify the observer of the new state + this.observer.next(s); + } + }); + } + } + } + private async runPlan(root: PlanNode | null) { const { logger } = this.opts; @@ -197,6 +231,7 @@ export class Runtime { // if an error occurs logger.info(`${action.description}: running ...`); await observe(action, this.observer)(this.stateRef); + this.onStateChange(action.path); // update the state of the runtime logger.info(`${action.description}: success`); } catch (e) { logger.error(`${action.description}: failed`, e); @@ -342,7 +377,7 @@ export class Runtime { await this.promise; // Unsubscribe from sensors - this.subscribed.forEach((s) => s.unsubscribe()); + Object.values(this.subscriptions).forEach((s) => s.unsubscribe()); } async wait(timeout = 0): Promise> { diff --git a/lib/lens.spec.ts b/lib/lens.spec.ts index 1b2addd..6131ced 100644 --- a/lib/lens.spec.ts +++ b/lib/lens.spec.ts @@ -72,4 +72,79 @@ describe('Lens', () => { }); }); }); + + describe('startsWith', () => { + it('compares a lens with a starting path', () => { + expect(Lens.startsWith(Path.from('/a/b/c'), Path.from('/a'))).to.be.true; + expect(Lens.startsWith(Path.from('/a/b/c'), Path.from('/a/b'))).to.be + .true; + expect(Lens.startsWith(Path.from('/a/b/c'), Path.from('/a/b/c'))).to.be + .true; + expect(Lens.startsWith(Path.from('/a/:loc/c'), Path.from('/a/b/c'))).to.be + .true; + expect(Lens.startsWith(Path.from('/:key/:loc/c'), Path.from('/a/b/c'))).to + .be.true; + expect( + Lens.startsWith(Path.from('/:key/:loc/:other'), Path.from('/a/b/c')), + ).to.be.true; + expect(Lens.startsWith(Path.from('/x/:loc/:other'), Path.from('/a/b/c'))) + .to.be.false; + expect(Lens.startsWith(Path.from('/:key/x/:other'), Path.from('/a/b/c'))) + .to.be.false; + expect(Lens.startsWith(Path.from('/a/x/:other'), Path.from('/a/b/c'))).to + .be.false; + expect(Lens.startsWith(Path.from('/a/b/:other'), Path.from('/a/b/c/d'))) + .to.be.false; + expect(Lens.startsWith(Path.from('/a/b/c'), Path.from('/a/b/c/d'))).to.be + .false; + }); + }); + + describe('findAll', () => { + it('gets the context for all elements matching the lens', () => { + type State = { + [k: string]: { b: { c: Array<{ e: string }> }; d: number } | null; + }; + const s: State = { + a: { b: { c: [{ e: 'one' }, { e: 'two' }, { e: 'three' }] }, d: 4 }, + z: { b: { c: [{ e: 'nine' }, { e: 'ten' }] }, d: 5 }, + x: null, + }; + + expect(Lens.findAll(s, Path.from('/a/b/c/:pos/e'))).to.deep.equal([ + '/a/b/c/0/e', + '/a/b/c/1/e', + '/a/b/c/2/e', + ]); + expect(Lens.findAll(s, Path.from('/:key/b/c/:pos/e'))).to.deep.equal([ + '/a/b/c/0/e', + '/a/b/c/1/e', + '/a/b/c/2/e', + '/z/b/c/0/e', + '/z/b/c/1/e', + ]); + expect(Lens.findAll(s, Path.from('/a/b/c'))).to.deep.equal(['/a/b/c']); + expect(Lens.findAll(s, Path.from('/a/d'))).to.deep.equal(['/a/d']); + expect(Lens.findAll(s, Path.from('/x'))).to.deep.equal(['/x']); + expect(Lens.findAll(s, Path.from('/x/b/c'))).to.deep.equal([]); + }); + + it('allows to start the search at a subpath', () => { + type State = { + [k: string]: { b: { c: Array<{ e: string }> }; d: number } | null; + }; + const s: State = { + a: { b: { c: [{ e: 'one' }, { e: 'two' }, { e: 'three' }] }, d: 4 }, + z: { b: { c: [{ e: 'nine' }, { e: 'ten' }] }, d: 5 }, + x: null, + }; + + expect( + Lens.findAll(s, Path.from('/a/b/c/:pos/e'), Path.from('/a/b')), + ).to.deep.equal(['/a/b/c/0/e', '/a/b/c/1/e', '/a/b/c/2/e']); + expect( + Lens.findAll(s, Path.from('/:key/b/c/:pos/e'), Path.from('/a/b')), + ).to.deep.equal(['/a/b/c/0/e', '/a/b/c/1/e', '/a/b/c/2/e']); + }); + }); }); diff --git a/lib/lens.ts b/lib/lens.ts index 08f9a80..8d4b1c9 100644 --- a/lib/lens.ts +++ b/lib/lens.ts @@ -16,6 +16,14 @@ export type LensContext = LensWithSlash< object >; +/** + * The arguments from the lens + */ +export type LensArgs = Omit< + LensContext, + 'target' | 'path' +>; + // A lens to evaluate paths starting with a slash type LensWithSlash< TChildState, @@ -181,7 +189,126 @@ function createLens( return Pointer.from(s, p) as Lens; } +/** + * Test if a lens given as first argument starts + * with the path given as second argument + */ +function startsWith( + lens: Path, + path: Path, +): boolean { + const pathParts = Path.split(path); + const lensParts = Path.split(lens); + + // There will never be a match in this case + // no point in searching + if (lensParts.length < pathParts.length) { + return false; + } + + // Find the starting context comparing the + // initial path with the lens + for (const k of pathParts) { + // We know the key cannot be undefined because + // of the length comparison before + const key = lensParts.shift()!; + + // If the keys don't match terminate the search + if (!key.startsWith(':') && k !== key) { + return false; + } + } + return true; +} + +/** + * Find all elements of the given state object that match + * the lens, starting at `initialPath` + * + * This function never throws, if the lens does not + * start with the initial path or there are no matches + * the function will return an empty array + */ +function findAll< + TState, + TPath extends PathType, + TStartPath extends PathType = '/', +>( + state: TState, + lens: Path, + initialPath: Path = Path.from('/') as Path, +): Path[] { + const initialParts = Path.split(initialPath); + const lensParts = Path.split(lens); + + // There will never be a match in this case + // no point in searching + if (lensParts.length < initialParts.length) { + return []; + } + + let initialTarget: any = state; + // Find the starting context comparing the + // initial path with the lens + for (const k of initialParts) { + // No point in continuing the search in this case + if (initialTarget == null || typeof initialTarget !== 'object') { + return []; + } + + // We know the key cannot be undefined because + // of the length comparison before + const key = lensParts.shift()!; + + // If the key starts with `:` we continue searching + if (!key.startsWith(':') && (k !== key || !(k in initialTarget))) { + // The initial path does not match the lens or + // the state + return []; + } + initialTarget = initialTarget[k]; + } + + let contextList: Array> = [ + { target: initialTarget, path: initialPath }, + ]; + + for (const key of lensParts) { + contextList = contextList.flatMap(({ target, path }) => { + // This is not a valid search path so we ignore it + if (target == null || typeof target !== 'object') { + return []; + } + + if (key.startsWith(':')) { + // If the key is a varible, we need to add all sub elements + // of the current object + return Object.entries(target).map(([k, v]) => ({ + target: v, + path: Path.join(path, k), + })); + } else if (key in target) { + // Otherwise just return the subelement of + // the current object + return [ + { + target: (target as any)[key], + path: Path.join(path, key), + }, + ]; + } else { + return []; + } + }); + } + + return contextList.map(({ path }) => path); +} + export const Lens = { context, + args: params, from: createLens, + findAll, + startsWith, }; diff --git a/lib/observable.spec.ts b/lib/observable.spec.ts index 10edd75..aba569a 100644 --- a/lib/observable.spec.ts +++ b/lib/observable.spec.ts @@ -1,22 +1,8 @@ import { expect } from '~/test-utils'; -import { Observable } from './observable'; -import { promisify } from 'util'; +import { Observable, interval } from './observable'; import { stub, useFakeTimers } from 'sinon'; -const interval = (period: number): Observable => { - const sleep = promisify(setTimeout); - return Observable.from( - (async function* () { - let i = 0; - while (true) { - await sleep(period); - yield i++; - } - })(), - ); -}; - describe('Observable', () => { let clock: sinon.SinonFakeTimers; @@ -176,6 +162,24 @@ describe('Observable', () => { subscriber.unsubscribe(); }); + it('allows filtering values', async () => { + const o = interval(10).filter((x) => x % 2 === 0); + + // Add a subscriber + const next = stub(); + const subscriber = o.subscribe(next); + + await clock.tickAsync(50); + + // Only now the sensor function should be called + expect(next).to.have.been.calledThrice; + expect(next).to.have.been.calledWith(0); + expect(next).to.have.been.calledWith(2); + expect(next).to.have.been.calledWith(4); + + subscriber.unsubscribe(); + }); + it('propagates errors', async () => { const letters = Observable.from( (function* () { diff --git a/lib/observable.ts b/lib/observable.ts index 7230239..6e9fdaf 100644 --- a/lib/observable.ts +++ b/lib/observable.ts @@ -1,3 +1,5 @@ +import { promisify } from 'util'; + export type Next = (t: T) => void; export interface Observer { @@ -19,7 +21,17 @@ export interface Subscribable { } export interface Observable extends Subscribable { + /** + * Transform a stream of values passing it through + * a mapping function + */ map(f: (t: T) => U): Observable; + + /** + * Create a new stream only for values that match a + * certain filtering function + */ + filter(f: (t: T) => boolean): Observable; } /** @@ -110,7 +122,7 @@ function isSyncIterable(x: unknown): x is Iterable { return x != null && typeof x === 'object' && Symbol.iterator in x; } -function iIterable(x: unknown): x is AsyncIterable | Iterable { +function isIterable(x: unknown): x is AsyncIterable | Iterable { return ( x != null && typeof x === 'object' && @@ -209,7 +221,7 @@ function multiplexIterable(input: Iterable | AsyncIterable) { function from(input: ObservableInput): Observable { let items: () => AsyncIterable; - if (iIterable(input)) { + if (isIterable(input)) { items = multiplexIterable(input); } const self: Observable = { @@ -253,12 +265,18 @@ function from(input: ObservableInput): Observable { map(f: (t: T) => U): Observable { return from(map(self, f)); }, + filter(f: (t: T) => boolean): Observable { + return from(filter(self, f)); + }, }; return self; } function map(o: Subscribable, f: (t: T) => U): Subscribable { + // QUESTION: should we memoize f so it's called at most once with + // each value? Currently, it will be N*M times where N is the number of subscriptors + // and M is the number of calls per subscriptor return { subscribe(subscriber: Observer): Subscription { return o.subscribe({ @@ -269,6 +287,17 @@ function map(o: Subscribable, f: (t: T) => U): Subscribable { }; } +function filter(o: Subscribable, f: (t: T) => boolean): Subscribable { + return { + subscribe(subscriber: Observer): Subscription { + return o.subscribe({ + ...subscriber, + next: (t) => f(t) && subscriber.next(t), + }); + }, + }; +} + function of(...values: T[]): Observable { return from(values); } @@ -277,9 +306,30 @@ function is(x: unknown): x is Observable { return isSubscribable(x) && typeof (x as any).map === 'function'; } +/** + * Utility function to return a value on an interval. + * This is a useful observable to build new observables from + */ +export function interval(periodMs: number): Observable { + // We use promisify instead of `timers/promises` because it works + // works for testing with sinon faketimers + const sleep = promisify(setTimeout); + return Observable.from( + (async function* () { + let i = 0; + while (true) { + await sleep(periodMs); + yield i++; + } + })(), + ); +} + export const Observable = { of, from, is, map, + filter, + interval, }; diff --git a/lib/path.ts b/lib/path.ts index 227068e..9efa357 100644 --- a/lib/path.ts +++ b/lib/path.ts @@ -64,7 +64,41 @@ function from(p: T): Path { return res as Path; } +function join(p: Path, s: T) { + return from(split(p).concat(s)); +} + +/** + * Return the source (parent) of the path + * + * e.g. + * ``` + * Path.source(Path.from('/a/b/c')) // '/a/b' + * ``` + */ +function source(p: Path) { + const parts = split(p); + parts.pop(); + return from(parts); +} + +/** + * Return the path base name + * + * e.g. + * ``` + * Path.basename(Path.from('/a/b/c')) // 'a' + * ``` + */ +function basename(p: Path) { + const parts = split(p); + return parts.pop() || ''; +} + export const Path = { from, split, + join, + source, + basename, }; diff --git a/lib/pointer.spec.ts b/lib/pointer.spec.ts index a2e9b19..82bdcd8 100644 --- a/lib/pointer.spec.ts +++ b/lib/pointer.spec.ts @@ -1,52 +1,70 @@ import { expect } from '~/test-utils'; import { Pointer } from './pointer'; +import { Path } from './path'; describe('Pointer', () => { describe('of', () => { it('calculates the pointer to the path', () => { expect( - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/a'), + Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, Path.from('/a')), ).to.equal(1); expect( - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/b'), + Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, Path.from('/b')), ).to.deep.equal({ c: 2, d: { e: 'hello' } }); expect( - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/b/c'), + Pointer.from( + { a: 1, b: { c: 2, d: { e: 'hello' } } }, + Path.from('/b/c'), + ), ).to.equal(2); expect( - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/b/d'), + Pointer.from( + { a: 1, b: { c: 2, d: { e: 'hello' } } }, + Path.from('/b/d'), + ), ).to.deep.equal({ e: 'hello' }); expect( - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/b/d/e'), + Pointer.from( + { a: 1, b: { c: 2, d: { e: 'hello' } } }, + Path.from('/b/d/e'), + ), ).to.deep.equal('hello'); expect( - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/'), + Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, Path.from('/')), ).to.deep.equal({ a: 1, b: { c: 2, d: { e: 'hello' } } }); - expect(Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/x')).to.be - .undefined; - expect(Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/b/d/x')) - .to.be.undefined; + expect( + Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, Path.from('/x')), + ).to.be.undefined; + expect( + Pointer.from( + { a: 1, b: { c: 2, d: { e: 'hello' } } }, + Path.from('/b/d/x'), + ), + ).to.be.undefined; expect(() => - Pointer.from({ a: 1, b: { c: 2, d: { e: 'hello' } } }, '/a/b/x'), + Pointer.from( + { a: 1, b: { c: 2, d: { e: 'hello' } } }, + Path.from('/a/b/x'), + ), ).to.throw; }); it('calculates the pointer to a path in an array', () => { - expect(Pointer.from([1, 2, 3], '')).to.deep.equal([1, 2, 3]); - expect(Pointer.from([1, 2, 3], '/')).to.deep.equal([1, 2, 3]); - expect(Pointer.from([1, 2, 3], '/0')).to.equal(1); - expect(Pointer.from([1, 2, 3], '/1')).to.equal(2); - expect(Pointer.from({ a: [1, 2, 3] }, '/a/1')).to.equal(2); - expect(() => Pointer.from({ a: [1, 2, 3] }, '/a/b')).to.throw; - expect(Pointer.from({ a: [1, 2, { b: 'hello' }] }, '/a/2')).to.deep.equal( - { - b: 'hello', - }, - ); - expect(Pointer.from({ a: [1, 2, { b: 'hello' }] }, '/a/2/b')).to.equal( - 'hello', - ); + expect(Pointer.from([1, 2, 3], Path.from(''))).to.deep.equal([1, 2, 3]); + expect(Pointer.from([1, 2, 3], Path.from('/'))).to.deep.equal([1, 2, 3]); + expect(Pointer.from([1, 2, 3], Path.from('/0'))).to.equal(1); + expect(Pointer.from([1, 2, 3], Path.from('/1'))).to.equal(2); + expect(Pointer.from({ a: [1, 2, 3] }, Path.from('/a/1'))).to.equal(2); + expect(() => Pointer.from({ a: [1, 2, 3] }, Path.from('/a/b'))).to.throw; + expect( + Pointer.from({ a: [1, 2, { b: 'hello' }] }, Path.from('/a/2')), + ).to.deep.equal({ + b: 'hello', + }); + expect( + Pointer.from({ a: [1, 2, { b: 'hello' }] }, Path.from('/a/2/b')), + ).to.equal('hello'); }); }); }); diff --git a/lib/sensor.spec.ts b/lib/sensor.spec.ts index c2832e4..1e3352e 100644 --- a/lib/sensor.spec.ts +++ b/lib/sensor.spec.ts @@ -3,6 +3,7 @@ import { Sensor } from './sensor'; import { Ref } from './ref'; import { stub } from 'sinon'; import { setTimeout } from 'timers/promises'; +import { Observable } from './observable'; describe('Sensor', () => { it('only starts execution once subscribers have been added', async () => { @@ -31,7 +32,7 @@ describe('Sensor', () => { expect(state._).to.equal(123); }); - it('allows defining a value using lenses', async () => { + it('allows reporting on a value using lenses', async () => { type Heater = { temperature: number; on: boolean }; const sensor = Sensor.of().from({ lens: '/temperature', @@ -52,4 +53,118 @@ describe('Sensor', () => { expect(next).to.have.been.calledWith({ temperature: 23, on: false }); expect(state._.temperature).to.equal(23); }); + + it('allows reporting on a value using observable', async () => { + type Heater = { temperature: number; on: boolean }; + const sensor = Sensor.of().from({ + lens: '/temperature', + sensor: () => Observable.from([20, 23]), + }); + + const state = Ref.of({ temperature: 0, on: false }); + + const next = stub(); + sensor(state).subscribe(next); + + await setTimeout(10); + + expect(next).to.have.been.calledWith({ temperature: 20, on: false }); + expect(next).to.have.been.calledWith({ temperature: 23, on: false }); + expect(state._.temperature).to.equal(23); + }); + + it('allows reporting on a value using observables and lenses', async () => { + type Heater = { temperature: { [room: string]: number }; on: boolean }; + const sensor = Sensor.of().from({ + lens: '/temperature/:room', + sensor: ({ room }) => + Observable.from([ + { room: 'office', temp: 20 }, + { room: 'patio', temp: 30 }, + { room: 'office', temp: 23 }, + ]) + .filter(({ room: r }) => room === r) + .map(({ temp }) => temp), + }); + + const state: Ref = Ref.of({ + temperature: { office: 0, patio: 0 }, + on: false, + }); + + const next = stub(); + const nextOther = stub(); + sensor(state, '/temperature/office').subscribe(next); + sensor(state, '/temperature/patio').subscribe(nextOther); + + // A sensor for an uninitialized path should not throw + expect(() => sensor(state, '/temperature/bedroom')).to.not.throw; + + await setTimeout(10); + + expect(next.getCalls().length).to.equal(2); + expect(next).to.have.been.calledWith({ + temperature: { office: 20, patio: 0 }, + on: false, + }); + expect(next).to.have.been.calledWith({ + temperature: { office: 23, patio: 30 }, + on: false, + }); + expect(nextOther).to.have.been.calledOnceWith({ + temperature: { office: 20, patio: 30 }, + on: false, + }); + expect(state._.temperature.office).to.equal(23); + }); + + it('allows reporting on a value using lenses with args', async () => { + type Heater = { temperature: { [room: string]: number }; on: boolean }; + const sensor = Sensor.of().from({ + lens: '/temperature/:room', + sensor: async function* ({ room }) { + if (room === 'office') { + // First result + yield 20; + await setTimeout(15); + // Third result + yield 23; + } else { + await setTimeout(10); + // Second result + yield 30; + } + }, + }); + + const state: Ref = Ref.of({ + temperature: { office: 0, patio: 0 }, + on: false, + }); + + const next = stub(); + const nextOther = stub(); + sensor(state, '/temperature/office').subscribe(next); + sensor(state, '/temperature/patio').subscribe(nextOther); + + // A sensor for an uninitialized path should throw + expect(() => sensor(state, '/temperature/bedroom')).to.throw; + + await setTimeout(20); + + expect(next.getCalls().length).to.equal(2); + expect(next).to.have.been.calledWith({ + temperature: { office: 20, patio: 0 }, + on: false, + }); + expect(next).to.have.been.calledWith({ + temperature: { office: 23, patio: 30 }, + on: false, + }); + expect(nextOther).to.have.been.calledOnceWith({ + temperature: { office: 20, patio: 30 }, + on: false, + }); + expect(state._.temperature.office).to.equal(23); + }); }); diff --git a/lib/sensor.ts b/lib/sensor.ts index 883cce9..17a71d7 100644 --- a/lib/sensor.ts +++ b/lib/sensor.ts @@ -1,8 +1,9 @@ -import type { Lens } from './lens'; +import { Lens } from './lens'; import type { PathType } from './path'; import { Path } from './path'; import type { Ref } from './ref'; import { View } from './view'; +import type { LensArgs } from './lens'; import type { Subscribable } from './observable'; import { Observable } from './observable'; @@ -11,33 +12,43 @@ import { Observable } from './observable'; * A Sensor function for type T is a function that returns a generator * that yields values of type T */ -export type SensorFn = () => - | AsyncGenerator - | Generator; +export type SensorFn = ( + args: LensArgs, +) => + | AsyncGenerator, never | void | Lens, void> + | Generator, never | void, void | undefined> + | Subscribable>; /** * A sensor receives a reference to a global state and * returns a subscribable that allows to observe changes * to the state returned by the sensor operation. */ -export type Sensor = (s: Ref) => Subscribable; +export type Sensor = + unknown extends Lens + ? // Default to the version with path if the lens cannot be resolved + { (s: Ref, path: PathType): Subscribable; lens: Path

} + : // Otherwise add a path if lens arguments are not empty + LensArgs extends Record + ? { (s: Ref): Subscribable; lens: Path

} + : { (s: Ref, path: PathType): Subscribable; lens: Path

}; /** * The sensor constructor properties */ -export interface SensorProps { +export interface SensorProps { /** * A lens to indicate what part of the state the sensor * will update as it runs. Unlike task lense, sensor lenses * cannot have properties for now */ - lens: P; + lens: TPath; /** * A sensor function. The function returns a generator that yields * values of the type of the lens */ - sensor: SensorFn>; + sensor: SensorFn; } /** @@ -48,30 +59,36 @@ export interface SensorProps { * If a function is passed, it is assumed to be the `sensor` function, and * the `lens` is assumed to be the root of the state. */ -function from( - input: SensorFn> | Partial>, -): Sensor { +function from( + input: SensorFn | Partial>, +): Sensor { const { - lens = '/' as P, + lens = '/' as TPath, sensor = function* () { /* noop */ }, } = typeof input === 'function' ? { sensor: input } : input; - return function (s) { - const view = View.from(s, Path.from(lens)); + const lensPath = Path.from(lens); + return Object.assign( + function (s: Ref, path: PathType = lens) { + const refPath = Path.from(path); + const args = Lens.args(lensPath, refPath) as LensArgs; + const view = View.from(s, refPath); - return Observable.from(sensor()).map((value) => { - // For each value emmited by the sensor - // we update the view and return the updated state - // to the subscriber - view._ = value; + return Observable.from(sensor(args)).map((value) => { + // For each value emmited by the sensor + // we update the view and return the updated state + // to the subscriber + view._ = value; - // We need to return a copy of the state here, otherwise - // subscribers would be able to change the behavior of the - // agent or other subscribers - return structuredClone(s._); - }); - }; + // We need to return a copy of the state here, otherwise + // subscribers would be able to change the behavior of the + // agent or other subscribers + return structuredClone(s._); + }); + }, + { lens: lensPath }, + ) as Sensor; } /** @@ -79,7 +96,7 @@ function from( * the same type. */ interface SensorBuilder { - from

(t: SensorProps): Sensor; + from

(t: SensorProps): Sensor; } function of(): SensorBuilder { diff --git a/package.json b/package.json index 9bcdb23..ea52386 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "@types/chai-as-promised": "^7.1.4", "@types/debug": "^4.1.7", "@types/dockerode": "^3.3.16", + "@types/memoizee": "^0.4.11", "@types/mocha": "^8.2.2", "@types/sinon": "^10.0.13", "@types/sinon-chai": "^3.2.9", @@ -66,6 +67,7 @@ "dockerode": "^3.3.5", "husky": "^4.2.5", "lint-staged": "^11.0.0", + "memoizee": "^0.4.15", "mocha": "^8.4.0", "rimraf": "^3.0.2", "sinon": "^16.1.0",