Skip to content

Commit

Permalink
feat: Support for public and concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
matvp91 committed Jan 6, 2025
1 parent 453907e commit 9e868ac
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 9 deletions.
20 changes: 15 additions & 5 deletions packages/api/src/routes/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { randomUUID } from "crypto";
import {
addToQueue,
DEFAULT_CONCURRENCY,
DEFAULT_PACKAGE_NAME,
DEFAULT_PUBLIC,
DEFAULT_SEGMENT_SIZE,
packageQueue,
pipelineQueue,
Expand Down Expand Up @@ -69,6 +71,8 @@ export const jobs = new Elysia()
assetId: randomUUID(),
segmentSize: DEFAULT_SEGMENT_SIZE,
name: DEFAULT_PACKAGE_NAME,
concurrency: DEFAULT_CONCURRENCY,
public: DEFAULT_PUBLIC,
...body,
};
const jobId = await addToQueue(pipelineQueue, data, {
Expand All @@ -82,15 +86,17 @@ export const jobs = new Elysia()
tags: ["Jobs"],
},
body: t.Object({
inputs: t.Array(InputSchema),
streams: t.Array(StreamSchema),
assetId: t.Optional(
t.String({
format: "uuid",
}),
),
inputs: t.Array(InputSchema),
streams: t.Array(StreamSchema),
group: t.Optional(t.String()),
language: t.Optional(t.String()),
concurrency: t.Optional(t.Number()),
public: t.Optional(t.Boolean()),
}),
response: {
200: t.Object({
Expand Down Expand Up @@ -118,14 +124,14 @@ export const jobs = new Elysia()
tags: ["Jobs"],
},
body: t.Object({
inputs: t.Array(InputSchema),
streams: t.Array(StreamSchema),
assetId: t.Optional(
t.String({
format: "uuid",
}),
),
segmentSize: t.Optional(t.Number()),
inputs: t.Array(InputSchema),
streams: t.Array(StreamSchema),
group: t.Optional(t.String()),
}),
response: {
Expand All @@ -140,6 +146,8 @@ export const jobs = new Elysia()
async ({ body }) => {
const data = {
name: DEFAULT_PACKAGE_NAME,
concurrency: DEFAULT_CONCURRENCY,
public: DEFAULT_PUBLIC,
...body,
};
const jobId = await addToQueue(packageQueue, data, {
Expand All @@ -156,9 +164,11 @@ export const jobs = new Elysia()
assetId: t.String({
format: "uuid",
}),
name: t.Optional(t.String()),
segmentSize: t.Optional(t.Number()),
name: t.Optional(t.String()),
language: t.Optional(t.String()),
concurrency: t.Optional(t.Number()),
public: t.Optional(t.Boolean()),
}),
response: {
200: t.Object({
Expand Down
4 changes: 4 additions & 0 deletions packages/artisan/src/lib/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export async function syncToS3(
remotePath: string,
options?: {
del?: boolean;
public?: boolean;
concurrency?: number;
},
) {
const commandInput: CommandInput<PutObjectCommandInput> = (input) => {
Expand All @@ -43,12 +45,14 @@ export async function syncToS3(
}
return {
ContentType: contentType,
ACL: options?.public ? "public-read" : "private",
};
};

await sync(localPath, `s3://${env.S3_BUCKET}/${remotePath}`, {
del: options?.del,
commandInput,
maxConcurrentTransfers: options?.concurrency,
});
}

Expand Down
1 change: 1 addition & 0 deletions packages/artisan/src/workers/package.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ async function handleStepInitial(job: Job<PackageData>, dir: WorkerDir) {

await syncToS3(outDir, s3Dir, {
del: true,
concurrency: job.data.concurrency,
});
}

Expand Down
2 changes: 2 additions & 0 deletions packages/artisan/src/workers/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ async function handleStepContinue(job: Job<PipelineData>, token?: string) {
{
assetId: job.data.assetId,
name: job.data.name,
concurrency: job.data.concurrency,
public: job.data.public,
language: job.data.language,
},
{
Expand Down
4 changes: 4 additions & 0 deletions packages/bolt/src/add-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export const DEFAULT_SEGMENT_SIZE = 2.24;

export const DEFAULT_PACKAGE_NAME = "hls";

export const DEFAULT_CONCURRENCY = 5;

export const DEFAULT_PUBLIC = true;

export async function addToQueue<Q extends Queue>(
queue: Q,
data: Q extends Queue<infer D> ? D : never,
Expand Down
15 changes: 11 additions & 4 deletions packages/bolt/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ import { connection } from "./env";
import type { Input, PartialInput, PartialStream, Stream } from "./types";

export interface PipelineData {
// Shared
assetId: string;
segmentSize: number;
// Transcode
inputs: PartialInput[];
streams: PartialStream[];
segmentSize: number;
group?: string;
language?: string;
// Package
name: string;
concurrency: number;
public: boolean;
language?: string;
}

export const pipelineQueue = new Queue<PipelineData>("pipeline", {
Expand All @@ -18,9 +23,9 @@ export const pipelineQueue = new Queue<PipelineData>("pipeline", {

export interface TranscodeData {
assetId: string;
segmentSize: number;
inputs: PartialInput[];
streams: PartialStream[];
segmentSize: number;
group?: string;
}

Expand All @@ -30,9 +35,11 @@ export const transcodeQueue = new Queue<TranscodeData>("transcode", {

export interface PackageData {
assetId: string;
language?: string;
segmentSize?: number;
name: string;
concurrency: number;
public: boolean;
language?: string;
}

export const packageQueue = new Queue<PackageData>("package", {
Expand Down

0 comments on commit 9e868ac

Please sign in to comment.