Skip to content

Commit

Permalink
added: nodes clearing
Browse files Browse the repository at this point in the history
  • Loading branch information
Viktor Pasynok committed Nov 4, 2024
1 parent 67bf5cc commit 410027a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 51 deletions.
39 changes: 39 additions & 0 deletions src/compose/__tests__/up.clearNodes.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { allSettled, createEvent, createStore, fork, sample } from 'effector';
import { createContainer } from '../../createContainer';
import { compose } from '../index';

test('up.clearNodes | all containers resolved and API is available to use', async () => {
const scope = fork();
const trigger = createEvent<number>();
const $some = createStore<number | null>(null);

const a = createContainer({
id: 'a',
start: () => ({
api: {
multiply: (x: number) => x * 2,
},
}),
});
const b = createContainer({
id: 'b',
dependsOn: [a],
start: (d) => {
sample({
clock: trigger,
fn: d.a.multiply,
target: $some,
});

return { api: null };
},
});

await compose.up([a, b]);

await allSettled(trigger, { scope, params: 1 });
expect(scope.getState($some)).toBe(2);

await allSettled(trigger, { scope, params: 10 });
expect(scope.getState($some)).toBe(20);
});
2 changes: 1 addition & 1 deletion src/compose/__tests__/up.uniq-container-id.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('container.id is uniq', () => {
test('unhappy', () => {
const id = genContainerId();

expect(() => compose.up([createContainer(id), createContainer(id)])).toThrowError(
expect(() => compose.up([createContainer(id), createContainer(id)])).rejects.toThrowError(
`Duplicate container ID found: ${id}`,
);
});
Expand Down
107 changes: 57 additions & 50 deletions src/compose/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { combine, createEffect, launch, sample, type Store } from 'effector';
import { clearNode, combine, createEffect, launch, sample, type Store } from 'effector';
import { type AnyContainer, CONTAINER_STATUS, type ContainerStatus } from '../createContainer';

const validateContainerId = (id: string, set: Set<string>) => {
Expand All @@ -16,15 +16,13 @@ const statusIs = {
idle: (s: ContainerStatus) => s === CONTAINER_STATUS.idle,
};

const upFn = (containers: AnyContainer[], config?: { debug?: boolean }) => {
const upFn = async (containers: AnyContainer[], config?: { debug?: boolean }) => {
const CONTAINER_IDS = new Set<string>();

for (const container of containers) {
validateContainerId(container.id, CONTAINER_IDS);
}

let apis: Record<string, Awaited<ReturnType<AnyContainer['start']>>['api']> = {};

const containersStatuses = containers.reduce<Record<AnyContainer['id'], AnyContainer['$status']>>((acc, x) => {
acc[x.id] = x.$status;
return acc;
Expand All @@ -48,59 +46,68 @@ const upFn = (containers: AnyContainer[], config?: { debug?: boolean }) => {
});
}

for (const container of containers) {
const $strictDepsResolving: Store<ContainerStatus> = combine(
(container.dependsOn ?? []).map((d) => d.$status),
(x) => {
if (x.some(statusIs.off)) return CONTAINER_STATUS.off;
if (x.some(statusIs.fail)) return CONTAINER_STATUS.fail;
if (x.some(statusIs.pending)) return CONTAINER_STATUS.pending;

if (x.every(statusIs.done) || x.length === 0) return CONTAINER_STATUS.done;

return CONTAINER_STATUS.idle;
},
);
const $optionalDepsResolving: Store<ContainerStatus> = combine(
(container.optionalDependsOn ?? []).map((d) => d.$status),
(l) => {
if (l.some(statusIs.pending)) return CONTAINER_STATUS.pending;
if (l.some(statusIs.idle)) return CONTAINER_STATUS.idle;

return CONTAINER_STATUS.done;
},
);
const $depsDone = combine([$strictDepsResolving, $optionalDepsResolving], (l) => l.every(statusIs.done));

const enableFx = createEffect(async () => (container.enable ? await container.enable(apis, apis) : true));
const startFx = createEffect(async () => {
apis[container.id] = (await container.start(apis, apis))['api'];
});
let nodesToClear: Parameters<typeof clearNode>[0][] = [$result];
let apis: Record<string, Awaited<ReturnType<AnyContainer['start']>>['api']> = {};

sample({
clock: enableFx.doneData,
fn: (x) => (x ? CONTAINER_STATUS.pending : CONTAINER_STATUS.off),
target: container.$status,
});
sample({ clock: enableFx.failData, fn: () => CONTAINER_STATUS.fail, target: container.$status });
sample({ clock: container.$status, filter: statusIs.pending, target: startFx });
sample({ clock: startFx.finally, fn: (x) => x.status, target: container.$status });
await Promise.all(
containers.map(async (container) => {
const $strictDepsResolving: Store<ContainerStatus> = combine(
(container.dependsOn ?? []).map((d) => d.$status),
(x) => {
if (x.some(statusIs.off)) return CONTAINER_STATUS.off;
if (x.some(statusIs.fail)) return CONTAINER_STATUS.fail;
if (x.some(statusIs.pending)) return CONTAINER_STATUS.pending;

if (x.every(statusIs.done) || x.length === 0) return CONTAINER_STATUS.done;

return CONTAINER_STATUS.idle;
},
);
const $optionalDepsResolving: Store<ContainerStatus> = combine(
(container.optionalDependsOn ?? []).map((d) => d.$status),
(l) => {
if (l.some(statusIs.pending)) return CONTAINER_STATUS.pending;
if (l.some(statusIs.idle)) return CONTAINER_STATUS.idle;

return CONTAINER_STATUS.done;
},
);
const $depsDone = combine([$strictDepsResolving, $optionalDepsResolving], (l) => l.every(statusIs.done));

const enableFx = createEffect(async () => (container.enable ? await container.enable(apis, apis) : true));
const startFx = createEffect(async () => {
apis[container.id] = (await container.start(apis, apis))['api'];
});

sample({
clock: enableFx.doneData,
fn: (x) => (x ? CONTAINER_STATUS.pending : CONTAINER_STATUS.off),
target: container.$status,
});
sample({ clock: enableFx.failData, fn: () => CONTAINER_STATUS.fail, target: container.$status });
sample({ clock: container.$status, filter: statusIs.pending, target: startFx });
sample({ clock: startFx.finally, fn: (x) => x.status, target: container.$status });

$strictDepsResolving.watch((s) => {
if (statusIs.off(s) || statusIs.fail(s)) {
launch(container.$status, s);
}
});

$strictDepsResolving.watch((s) => {
if (statusIs.off(s) || statusIs.fail(s)) {
launch(container.$status, s);
}
});
$depsDone.watch((x) => {
if (x) enableFx();
});

$depsDone.watch((x) => {
if (x) enableFx();
});
}
nodesToClear = [...nodesToClear, $strictDepsResolving, $optionalDepsResolving, $depsDone, enableFx, startFx];
}),
);

return new Promise((resolve, reject) => {
$result.watch((x) => {
if (x.done === true) {
// fixme: clear all nodes
apis = {};
nodesToClear.forEach((x) => clearNode(x, { deep: true }));

if (x.hasErrors) {
reject(x);
}
Expand Down

0 comments on commit 410027a

Please sign in to comment.