Skip to content

Commit

Permalink
ℹ️ refactor: Remove use of Agenda for Conversation Imports (#3024)
Browse files Browse the repository at this point in the history
* chore: remove agenda and npm audit fix

* refactor: import conversations without agenda

* chore: update package-lock.json and data-provider version to 0.6.7

* fix: import conversations

* chore: client npm audit fix
  • Loading branch information
danny-avila authored Jun 10, 2024
1 parent 92232af commit 2e55913
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 402 deletions.
1 change: 0 additions & 1 deletion api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"@langchain/community": "^0.0.46",
"@langchain/google-genai": "^0.0.11",
"@langchain/google-vertexai": "^0.0.17",
"agenda": "^5.0.0",
"axios": "^1.3.4",
"bcryptjs": "^2.4.3",
"cheerio": "^1.0.0-rc.12",
Expand Down
30 changes: 4 additions & 26 deletions api/server/routes/convos.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ const express = require('express');
const { CacheKeys } = require('librechat-data-provider');
const { initializeClient } = require('~/server/services/Endpoints/assistants');
const { getConvosByPage, deleteConvos, getConvo, saveConvo } = require('~/models/Conversation');
const { IMPORT_CONVERSATION_JOB_NAME } = require('~/server/utils/import/jobDefinition');
const { storage, importFileFilter } = require('~/server/routes/files/multer');
const requireJwtAuth = require('~/server/middleware/requireJwtAuth');
const { forkConversation } = require('~/server/utils/import/fork');
const { importConversations } = require('~/server/utils/import');
const { createImportLimiters } = require('~/server/middleware');
const jobScheduler = require('~/server/utils/jobScheduler');
const getLogStores = require('~/cache/getLogStores');
const { sleep } = require('~/server/utils');
const { logger } = require('~/config');
Expand Down Expand Up @@ -129,10 +128,9 @@ router.post(
upload.single('file'),
async (req, res) => {
try {
const filepath = req.file.path;
const job = await jobScheduler.now(IMPORT_CONVERSATION_JOB_NAME, filepath, req.user.id);

res.status(201).json({ message: 'Import started', jobId: job.id });
/* TODO: optimize to return imported conversations and add manually */
await importConversations({ filepath: req.file.path, requestUserId: req.user.id });
res.status(201).json({ message: 'Conversation(s) imported successfully' });
} catch (error) {
logger.error('Error processing file', error);
res.status(500).send('Error processing file');
Expand Down Expand Up @@ -169,24 +167,4 @@ router.post('/fork', async (req, res) => {
}
});

// Get the status of an import job for polling
router.get('/import/jobs/:jobId', async (req, res) => {
try {
const { jobId } = req.params;
const { userId, ...jobStatus } = await jobScheduler.getJobStatus(jobId);
if (!jobStatus) {
return res.status(404).json({ message: 'Job not found.' });
}

if (userId !== req.user.id) {
return res.status(403).json({ message: 'Unauthorized' });
}

res.json(jobStatus);
} catch (error) {
logger.error('Error getting job details', error);
res.status(500).send('Error getting job details');
}
});

module.exports = router;
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
const fs = require('fs').promises;
const jobScheduler = require('~/server/utils/jobScheduler');
const { getImporter } = require('./importers');
const { indexSync } = require('~/lib/db');
const { logger } = require('~/config');

const IMPORT_CONVERSATION_JOB_NAME = 'import conversation';

/**
* Job definition for importing a conversation.
* @param {import('agenda').Job} job - The job object.
* @param {Function} done - The done function.
* @param {{ filepath, requestUserId }} job - The job object.
*/
const importConversationJob = async (job, done) => {
const { filepath, requestUserId } = job.attrs.data;
const importConversations = async (job) => {
const { filepath, requestUserId } = job;
try {
logger.debug(`user: ${requestUserId} | Importing conversation(s) from file...`);
const fileData = await fs.readFile(filepath, 'utf8');
Expand All @@ -22,10 +18,8 @@ const importConversationJob = async (job, done) => {
// Sync Meilisearch index
await indexSync();
logger.debug(`user: ${requestUserId} | Finished importing conversations`);
done();
} catch (error) {
logger.error(`user: ${requestUserId} | Failed to import conversation: `, error);
done(error);
} finally {
try {
await fs.unlink(filepath);
Expand All @@ -35,7 +29,4 @@ const importConversationJob = async (job, done) => {
}
};

// Call the jobScheduler.define function at startup
jobScheduler.define(IMPORT_CONVERSATION_JOB_NAME, importConversationJob);

module.exports = { IMPORT_CONVERSATION_JOB_NAME };
module.exports = importConversations;
2 changes: 2 additions & 0 deletions api/server/utils/import/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const importers = require('./importers');
const importConversations = require('./importConversations');

module.exports = {
...importers,
importConversations,
};
99 changes: 0 additions & 99 deletions api/server/utils/jobScheduler.js

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function ImportConversations() {
<span>{localize('com_ui_import_conversation_info')}</span>
<label
htmlFor={'import-conversations-file'}
className="flex h-auto cursor-pointer items-center rounded bg-transparent px-2 py-3 text-xs font-medium font-normal transition-colors hover:bg-gray-100 hover:text-green-700 dark:bg-transparent dark:text-white dark:hover:bg-gray-600 dark:hover:text-green-500"
className="flex h-auto cursor-pointer items-center rounded bg-transparent px-2 py-3 text-xs font-medium transition-colors hover:bg-gray-100 hover:text-green-700 dark:bg-transparent dark:text-white dark:hover:bg-gray-600 dark:hover:text-green-500"
>
{allowImport ? (
<Import className="mr-1 flex h-4 w-4 items-center stroke-1" />
Expand Down
73 changes: 5 additions & 68 deletions client/src/data-provider/mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,81 +344,18 @@ export const useForkConvoMutation = (
};

export const useUploadConversationsMutation = (
_options?: t.MutationOptions<t.TImportJobStatus, FormData>,
_options?: t.MutationOptions<t.TImportResponse, FormData>,
) => {
const queryClient = useQueryClient();
const { onSuccess, onError, onMutate } = _options || {};

// returns the job status or reason of failure
const checkJobStatus = async (jobId) => {
try {
const response = await dataService.queryImportConversationJobStatus(jobId);
return response;
} catch (error) {
throw new Error('Failed to check job status');
}
};

// Polls the job status until it is completed, failed, or timed out
const pollJobStatus = (jobId, onSuccess, onError) => {
let timeElapsed = 0;
const timeout = 60000; // Timeout after a minute
const pollInterval = 500; // Poll every 500ms
const intervalId = setInterval(async () => {
try {
const statusResponse = await checkJobStatus(jobId);
console.log('Polling job status', statusResponse);
if (statusResponse.status === 'completed' || statusResponse.status === 'failed') {
clearInterval(intervalId);
if (statusResponse.status === 'completed') {
onSuccess && onSuccess(statusResponse);
} else {
onError &&
onError(
new Error(
statusResponse.failReason
? statusResponse.failReason
: 'Failed to import conversations',
),
);
}
}
timeElapsed += pollInterval; // Increment time elapsed by polling interval
if (timeElapsed >= timeout) {
clearInterval(intervalId);
onError && onError(new Error('Polling timed out'));
}
} catch (error) {
clearInterval(intervalId);
onError && onError(error);
}
}, pollInterval);
};

return useMutation<t.TImportStartResponse, unknown, FormData>({
return useMutation<t.TImportResponse, unknown, FormData>({
mutationFn: (formData: FormData) => dataService.importConversationsFile(formData),
onSuccess: (data, variables, context) => {
/* TODO: optimize to return imported conversations and add manually */
queryClient.invalidateQueries([QueryKeys.allConversations]);
// Assuming the job ID is in the response data
const jobId = data.jobId;
if (jobId) {
// Start polling for job status
pollJobStatus(
jobId,
(statusResponse) => {
// This is the final success callback when the job is completed
queryClient.invalidateQueries([QueryKeys.allConversations]); // Optionally refresh conversations query
if (onSuccess) {
onSuccess(statusResponse, variables, context);
}
},
(error) => {
// This is the error callback for job failure or polling errors
if (onError) {
onError(error, variables, context);
}
},
);
if (onSuccess) {
onSuccess(data, variables, context);
}
},
onError: (err, variables, context) => {
Expand Down
Loading

0 comments on commit 2e55913

Please sign in to comment.