diff --git a/packages/api/src/routes/jobs.ts b/packages/api/src/routes/jobs.ts index 6cb43a61..b8170b64 100644 --- a/packages/api/src/routes/jobs.ts +++ b/packages/api/src/routes/jobs.ts @@ -1,7 +1,9 @@ import { randomUUID } from "crypto"; import { addToQueue, + DEFAULT_CONCURRENCY, DEFAULT_PACKAGE_NAME, + DEFAULT_PUBLIC, DEFAULT_SEGMENT_SIZE, packageQueue, pipelineQueue, @@ -69,6 +71,8 @@ export const jobs = new Elysia() assetId: randomUUID(), segmentSize: DEFAULT_SEGMENT_SIZE, name: DEFAULT_PACKAGE_NAME, + concurrency: DEFAULT_CONCURRENCY, + public: DEFAULT_PUBLIC, ...body, }; const jobId = await addToQueue(pipelineQueue, data, { @@ -82,15 +86,17 @@ export const jobs = new Elysia() tags: ["Jobs"], }, body: t.Object({ - inputs: t.Array(InputSchema), - streams: t.Array(StreamSchema), assetId: t.Optional( t.String({ format: "uuid", }), ), + inputs: t.Array(InputSchema), + streams: t.Array(StreamSchema), group: t.Optional(t.String()), language: t.Optional(t.String()), + concurrency: t.Optional(t.Number()), + public: t.Optional(t.Boolean()), }), response: { 200: t.Object({ @@ -118,14 +124,14 @@ export const jobs = new Elysia() tags: ["Jobs"], }, body: t.Object({ - inputs: t.Array(InputSchema), - streams: t.Array(StreamSchema), assetId: t.Optional( t.String({ format: "uuid", }), ), segmentSize: t.Optional(t.Number()), + inputs: t.Array(InputSchema), + streams: t.Array(StreamSchema), group: t.Optional(t.String()), }), response: { @@ -140,6 +146,8 @@ export const jobs = new Elysia() async ({ body }) => { const data = { name: DEFAULT_PACKAGE_NAME, + concurrency: DEFAULT_CONCURRENCY, + public: DEFAULT_PUBLIC, ...body, }; const jobId = await addToQueue(packageQueue, data, { @@ -156,9 +164,11 @@ export const jobs = new Elysia() assetId: t.String({ format: "uuid", }), - name: t.Optional(t.String()), segmentSize: t.Optional(t.Number()), + name: t.Optional(t.String()), language: t.Optional(t.String()), + concurrency: t.Optional(t.Number()), + public: t.Optional(t.Boolean()), }), response: { 200: t.Object({ diff --git a/packages/artisan/src/lib/s3.ts b/packages/artisan/src/lib/s3.ts index 4ee96413..16aac6f9 100644 --- a/packages/artisan/src/lib/s3.ts +++ b/packages/artisan/src/lib/s3.ts @@ -34,6 +34,8 @@ export async function syncToS3( remotePath: string, options?: { del?: boolean; + public?: boolean; + concurrency?: number; }, ) { const commandInput: CommandInput = (input) => { @@ -43,12 +45,14 @@ export async function syncToS3( } return { ContentType: contentType, + ACL: options?.public ? "public-read" : "private", }; }; await sync(localPath, `s3://${env.S3_BUCKET}/${remotePath}`, { del: options?.del, commandInput, + maxConcurrentTransfers: options?.concurrency, }); } diff --git a/packages/artisan/src/workers/package.ts b/packages/artisan/src/workers/package.ts index 559fe001..252e46dd 100644 --- a/packages/artisan/src/workers/package.ts +++ b/packages/artisan/src/workers/package.ts @@ -148,6 +148,7 @@ async function handleStepInitial(job: Job, dir: WorkerDir) { await syncToS3(outDir, s3Dir, { del: true, + concurrency: job.data.concurrency, }); } diff --git a/packages/artisan/src/workers/pipeline.ts b/packages/artisan/src/workers/pipeline.ts index db7dd565..ef6019c9 100644 --- a/packages/artisan/src/workers/pipeline.ts +++ b/packages/artisan/src/workers/pipeline.ts @@ -82,6 +82,8 @@ async function handleStepContinue(job: Job, token?: string) { { assetId: job.data.assetId, name: job.data.name, + concurrency: job.data.concurrency, + public: job.data.public, language: job.data.language, }, { diff --git a/packages/bolt/src/add-to-queue.ts b/packages/bolt/src/add-to-queue.ts index 04f43d20..74bc33eb 100644 --- a/packages/bolt/src/add-to-queue.ts +++ b/packages/bolt/src/add-to-queue.ts @@ -5,6 +5,10 @@ export const DEFAULT_SEGMENT_SIZE = 2.24; export const DEFAULT_PACKAGE_NAME = "hls"; +export const DEFAULT_CONCURRENCY = 5; + +export const DEFAULT_PUBLIC = true; + export async function addToQueue( queue: Q, data: Q extends Queue ? D : never, diff --git a/packages/bolt/src/queue.ts b/packages/bolt/src/queue.ts index cc9a35ae..87c32b8e 100644 --- a/packages/bolt/src/queue.ts +++ b/packages/bolt/src/queue.ts @@ -3,13 +3,18 @@ import { connection } from "./env"; import type { Input, PartialInput, PartialStream, Stream } from "./types"; export interface PipelineData { + // Shared assetId: string; + segmentSize: number; + // Transcode inputs: PartialInput[]; streams: PartialStream[]; - segmentSize: number; group?: string; - language?: string; + // Package name: string; + concurrency: number; + public: boolean; + language?: string; } export const pipelineQueue = new Queue("pipeline", { @@ -18,9 +23,9 @@ export const pipelineQueue = new Queue("pipeline", { export interface TranscodeData { assetId: string; + segmentSize: number; inputs: PartialInput[]; streams: PartialStream[]; - segmentSize: number; group?: string; } @@ -30,9 +35,11 @@ export const transcodeQueue = new Queue("transcode", { export interface PackageData { assetId: string; - language?: string; segmentSize?: number; name: string; + concurrency: number; + public: boolean; + language?: string; } export const packageQueue = new Queue("package", {