Skip to content

Commit

Permalink
cache to prevent sending another message while message is sending
Browse files Browse the repository at this point in the history
  • Loading branch information
wqyeo committed Jun 29, 2024
1 parent 1cf2688 commit 831e6f2
Showing 1 changed file with 32 additions and 3 deletions.
35 changes: 32 additions & 3 deletions src/api/consumers/chatMessageConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import selectAccountByUUID from '../repositories/selectAccountByUUID';
import { db } from 'src/db';
import updateAccountTokenByUUID from '../repositories/updateAccountTokenByUUID';
import requestEncrypt from '../services/crypt/requestEncrypt';
import { REDIS_CONNECTION_STRING } from 'src/configs/redisConnectionString';
import { createClient } from 'redis';

function getSenderUserUUID(data: any): Promise<string | null> {
const sessionToken: string = data.session_token
Expand All @@ -38,7 +40,17 @@ function getSenderUserUUID(data: any): Promise<string | null> {

// TODO: Support custom system messages...
export default async function chatMessageConsumer(socketClient: WebSocket, content: RawData) {
const redisClient = createClient({
url: REDIS_CONNECTION_STRING
})
.on('error', async err => {
console.error('Connecting Redis Client Error', err);
await insertLog(`Error connecting to Redis :: ${err}`);
})

try {
await redisClient.connect();

const data = JSON.parse(content.toString())

const userUUID = await getSenderUserUUID(data)
Expand All @@ -47,6 +59,7 @@ export default async function chatMessageConsumer(socketClient: WebSocket, conte
socketClient.send(JSON.stringify({ status: "BAD_SESSION", message: "Bad session token, user should login again!" }));
return
}

const matchingAccounts = await selectAccountByUUID(userUUID)
if (matchingAccounts.length <= 0) {
const warningMessage = `Client tried to login as ${data.username} (${userUUID}). But failed to match UUID in database.`
Expand Down Expand Up @@ -81,16 +94,28 @@ export default async function chatMessageConsumer(socketClient: WebSocket, conte
}

const conversationID: number | null = data.conversation_id
const attachments: MessageAttachment[] = data.attachments

// TODO: Handle attachment size/input validation.

// TODO: Handle attachments

const openAiClient = CreateOpenAiClient()
if (openAiClient === null) {
console.error("An incoming message on websocket consumer was ignored due to unset OpenAI keys!")
return
}

// validate from cache if user is currently sending message
const userSendingMessage = await redisClient.get(`${userUUID}_is_messaging`)
if (userSendingMessage != null) {
await insertLog(`User ${data.username} (${userUUID}) tried to send message but is already sending one.`, "WARNING")
console.warn(`User (${data.username}) tried to send message but is already sending one.`)
socketClient.send(JSON.stringify({ status: "ALREADY_SENDING", message: "User is already sending a message!" }));
return
}
await redisClient.set(`${userUUID}_is_messaging`, "true", {
EX: 20,
NX: true
})

// Create conversation if new conversation,
// otherwise, try to fetch from database.
let conversationObject;
Expand Down Expand Up @@ -196,9 +221,13 @@ export default async function chatMessageConsumer(socketClient: WebSocket, conte
token_cost: completionTokenCost
}
}));

await redisClient.del(`${userUUID}_is_messaging`)
} catch (err) {
const logMessage = `Failed to handle chat message consumer :: ${err}`
console.error(logMessage)
await insertLog(logMessage, "CRITICAL")
} finally {
await redisClient.disconnect()
}
}

0 comments on commit 831e6f2

Please sign in to comment.