Skip to content

Commit

Permalink
📜 fix: Better OpenAI Assistants Annotation Processing (#3565)
Browse files Browse the repository at this point in the history
* fix: correctly handle openai assistants annotations

* fix: Handle adjacent identical citations only for our specific format

* fix: correctly handle adjacent identical citations

* refactor: make regex handling more robust

* fix: skip annotation condition, make regex handling more robust

* refactor: Handle FILE_PATH and FILE_CITATION annotation types in processMessages.spec.js

* ci: unit tests for real file path type annotations
  • Loading branch information
danny-avila authored Aug 6, 2024
1 parent c2a79ae commit 270c6d2
Show file tree
Hide file tree
Showing 2 changed files with 1,098 additions and 124 deletions.
239 changes: 115 additions & 124 deletions api/server/services/Threads/manage.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const { recordMessage, getMessages } = require('~/models/Message');
const { saveConvo } = require('~/models/Conversation');
const spendTokens = require('~/models/spendTokens');
const { countTokens } = require('~/server/utils');
const { logger } = require('~/config');

/**
* Initializes a new thread or adds messages to an existing thread.
Expand Down Expand Up @@ -516,80 +515,34 @@ const recordUsage = async ({
);
};

/**
* Creates a replaceAnnotation function with internal state for tracking the index offset.
*
* @returns {function} The replaceAnnotation function with closure for index offset.
*/
function createReplaceAnnotation() {
let indexOffset = 0;

/**
* Safely replaces the annotated text within the specified range denoted by start_index and end_index,
* after verifying that the text within that range matches the given annotation text.
* If the adjusted range is invalid (a warning is logged) or the text in that range does not match, falls back to `String.replace` on the expected text instead.
*
* @param {object} params The parameters for the replacement.
* @param {string} params.currentText The current text content, with/without replacements.
* @param {number} params.start_index The starting index where replacement should begin.
* @param {number} params.end_index The ending index where replacement should end.
* @param {string} params.expectedText The text expected to be found in the specified range.
* @param {string} params.replacementText The text to insert in place of the existing content.
* @returns {string} The text with the replacement applied, regardless of text match.
*/
function replaceAnnotation({
currentText,
start_index,
end_index,
expectedText,
replacementText,
}) {
const adjustedStartIndex = start_index + indexOffset;
const adjustedEndIndex = end_index + indexOffset;

if (
adjustedStartIndex < 0 ||
adjustedEndIndex > currentText.length ||
adjustedStartIndex > adjustedEndIndex
) {
logger.warn(`Invalid range specified for annotation replacement.
Attempting replacement with \`replace\` method instead...
length: ${currentText.length}
start_index: ${adjustedStartIndex}
end_index: ${adjustedEndIndex}`);
return currentText.replace(expectedText, replacementText);
}

if (currentText.substring(adjustedStartIndex, adjustedEndIndex) !== expectedText) {
return currentText.replace(expectedText, replacementText);
}
const uniqueCitationStart = '^====||===';
const uniqueCitationEnd = '==|||||^';

indexOffset += replacementText.length - (adjustedEndIndex - adjustedStartIndex);
return (
currentText.slice(0, adjustedStartIndex) +
replacementText +
currentText.slice(adjustedEndIndex)
);
}

return replaceAnnotation;
/**
 * Escapes every regular-expression metacharacter in a string so the result can be
 * embedded in a `RegExp` source and match the input literally.
 *
 * @param {string} string - The string to escape.
 * @returns {string} The input with each of `. * + ? ^ $ { } ( ) | [ ] \` prefixed by a backslash.
 */
function escapeRegExp(string) {
  const regexMetaChars = /[.*+?^${}()|[\]\\]/g;
  // Prefix each matched metacharacter with a single backslash.
  const escaped = string.replace(regexMetaChars, (metaChar) => `\\${metaChar}`);
  return escaped;
}

/**
* Sorts, processes, and flattens messages to a single string.
*
* @param {object} params - The OpenAI client instance.
* @param {object} params - The parameters for processing messages.
* @param {OpenAIClient} params.openai - The OpenAI client instance.
* @param {RunClient} params.client - The LibreChat client that manages the run: either refers to `OpenAI` or `StreamRunManager`.
* @param {ThreadMessage[]} params.messages - An array of messages.
* @returns {Promise<{messages: ThreadMessage[], text: string}>} The sorted messages and the flattened text.
* @returns {Promise<{messages: ThreadMessage[], text: string, edited: boolean}>} The sorted messages, the flattened text, and whether it was edited.
*/
async function processMessages({ openai, client, messages = [] }) {
const sorted = messages.sort((a, b) => a.created_at - b.created_at);

let text = '';
let edited = false;
const sources = [];
const sources = new Map();
const fileRetrievalPromises = [];

for (const message of sorted) {
message.files = [];
for (const content of message.content) {
Expand All @@ -598,15 +551,21 @@ async function processMessages({ openai, client, messages = [] }) {
const currentFileId = contentType?.file_id;

if (type === ContentTypes.IMAGE_FILE && !client.processedFileIds.has(currentFileId)) {
const file = await retrieveAndProcessFile({
openai,
client,
file_id: currentFileId,
basename: `${currentFileId}.png`,
});

client.processedFileIds.add(currentFileId);
message.files.push(file);
fileRetrievalPromises.push(
retrieveAndProcessFile({
openai,
client,
file_id: currentFileId,
basename: `${currentFileId}.png`,
})
.then((file) => {
client.processedFileIds.add(currentFileId);
message.files.push(file);
})
.catch((error) => {
console.error(`Failed to retrieve file: ${error.message}`);
}),
);
continue;
}

Expand All @@ -615,78 +574,110 @@ async function processMessages({ openai, client, messages = [] }) {
/** @type {{ annotations: Annotation[] }} */
const { annotations } = contentType ?? {};

// Process annotations if they exist
if (!annotations?.length) {
text += currentText + ' ';
text += currentText;
continue;
}

const originalText = currentText;
text += originalText;

const replaceAnnotation = createReplaceAnnotation();

logger.debug('[processMessages] Processing annotations:', annotations);
for (const annotation of annotations) {
let file;
const replacements = [];
const annotationPromises = annotations.map(async (annotation) => {
const type = annotation.type;
const annotationType = annotation[type];
const file_id = annotationType?.file_id;
const alreadyProcessed = client.processedFileIds.has(file_id);

const replaceCurrentAnnotation = (replacementText = '') => {
const { start_index, end_index, text: expectedText } = annotation;
currentText = replaceAnnotation({
originalText,
currentText,
start_index,
end_index,
expectedText,
replacementText,
});
edited = true;
};

if (alreadyProcessed) {
const { file_id } = annotationType || {};
file = await retrieveAndProcessFile({ openai, client, file_id, unknownType: true });
} else if (type === AnnotationTypes.FILE_PATH) {
const basename = path.basename(annotation.text);
file = await retrieveAndProcessFile({
openai,
client,
file_id,
basename,
});
replaceCurrentAnnotation(file.filepath);
} else if (type === AnnotationTypes.FILE_CITATION) {
file = await retrieveAndProcessFile({
openai,
client,
file_id,
unknownType: true,
});
sources.push(file.filename);
replaceCurrentAnnotation(`^${sources.length}^`);
let file;
let replacementText = '';

try {
if (alreadyProcessed) {
file = await retrieveAndProcessFile({ openai, client, file_id, unknownType: true });
} else if (type === AnnotationTypes.FILE_PATH) {
const basename = path.basename(annotation.text);
file = await retrieveAndProcessFile({
openai,
client,
file_id,
basename,
});
replacementText = file.filepath;
} else if (type === AnnotationTypes.FILE_CITATION && file_id) {
file = await retrieveAndProcessFile({
openai,
client,
file_id,
unknownType: true,
});
if (file && file.filename) {
if (!sources.has(file.filename)) {
sources.set(file.filename, sources.size + 1);
}
replacementText = `${uniqueCitationStart}${sources.get(
file.filename,
)}${uniqueCitationEnd}`;
}
}

if (file && replacementText) {
replacements.push({
start: annotation.start_index,
end: annotation.end_index,
text: replacementText,
});
edited = true;
if (!alreadyProcessed) {
client.processedFileIds.add(file_id);
message.files.push(file);
}
}
} catch (error) {
console.error(`Failed to process annotation: ${error.message}`);
}
});

text = currentText;
await Promise.all(annotationPromises);

if (!file) {
continue;
}

client.processedFileIds.add(file_id);
message.files.push(file);
// Apply replacements in reverse order
replacements.sort((a, b) => b.start - a.start);
for (const { start, end, text: replacementText } of replacements) {
currentText = currentText.slice(0, start) + replacementText + currentText.slice(end);
}

text += currentText;
}
}

if (sources.length) {
await Promise.all(fileRetrievalPromises);

// Handle adjacent identical citations with the unique format
const adjacentCitationRegex = new RegExp(
`${escapeRegExp(uniqueCitationStart)}(\\d+)${escapeRegExp(
uniqueCitationEnd,
)}(\\s*)${escapeRegExp(uniqueCitationStart)}(\\d+)${escapeRegExp(uniqueCitationEnd)}`,
'g',
);
text = text.replace(adjacentCitationRegex, (match, num1, space, num2) => {
return num1 === num2
? `${uniqueCitationStart}${num1}${uniqueCitationEnd}`
: `${uniqueCitationStart}${num1}${uniqueCitationEnd}${space}${uniqueCitationStart}${num2}${uniqueCitationEnd}`;
});

// Remove any remaining adjacent identical citations
const remainingAdjacentRegex = new RegExp(
`(${escapeRegExp(uniqueCitationStart)}(\\d+)${escapeRegExp(uniqueCitationEnd)})\\s*\\1+`,
'g',
);
text = text.replace(remainingAdjacentRegex, '$1');

// Replace the unique citation format with the final format
text = text.replace(new RegExp(escapeRegExp(uniqueCitationStart), 'g'), '^');
text = text.replace(new RegExp(escapeRegExp(uniqueCitationEnd), 'g'), '^');

if (sources.size) {
text += '\n\n';
for (let i = 0; i < sources.length; i++) {
text += `^${i + 1}.^ ${sources[i]}${i === sources.length - 1 ? '' : '\n'}`;
}
Array.from(sources.entries()).forEach(([source, index], arrayIndex) => {
text += `^${index}.^ ${source}${arrayIndex === sources.size - 1 ? '' : '\n'}`;
});
}

return { messages: sorted, text, edited };
Expand Down
Loading

0 comments on commit 270c6d2

Please sign in to comment.