From 2e559137aeb8cb600ba0dac14da40a0a642fbcca Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Mon, 10 Jun 2024 13:00:34 -0400 Subject: [PATCH] =?UTF-8?q?=E2=84=B9=EF=B8=8F=20refactor:=20Remove=20use?= =?UTF-8?q?=20of=20Agenda=20for=20Conversation=20Imports=20(#3024)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: remove agenda and npm audit fix * refactor: import conversations without agenda * chore: update package-lock.json and data-provider version to 0.6.7 * fix: import conversations * chore: client npm audit fix --- api/package.json | 1 - api/server/routes/convos.js | 30 +-- ...obDefinition.js => importConversations.js} | 17 +- api/server/utils/import/index.js | 2 + api/server/utils/jobScheduler.js | 99 ---------- .../SettingsTabs/Data/ImportConversations.tsx | 2 +- client/src/data-provider/mutations.ts | 73 +------ package-lock.json | 179 ++++-------------- packages/data-provider/package.json | 2 +- packages/data-provider/src/api-endpoints.ts | 3 - packages/data-provider/src/data-service.ts | 14 +- packages/data-provider/src/types.ts | 32 +--- 12 files changed, 52 insertions(+), 402 deletions(-) rename api/server/utils/import/{jobDefinition.js => importConversations.js} (63%) delete mode 100644 api/server/utils/jobScheduler.js diff --git a/api/package.json b/api/package.json index ca1a62daf8c..31b08312738 100644 --- a/api/package.json +++ b/api/package.json @@ -41,7 +41,6 @@ "@langchain/community": "^0.0.46", "@langchain/google-genai": "^0.0.11", "@langchain/google-vertexai": "^0.0.17", - "agenda": "^5.0.0", "axios": "^1.3.4", "bcryptjs": "^2.4.3", "cheerio": "^1.0.0-rc.12", diff --git a/api/server/routes/convos.js b/api/server/routes/convos.js index 275f5a9755e..b22d159827d 100644 --- a/api/server/routes/convos.js +++ b/api/server/routes/convos.js @@ -3,12 +3,11 @@ const express = require('express'); const { CacheKeys } = require('librechat-data-provider'); const { initializeClient } = require('~/server/services/Endpoints/assistants'); const { getConvosByPage, deleteConvos, getConvo, saveConvo } = require('~/models/Conversation'); -const { IMPORT_CONVERSATION_JOB_NAME } = require('~/server/utils/import/jobDefinition'); const { storage, importFileFilter } = require('~/server/routes/files/multer'); const requireJwtAuth = require('~/server/middleware/requireJwtAuth'); const { forkConversation } = require('~/server/utils/import/fork'); +const { importConversations } = require('~/server/utils/import'); const { createImportLimiters } = require('~/server/middleware'); -const jobScheduler = require('~/server/utils/jobScheduler'); const getLogStores = require('~/cache/getLogStores'); const { sleep } = require('~/server/utils'); const { logger } = require('~/config'); @@ -129,10 +128,9 @@ router.post( upload.single('file'), async (req, res) => { try { - const filepath = req.file.path; - const job = await jobScheduler.now(IMPORT_CONVERSATION_JOB_NAME, filepath, req.user.id); - - res.status(201).json({ message: 'Import started', jobId: job.id }); + /* TODO: optimize to return imported conversations and add manually */ + await importConversations({ filepath: req.file.path, requestUserId: req.user.id }); + res.status(201).json({ message: 'Conversation(s) imported successfully' }); } catch (error) { logger.error('Error processing file', error); res.status(500).send('Error processing file'); @@ -169,24 +167,4 @@ router.post('/fork', async (req, res) => { } }); -// Get the status of an import job for polling -router.get('/import/jobs/:jobId', async (req, res) => { - try { - const { jobId } = req.params; - const { userId, ...jobStatus } = await jobScheduler.getJobStatus(jobId); - if (!jobStatus) { - return res.status(404).json({ message: 'Job not found.' }); - } - - if (userId !== req.user.id) { - return res.status(403).json({ message: 'Unauthorized' }); - } - - res.json(jobStatus); - } catch (error) { - logger.error('Error getting job details', error); - res.status(500).send('Error getting job details'); - } -}); - module.exports = router; diff --git a/api/server/utils/import/jobDefinition.js b/api/server/utils/import/importConversations.js similarity index 63% rename from api/server/utils/import/jobDefinition.js rename to api/server/utils/import/importConversations.js index 7b5d217229f..eb578c3bb4b 100644 --- a/api/server/utils/import/jobDefinition.js +++ b/api/server/utils/import/importConversations.js @@ -1,18 +1,14 @@ const fs = require('fs').promises; -const jobScheduler = require('~/server/utils/jobScheduler'); const { getImporter } = require('./importers'); const { indexSync } = require('~/lib/db'); const { logger } = require('~/config'); -const IMPORT_CONVERSATION_JOB_NAME = 'import conversation'; - /** * Job definition for importing a conversation. - * @param {import('agenda').Job} job - The job object. - * @param {Function} done - The done function. + * @param {{ filepath, requestUserId }} job - The job object. */ -const importConversationJob = async (job, done) => { - const { filepath, requestUserId } = job.attrs.data; +const importConversations = async (job) => { + const { filepath, requestUserId } = job; try { logger.debug(`user: ${requestUserId} | Importing conversation(s) from file...`); const fileData = await fs.readFile(filepath, 'utf8'); @@ -22,10 +18,8 @@ const importConversationJob = async (job, done) => { // Sync Meilisearch index await indexSync(); logger.debug(`user: ${requestUserId} | Finished importing conversations`); - done(); } catch (error) { logger.error(`user: ${requestUserId} | Failed to import conversation: `, error); - done(error); } finally { try { await fs.unlink(filepath); @@ -35,7 +29,4 @@ const importConversationJob = async (job, done) => { } }; -// Call the jobScheduler.define function at startup -jobScheduler.define(IMPORT_CONVERSATION_JOB_NAME, importConversationJob); - -module.exports = { IMPORT_CONVERSATION_JOB_NAME }; +module.exports = importConversations; diff --git a/api/server/utils/import/index.js b/api/server/utils/import/index.js index 0afa7bfa800..f1bca86af00 100644 --- a/api/server/utils/import/index.js +++ b/api/server/utils/import/index.js @@ -1,5 +1,7 @@ const importers = require('./importers'); +const importConversations = require('./importConversations'); module.exports = { ...importers, + importConversations, }; diff --git a/api/server/utils/jobScheduler.js b/api/server/utils/jobScheduler.js deleted file mode 100644 index d297b3bbd87..00000000000 --- a/api/server/utils/jobScheduler.js +++ /dev/null @@ -1,99 +0,0 @@ -const Agenda = require('agenda'); -const { logger } = require('~/config'); -const mongodb = require('mongodb'); - -/** - * Class for scheduling and running jobs. - * The workflow is as follows: start the job scheduler, define a job, and then schedule the job using defined job name. - */ -class JobScheduler { - constructor() { - this.agenda = new Agenda({ db: { address: process.env.MONGO_URI } }); - } - - /** - * Starts the job scheduler. - */ - async start() { - try { - logger.info('Starting Agenda...'); - await this.agenda.start(); - logger.info('Agenda successfully started and connected to MongoDB.'); - } catch (error) { - logger.error('Failed to start Agenda:', error); - } - } - - /** - * Schedules a job to start immediately. - * @param {string} jobName - The name of the job to schedule. - * @param {string} filepath - The filepath to pass to the job. - * @param {string} userId - The ID of the user requesting the job. - * @returns {Promise<{ id: string }>} - A promise that resolves with the ID of the scheduled job. - * @throws {Error} - If the job fails to schedule. - */ - async now(jobName, filepath, userId) { - try { - const job = await this.agenda.now(jobName, { filepath, requestUserId: userId }); - logger.debug(`Job '${job.attrs.name}' scheduled successfully.`); - return { id: job.attrs._id.toString() }; - } catch (error) { - throw new Error(`Failed to schedule job '${jobName}': ${error}`); - } - } - - /** - * Gets the status of a job. - * @param {string} jobId - The ID of the job to get the status of. - * @returns {Promise<{ id: string, userId: string, name: string, failReason: string, status: string } | null>} - A promise that resolves with the job status or null if the job is not found. - * @throws {Error} - If multiple jobs are found. - */ - async getJobStatus(jobId) { - const job = await this.agenda.jobs({ _id: new mongodb.ObjectId(jobId) }); - if (!job || job.length === 0) { - return null; - } - - if (job.length > 1) { - // This should never happen - throw new Error('Multiple jobs found.'); - } - - const jobDetails = { - id: job[0]._id, - userId: job[0].attrs.data.requestUserId, - name: job[0].attrs.name, - failReason: job[0].attrs.failReason, - status: !job[0].attrs.lastRunAt - ? 'scheduled' - : job[0].attrs.failedAt - ? 'failed' - : job[0].attrs.lastFinishedAt - ? 'completed' - : 'running', - }; - - return jobDetails; - } - - /** - * Defines a new job. - * @param {string} name - The name of the job. - * @param {Function} jobFunction - The function to run when the job is executed. - */ - define(name, jobFunction) { - this.agenda.define(name, async (job, done) => { - try { - await jobFunction(job, done); - } catch (error) { - logger.error(`Failed to run job '${name}': ${error}`); - done(error); - } - }); - } -} - -const jobScheduler = new JobScheduler(); -jobScheduler.start(); - -module.exports = jobScheduler; diff --git a/client/src/components/Nav/SettingsTabs/Data/ImportConversations.tsx b/client/src/components/Nav/SettingsTabs/Data/ImportConversations.tsx index e0149c26692..caa72fbb2b0 100644 --- a/client/src/components/Nav/SettingsTabs/Data/ImportConversations.tsx +++ b/client/src/components/Nav/SettingsTabs/Data/ImportConversations.tsx @@ -71,7 +71,7 @@ function ImportConversations() { {localize('com_ui_import_conversation_info')}