Skip to content

Commit

Permalink
chore: Dashboard filters refactor (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
matvp91 authored Aug 30, 2024
1 parent f699140 commit 7526da6
Show file tree
Hide file tree
Showing 25 changed files with 415 additions and 263 deletions.
2 changes: 1 addition & 1 deletion packages/api/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ export { streamSchema, inputSchema } from "@mixwave/artisan/schemas";

export * from "./contract";

export type { JobDto, JobNodeDto } from "./types";
export type { JobDto } from "./types";
16 changes: 8 additions & 8 deletions packages/api/src/contract.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { initContract } from "@ts-rest/core";
import { streamSchema, inputSchema } from "@mixwave/artisan/schemas";
import * as z from "zod";
import type { JobDto, JobNodeDto } from "./types.js";
import type { JobDto } from "./types.js";

const c = initContract();

Expand All @@ -10,14 +10,14 @@ export const postTranscodeBodySchema = z.object({
streams: z.array(streamSchema),
segmentSize: z.number().default(4),
assetId: z.string().uuid().optional(),
package: z.boolean().default(false),
tag: z.string().default("default"),
packageAfter: z.boolean().default(false),
tag: z.string().optional(),
});

export const postPackageBodySchema = z.object({
assetId: z.string(),
segmentSize: z.number().default(4),
tag: z.string().default("default"),
tag: z.string().optional(),
});

export const contract = c.router({
Expand Down Expand Up @@ -50,11 +50,11 @@ export const contract = c.router({
method: "GET",
path: "/jobs/:id",
responses: {
200: c.type<{
job: JobDto;
rootTree: JobNodeDto;
}>(),
200: c.type<JobDto>(),
},
query: z.object({
fromRoot: z.coerce.boolean().default(false),
}),
},
getJobLogs: {
method: "GET",
Expand Down
15 changes: 4 additions & 11 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import { contract } from "./contract.js";
import { bullBoardPlugin } from "./plugins/bull-board.js";
import { initServer } from "@ts-rest/fastify";
import { addTranscodeJob, addPackageJob } from "@mixwave/artisan/producer";
import { getJobs, getJob, getRootTreeForJobById, getJobLogs } from "./jobs.js";
import { getJobs, getJob, getJobLogs } from "./jobs.js";
import { generateOpenApi } from "@ts-rest/open-api";
import { randomUUID } from "crypto";

async function buildServer() {
const app = Fastify();
Expand All @@ -18,10 +17,7 @@ async function buildServer() {

const router = s.router(contract, {
postTranscode: async ({ body }) => {
const job = await addTranscodeJob({
assetId: randomUUID(),
...body,
});
const job = await addTranscodeJob(body);
return {
status: 201,
body: { jobId: job.id },
Expand All @@ -40,13 +36,10 @@ async function buildServer() {
body: await getJobs(),
};
},
getJob: async ({ params }) => {
getJob: async ({ params, query }) => {
return {
status: 200,
body: {
job: await getJob(params.id),
rootTree: await getRootTreeForJobById(params.id),
},
body: await getJob(params.id, query.fromRoot),
};
},
getJobLogs: async ({ params }) => {
Expand Down
166 changes: 98 additions & 68 deletions packages/api/src/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { allQueus, flowProducer } from "@mixwave/artisan/producer";
import { JobNode, Job } from "bullmq";
import { JobNode, Job, JobState } from "bullmq";
import extract from "object-property-extractor";
import type { JobDto, JobNodeDto } from "./types.js";
import type { JobDto } from "./types.js";

function findQueueByName(name: string) {
const queue = allQueus.find((queue) => queue.name === name);
Expand All @@ -11,112 +11,142 @@ function findQueueByName(name: string) {
return queue;
}

function formatIdPair(id: string) {
const queueName = id.split("_", 1)[0];
return [findQueueByName(queueName), id] as const;
}

export async function getJobs(): Promise<JobDto[]> {
const result: JobDto[] = [];

for (const queue of allQueus) {
const jobs = await queue.getJobs();

const filteredJobs = jobs.filter((job) => !job.parent);

result.push(...(await Promise.all(filteredJobs.map(formatJobDto))));
for (const job of jobs) {
if (!job.id || job.parent) {
continue;
}
result.push(await getJob(job.id, false));
}
}

result.sort((a, b) => b.createdOn - a.createdOn);

return result;
}

async function formatJobDto(job: Job): Promise<JobDto> {
if (!job.id) {
throw new Error("Missing jobId");
}

let progress = 0;
if (typeof job.progress === "number") {
progress = job.progress;
}

return {
id: job.id,
name: job.name,
state: await job.getState(),
progress,
finishedOn: job.finishedOn ?? null,
processedOn: job.processedOn ?? null,
createdOn: job.timestamp,
inputData: JSON.stringify(job.data),
outputData: job.returnvalue ? JSON.stringify(job.returnvalue) : null,
failedReason: job.failedReason ?? null,
tag: extract(job.data, "metadata.tag", null),
};
export async function getJob(id: string, fromRoot: boolean) {
const node = await getJobNode(id, fromRoot);
return await formatJobNode(node);
}

export async function getJobLogs(id: string) {
const queueName = id.split("_", 1)[0];
const queue = findQueueByName(queueName);
const [queue, jobId] = formatIdPair(id);

const { logs } = await queue.getJobLogs(id);
const { logs } = await queue.getJobLogs(jobId);

return logs;
}

export async function getJob(id: string) {
const queueName = id.split("_", 1)[0];
const queue = findQueueByName(queueName);
async function getJobNode(id: string, fromRoot: boolean) {
const [queue, jobId] = formatIdPair(id);

const job = await Job.fromId(queue, id);
if (!job) {
let job = await Job.fromId(queue, jobId);
if (fromRoot) {
// If we want the root, resolve it and work with that as our job.
job = await findRootJob(job);
}

if (!job?.id) {
throw new Error("No job found.");
}

return await formatJobDto(job);
return await flowProducer.getFlow({
id: job.id,
queueName: job.queueName,
});
}

async function formatJobNodeDto(node: JobNode): Promise<JobNodeDto> {
const children = node.children ?? [];

const findParentSortKey = (obj: unknown) =>
extract(obj, "data.metadata.parentSortKey", 0);
children.sort((a, b) => findParentSortKey(a.job) - findParentSortKey(b.job));

return {
job: await formatJobDto(node.job),
children: await Promise.all(children.map(formatJobNodeDto)),
};
}
async function findRootJob(job?: Job) {
if (!job) {
return;
}

export async function getRootTreeForJob(job: Job) {
while (job.parent) {
// TODO: Replacing bull internals is not a good idea, find another way to
// properly get queue.
const queue = findQueueByName(job.parent.queueKey.replace("bull:", ""));
const parentJob = await Job.fromId(queue, job.parent.id);
const [queue, jobId] = formatIdPair(job.parent.id);
const parentJob = await Job.fromId(queue, jobId);
if (!parentJob) {
throw new Error("No parent job found.");
}
job = parentJob;
}

return job;
}

async function formatJobNode(node: JobNode): Promise<JobDto> {
const { job, children } = node;
if (!job.id) {
throw new Error("Missing job id.");
throw new Error("Missing job id");
}

const node = await flowProducer.getFlow({
id: job.id,
queueName: job.queueName,
});
let progress = 0;
if (typeof job.progress === "number") {
progress = job.progress;
}

return await formatJobNodeDto(node);
}
const state = mapJobState(await job.getState());

export async function getRootTreeForJobById(id: string) {
const queueName = id.split("_", 1)[0];
const queue = findQueueByName(queueName);
const failedReason = state === "failed" ? job.failedReason : null;

const job = await Job.fromId(queue, id);
if (!job) {
throw new Error("No job found.");
const findParentSortKey = (obj: unknown) =>
extract(obj, "data.metadata.parentSortKey", 0);
(children ?? []).sort(
(a, b) => findParentSortKey(a.job) - findParentSortKey(b.job),
);

const jobChildren = await Promise.all((children ?? []).map(formatJobNode));

let processedOn = job.processedOn;
if (processedOn) {
for (const jobChild of jobChildren) {
if (jobChild.processedOn && jobChild.processedOn < processedOn) {
processedOn = jobChild.processedOn;
}
}
}

return await getRootTreeForJob(job);
const duration =
state === "completed" && processedOn && job.finishedOn
? job.finishedOn - processedOn
: null;

return {
id: job.id,
name: job.name,
state,
progress,
duration,
processedOn: job.processedOn ?? null,
finishedOn: job.finishedOn ?? null,
createdOn: job.timestamp,
inputData: JSON.stringify(job.data),
outputData: job.returnvalue ? JSON.stringify(job.returnvalue) : null,
failedReason,
tag: extract(job.data, "metadata.tag", null),
children: jobChildren,
};
}

function mapJobState(jobState: JobState | "unknown"): JobDto["state"] {
if (jobState === "active" || jobState === "waiting-children") {
return "running";
}
if (jobState === "completed") {
return "completed";
}
if (jobState === "failed") {
return "failed";
}
return "waiting";
}
15 changes: 5 additions & 10 deletions packages/api/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import type { JobState } from "bullmq";

export type JobDto = {
id: string;
name: string;
state: JobState | "unknown";
state: "waiting" | "running" | "failed" | "completed";
progress: number;
finishedOn: number | null;
processedOn: number | null;
createdOn: number;
processedOn: number | null;
finishedOn: number | null;
duration: number | null;
inputData: string;
outputData: string | null;
failedReason: string | null;
tag: string | null;
};

export type JobNodeDto = {
job: JobDto;
children: JobNodeDto[];
children: JobDto[];
};
2 changes: 1 addition & 1 deletion packages/artisan/src/consumer/workers/package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export type PackageData = {
segmentSize: number;
};
metadata: {
tag: string;
tag?: string;
};
};

Expand Down
6 changes: 3 additions & 3 deletions packages/artisan/src/consumer/workers/transcode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ export type TranscodeData = {
params: {
assetId: string;
segmentSize: number;
package: boolean;
packageAfter: boolean;
};
metadata: {
tag: string;
tag?: string;
};
};

Expand Down Expand Up @@ -45,7 +45,7 @@ export default async function (job: Job<TranscodeData, TranscodeResult>) {
JSON.stringify(meta, null, 2),
);

if (params.package) {
if (params.packageAfter) {
await job.log("Will queue package job");
await addPackageJob({
assetId: params.assetId,
Expand Down
Loading

0 comments on commit 7526da6

Please sign in to comment.