Skip to content

Commit

Permalink
reduce fetch size, worker reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
radityaharya committed Feb 4, 2024
1 parent 8c7ad62 commit b1fb907
Show file tree
Hide file tree
Showing 19 changed files with 360 additions and 158 deletions.
Binary file modified bun.lockb
Binary file not shown.
7 changes: 3 additions & 4 deletions next.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { withSentryConfig } from "@sentry/nextjs";
/** @type {import("next").NextConfig} */
const config = {
experimental: {
instrumentationHook: true,
instrumentationHook: process.env.NO_WORKER ? false : true,
},
output: process.env.STANDALONE_OUTPUT ? "standalone" : undefined,
images: {
Expand Down Expand Up @@ -62,7 +62,6 @@ const sentryConfig = withSentryConfig(
},
);

const prodConfig = million.next(sentryConfig, { auto: { rsc: true } });
const devConfig = config;
const prodConfig = million.next(sentryConfig, { auto: { rsc: true }, mute: true });

export default process.env.NODE_ENV === "development" ? devConfig : prodConfig;
export default process.env.NODE_ENV === "development" ? config : prodConfig;
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
"type": "module",
"scripts": {
"build": "next build",
"bun:build": "bun --bun next build",
"db:push": "dotenv drizzle-kit push:mysql",
"db:push:prod": "dotenv -e .env.prod.local drizzle-kit push:mysql",
"db:studio": "dotenv drizzle-kit studio",
"db:studio:prod": "dotenv -e .env.prod.local drizzle-kit studio",
"dev": "next dev",
"dev:turbo": "next dev --turbo",
"bun:dev": "bun --bun next dev",
"lint": "next lint",
"start": "next start",
"start:worker": "dotenv -e .env bun ./worker/worker.ts",
Expand Down
6 changes: 3 additions & 3 deletions src/app/api/workflow/workflowQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const workflowQueue = new Queue("workflowQueue", {
attempts: 2,
backoff: {
type: "exponential",
delay: 5000,
delay: 2000,
},
},
});
Expand Down Expand Up @@ -113,7 +113,7 @@ export async function updateWorkflowRun(
returnValues?: any,
) {
try {
log.info("Updating workflow job", jobId);
log.info("Updating workflow run", jobId);
const job = await workflowQueue.getJob(jobId);
if (!job) {
throw new Error("Job not found");
Expand Down Expand Up @@ -156,7 +156,7 @@ export async function updateWorkflowRun(
.where(eq(workflowRuns.id, jobId));
return "updated";
} catch (err) {
log.error("Error updating job", err);
log.error("Error updating run", err);
throw err;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/app/utils/runWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export async function runWorkflow(workflow: WorkflowResponse) {
};

toast.promise(pollRequest(jobId), {
loading: 'Running workflow...',
loading: "Running workflow...",
success: () => {
return "Workflow completed successfully";
},
Expand Down
2 changes: 1 addition & 1 deletion src/components/animatedBackground/gradients.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ export class Gradient {
this.isGradientLegendVisible &&
((this.isMetaKey = e.metaKey),
(this.isMouseDown = !0),
!1 === this.conf.playing && requestAnimationFrame(this.animate));
this.conf.playing === false && requestAnimationFrame(this.animate));
}),
e(this, "handleMouseUp", () => {
this.isMouseDown = !1;
Expand Down
7 changes: 3 additions & 4 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export const register = async () => {
const WORKER_ID =
hostname +
"-" +
`${process.env.WORKER_ID ?? "instrumentation-" + uuid()}`;
`${process.env.WORKER_ID ?? `instrumentation-${uuid()}`}`;
console.log("Registering worker");
console.log("Worker ID", WORKER_ID);
const { Worker } = await import("bullmq");
Expand All @@ -29,7 +29,6 @@ export const register = async () => {
if (!data) {
throw new Error("No data found in job");
}

const accessToken = await getAccessTokenFromUserId(
data.userId as string,
);
Expand All @@ -56,7 +55,7 @@ export const register = async () => {
removeOnFail: { count: 5000 },
},
);
} else {
console.log("Not registering worker");
return;
}
console.log("Not registering worker");
};
81 changes: 35 additions & 46 deletions src/lib/log.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// import fs from "fs";
// import path from "path";
enum LogLevel {
DEBUG,
INFO,
Expand All @@ -16,54 +14,47 @@ class Logger {
private logLevel: LogLevel = LogLevel.INFO,
) {}

private log(level: LogLevel, message: string, data?: any): void {
if (this.logLevel <= level) {
const currentTimestamp = Date.now();
const timestamp = new Date(currentTimestamp).toISOString().slice(11, -1);
console.log(
`[${LogLevel[level].slice(0, 3)}] ${timestamp} ${
this.name
}: ${message}`,
);
this.lastLogTimestamp = currentTimestamp;
if (data) {
console.log(data instanceof Object ? JSON.stringify(data) : data);
}
private getColor(level: LogLevel): string {
switch (level) {
case LogLevel.DEBUG:
return "\x1b[36m"; // Cyan
case LogLevel.INFO:
return "\x1b[32m"; // Green
case LogLevel.WARN:
return "\x1b[33m"; // Yellow
case LogLevel.ERROR:
return "\x1b[31m"; // Red
default:
return "\x1b[0m"; // No color
}
}

debug(message: string, data?: any, saveToFile?: boolean): void {
this.log(LogLevel.DEBUG, message, data);
// if (false) {
// const filePath = path.join(__dirname, "debug.json");
// const logEntry = { message, data, timestamp: new Date().toISOString() };

// let json: { message: string; data: any; timestamp: string }[] = [];
private log(level: LogLevel, message: string, data?: any): void {
if (this.logLevel > level) return;

// try {
// const fileData = fs.readFileSync(filePath, "utf8");
// if (fileData.trim() !== "") {
// json = JSON.parse(fileData);
// }
// } catch (e) {
// console.error("Error reading or parsing file", e);
// }
const currentTimestamp = Date.now();
const timestamp = new Date(currentTimestamp).toISOString().slice(11, -5);
const logLevelName = LogLevel[level].slice(0, 3);
const color = this.getColor(level);

// json.push(logEntry);
const logMessage = `${color}[${logLevelName}] ${timestamp} ${this.name}: ${message}\x1b[0m`;

// json.sort(
// (
// a: { timestamp: string | number | Date },
// b: { timestamp: string | number | Date }
// ) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
// );
console.log(logMessage);
if (data) {
try {
if (data instanceof Error) {
data = { message: data.message, stack: data.stack };
}
console.log(JSON.stringify(data, null, 2));
} catch (error) {
console.log(data);
}
}
this.lastLogTimestamp = currentTimestamp;
}

// try {
// fs.writeFileSync(filePath, JSON.stringify(json, null, 2), "utf8");
// } catch (e) {
// console.error("Error writing file", e);
// }
// }
debug(message: string, data?: any): void {
this.log(LogLevel.DEBUG, message, data);
}

info(message: string, data?: any): void {
Expand All @@ -83,9 +74,7 @@ class Logger {
}

logTrackTitles(tracks: SpotifyApi.TrackObjectFull[]): void {
tracks.forEach((track) => {
this.info(`Track: ${track.name}`);
});
tracks.forEach((track) => this.info(`Track: ${track.name}`));
}
}

Expand Down
69 changes: 44 additions & 25 deletions src/lib/workflow/Base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ export interface AccessToken {
access_token: string;
}

const log = new Logger("Base");

export class Base {
public spClient: SpotifyWebApi;
public log: Logger = new Logger("Workflow");

constructor(
public accessToken: AccessToken,
Expand Down Expand Up @@ -36,29 +37,14 @@ export class Base {
return data.body;
}

// playlists

async getPlaylistTracks(playlistId: string) {
try {
const data = await this.spClient.getPlaylistTracks(playlistId);
return data.body.items;
} catch (err) {
console.error("Error getting playlist tracks", err);
throw new Error(
"Error getting playlist tracks " + (err as Error).message,
);
}
return [];
}
static async addTracksBatch(
spClient: SpotifyWebApi,
playlistId: string,
trackUris: string[],
) {
// A maximum of 100 items can be added in one request.
// handle this by chunking the array into batches of 100
try {
const chunkSize = 100;
const chunkSize = 50;
let retryAfter = 0;

function chunk<T>(array: T[], size: number): T[][] {
const result = [] as T[][];
Expand All @@ -70,8 +56,25 @@ export class Base {

const trackChunks = chunk(trackUris, chunkSize);

for (const chunk of trackChunks) {
await spClient.addTracksToPlaylist(playlistId, chunk);
for (const trackChunk of trackChunks) {
while (true) {
try {
log.debug(`Adding tracks to playlist ${playlistId}`);
await new Promise((resolve) =>
setTimeout(resolve, retryAfter * 1000),
);
await spClient.addTracksToPlaylist(playlistId, trackChunk);
break;
} catch (error: any) {
if (error.statusCode === 429) {
retryAfter = error.headers["retry-after"];
log.warn(`Rate limited. Retrying after ${retryAfter} seconds.`);
continue;
} else {
throw error;
}
}
}
}
} catch (err) {
console.error("Error adding tracks to playlist", err);
Expand All @@ -86,10 +89,9 @@ export class Base {
id: string,
trackUris: string[],
) {
// A maximum of 100 items can be added in one request.
// handle this by chunking the array into batches of 100
try {
const chunkSize = 100;
const chunkSize = 50;
let retryAfter = 0;

function chunk<T>(array: T[], size: number): T[][] {
const result = [] as T[][];
Expand All @@ -101,8 +103,25 @@ export class Base {

const trackChunks = chunk(trackUris, chunkSize);

for (const chunk of trackChunks) {
await spClient.replaceTracksInPlaylist(id, chunk);
for (const trackChunk of trackChunks) {
while (true) {
try {
log.debug(`Replacing tracks in playlist ${id}`);
await new Promise((resolve) =>
setTimeout(resolve, retryAfter * 1000),
);
await spClient.replaceTracksInPlaylist(id, trackChunk);
break;
} catch (error: any) {
if (error.statusCode === 429) {
retryAfter = error.headers["retry-after"];
log.warn(`Rate limited. Retrying after ${retryAfter} seconds.`);
continue;
} else {
throw error;
}
}
}
}
} catch (err) {
console.error("Error replacing tracks in playlist", err);
Expand Down
4 changes: 2 additions & 2 deletions src/lib/workflow/Combiner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default class Combiner extends Base {
}

static push(sources: any[], params: {}) {
log.debug("Push Sources:", sources, true);
log.debug("Push Sources:", sources);
log.info("Pushing...");
const result = [] as SpotifyApi.PlaylistTrackObject[];
sources.forEach((source) => {
Expand All @@ -35,7 +35,7 @@ export default class Combiner extends Base {
}

static alternate(sources: any[], params: {}) {
log.debug("Alternate Sources:", sources, true);
log.debug("Alternate Sources:", sources);
log.info("Alternating...");
const result = [] as SpotifyApi.PlaylistTrackObject[];
let longestSourceLength = 0;
Expand Down
10 changes: 5 additions & 5 deletions src/lib/workflow/Filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export default class Filter extends Base {
params: { filterKey: string; filterValue: string },
) {
log.info("Filtering...");
log.debug("Filter Sources:", sources, true);
log.debug("Filter Sources:", sources);

let tracks = [] as any;

Expand Down Expand Up @@ -135,7 +135,7 @@ export default class Filter extends Base {

static dedupeTracks(sources: any[], params: {}) {
log.info("Deduping tracks...");
log.debug("DedupeTracks Sources:", sources, true);
log.debug("DedupeTracks Sources:", sources);

let tracks = [] as any;

Expand Down Expand Up @@ -171,7 +171,7 @@ export default class Filter extends Base {

static dedupeArtists(sources: any[], params: {}) {
log.info("Deduping artists...");
log.debug("DedupeArtists Sources:", sources, true);
log.debug("DedupeArtists Sources:", sources);
let tracks = [] as any;

if (
Expand Down Expand Up @@ -209,7 +209,7 @@ export default class Filter extends Base {
params: { matchKey: string; matchValue: string },
) {
log.info("Matching...");
log.debug("Match Sources:", sources, true);
log.debug("Match Sources:", sources);

let tracks = [] as any;

Expand Down Expand Up @@ -312,7 +312,7 @@ export default class Filter extends Base {

static limit(sources: any[], params: { limit?: number }) {
log.info("Limiting...");
log.debug("Limit Sources:", sources, true);
log.debug("Limit Sources:", sources);

let tracks = [] as any;

Expand Down
Loading

0 comments on commit b1fb907

Please sign in to comment.