diff --git a/.env.test b/.env.test new file mode 100644 index 0000000..4279049 --- /dev/null +++ b/.env.test @@ -0,0 +1,3 @@ +NATS_URL=localhost:4222 +CATALYST_CONTENT_URL_LOADBALANCER=http://peer.decentraland.local/ +PROFILE_IMAGES_URL=https://profile-images.decentraland.local diff --git a/package.json b/package.json index 2ec1f16..2eebdce 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,7 @@ }, "dependencies": { "@dcl/platform-crypto-middleware": "^1.1.0", - "@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916692077.commit-190ed21.tgz", + "@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12933824416.commit-ed8c3aa.tgz", "@dcl/rpc": "^1.1.2", "@dcl/schemas": "^15.6.0", "@well-known-components/env-config-provider": "^1.2.0", diff --git a/src/adapters/catalyst-client.ts b/src/adapters/catalyst-client.ts index 1cf14b8..9b88b39 100644 --- a/src/adapters/catalyst-client.ts +++ b/src/adapters/catalyst-client.ts @@ -1,7 +1,7 @@ import { Entity } from '@dcl/schemas' import { createContentClient, ContentClient } from 'dcl-catalyst-client' import { getCatalystServersFromCache } from 'dcl-catalyst-client/dist/contracts-snapshots' -import { AppComponents, ICatalystClient, ICatalystClientRequestOptions } from '../types' +import { AppComponents, ICatalystClientComponent, ICatalystClientRequestOptions } from '../types' import { retry } from '../utils/retrier' import { shuffleArray } from '../utils/array' @@ -11,7 +11,7 @@ const L1_TESTNET = 'sepolia' export async function createCatalystClient({ fetcher, config -}: Pick): Promise { +}: Pick): Promise { const loadBalancer = await config.requireString('CATALYST_CONTENT_URL_LOADBALANCER') const contractNetwork = (await config.getString('ENV')) === 'prod' ? L1_MAINNET : L1_TESTNET @@ -53,5 +53,15 @@ export async function createCatalystClient({ return retry(executeClientRequest, retries, waitTime) } - return { getEntitiesByPointers } + async function getEntityByPointer(pointer: string, options: ICatalystClientRequestOptions = {}): Promise { + const [entity] = await getEntitiesByPointers([pointer], options) + + if (!entity) { + throw new Error(`Entity not found for pointer ${pointer}`) + } + + return entity + } + + return { getEntitiesByPointers, getEntityByPointer } } diff --git a/src/adapters/db.ts b/src/adapters/db.ts index 6722ee3..ff20826 100644 --- a/src/adapters/db.ts +++ b/src/adapters/db.ts @@ -214,7 +214,7 @@ export function createDBComponent(components: Pick }, async getLastFriendshipActionByUsers(loggedUser: string, friendUser: string) { const query = SQL` - SELECT fa.action, fa.acting_user as by + SELECT fa.* FROM friendships f INNER JOIN friendship_actions fa ON f.id = fa.friendship_id WHERE (f.address_requester, f.address_requested) IN ((${loggedUser}, ${friendUser}), (${friendUser}, ${loggedUser})) @@ -235,7 +235,9 @@ export function createDBComponent(components: Pick const { rows: [{ id, created_at }] - } = txClient ? await txClient.query(query) : await pg.query(query) + } = txClient + ? await txClient.query<{ id: string; created_at: Date }>(query) + : await pg.query<{ id: string; created_at: Date }>(query) return { id, @@ -244,10 +246,18 @@ export function createDBComponent(components: Pick }, async updateFriendshipStatus(friendshipId, isActive, txClient) { logger.debug(`updating ${friendshipId} - ${isActive}`) - const query = SQL`UPDATE friendships SET is_active = ${isActive}, updated_at = now() WHERE id = ${friendshipId}` + const query = SQL`UPDATE friendships SET is_active = ${isActive}, updated_at = now() WHERE id = ${friendshipId} RETURNING id, created_at` - const results = txClient ? await txClient?.query(query) : await pg.query(query) - return results.rowCount === 1 + const { + rows: [{ id, created_at }] + } = txClient + ? await txClient.query<{ id: string; created_at: Date }>(query) + : await pg.query<{ id: string; created_at: Date }>(query) + + return { + id, + created_at + } }, async recordFriendshipAction(friendshipId, actingUser, action, metadata, txClient) { const uuid = randomUUID() diff --git a/src/adapters/peer-tracking.ts b/src/adapters/peer-tracking.ts index d0da90f..536f4e7 100644 --- a/src/adapters/peer-tracking.ts +++ b/src/adapters/peer-tracking.ts @@ -56,10 +56,16 @@ export function createPeerTrackingComponent({ } return { - async start() { + async subscribeToPeerStatusUpdates() { PEER_STATUS_HANDLERS.forEach((handler) => { - const subscription = nats.subscribe(handler.pattern, createMessageHandler(handler)) - subscriptions.set(handler.event, subscription) + try { + const subscription = nats.subscribe(handler.pattern, createMessageHandler(handler)) + subscriptions.set(handler.event, subscription) + } catch (error: any) { + logger.error(`Error subscribing to ${handler.pattern}`, { + error: error.message + }) + } }) }, async stop() { diff --git a/src/adapters/rpc-server/rpc-server.ts b/src/adapters/rpc-server/rpc-server.ts index 91b42f1..983ef04 100644 --- a/src/adapters/rpc-server/rpc-server.ts +++ b/src/adapters/rpc-server/rpc-server.ts @@ -1,6 +1,6 @@ import { createRpcServer } from '@dcl/rpc' import { registerService } from '@dcl/rpc/dist/codegen' -import { AppComponents, IRPCServerComponent, RpcServerContext, SubscriptionEventsEmitter } from '../../types' +import { AppComponents, IRPCServerComponent, RpcServerContext } from '../../types' import { getFriendsService } from './services/get-friends' import { getMutualFriendsService } from './services/get-mutual-friends' import { getPendingFriendshipRequestsService } from './services/get-pending-friendship-requests' @@ -9,8 +9,9 @@ import { subscribeToFriendshipUpdatesService } from './services/subscribe-to-fri import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { getSentFriendshipRequestsService } from './services/get-sent-friendship-requests' import { getFriendshipStatusService } from './services/get-friendship-status' -import { subscribeToFriendUpdatesService } from './services/subscribe-to-friend-updates' +import { subscribeToFriendConnectivityUpdatesService } from './services/subscribe-to-friend-connectivity-updates' import { FRIEND_STATUS_UPDATES_CHANNEL, FRIENDSHIP_UPDATES_CHANNEL } from '../pubsub' +import { friendshipUpdateHandler, friendConnectivityUpdateHandler } from '../../logic/updates' export async function createRpcServerComponent({ logs, @@ -45,45 +46,14 @@ export async function createRpcServerComponent({ const getSentFriendshipRequests = await getSentFriendshipRequestsService({ components: { logs, db, catalystClient, config } }) - const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } }) + const upsertFriendship = await upsertFriendshipService({ components: { logs, db, pubsub, config, catalystClient } }) const getFriendshipStatus = getFriendshipStatusService({ components: { logs, db } }) - const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } }) - const subscribeToFriendUpdates = subscribeToFriendUpdatesService({ - components: { logs, db, archipelagoStats } + const subscribeToFriendshipUpdates = await subscribeToFriendshipUpdatesService({ + components: { logs, config, catalystClient } + }) + const subscribeToFriendConnectivityUpdates = await subscribeToFriendConnectivityUpdatesService({ + components: { logs, db, archipelagoStats, config, catalystClient } }) - - function handleFriendshipUpdate(message: string) { - try { - const update = JSON.parse(message) as SubscriptionEventsEmitter['friendshipUpdate'] - const updateEmitter = SHARED_CONTEXT.subscribers[update.to] - if (updateEmitter) { - updateEmitter.emit('friendshipUpdate', update) - } - } catch (error: any) { - logger.error(`Error handling friendship update: ${error.message}`, { - message - }) - } - } - - async function handleFriendStatusUpdate(message: string) { - try { - // TODO: this may be a problem if the user has a lot of friends or there are a lot of users online - const update = JSON.parse(message) as SubscriptionEventsEmitter['friendStatusUpdate'] - const friends = await db.getOnlineFriends(update.address, Object.keys(SHARED_CONTEXT.subscribers)) - - friends.forEach(({ address: friendAddress }) => { - const emitter = SHARED_CONTEXT.subscribers[friendAddress] - if (emitter) { - emitter.emit('friendStatusUpdate', update) - } - }) - } catch (error: any) { - logger.error(`Error handling friend status update: ${error.message}`, { - message - }) - } - } rpcServer.setHandler(async function handler(port) { registerService(port, SocialServiceDefinition, async () => ({ @@ -94,7 +64,7 @@ export async function createRpcServerComponent({ getFriendshipStatus, upsertFriendship, subscribeToFriendshipUpdates, - subscribeToFriendUpdates + subscribeToFriendConnectivityUpdates })) }) @@ -104,8 +74,11 @@ export async function createRpcServerComponent({ logger.info(`[RPC] RPC Server listening on port ${rpcServerPort}`) }) - await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, handleFriendshipUpdate) - await pubsub.subscribeToChannel(FRIEND_STATUS_UPDATES_CHANNEL, handleFriendStatusUpdate) + await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, friendshipUpdateHandler(SHARED_CONTEXT, logger)) + await pubsub.subscribeToChannel( + FRIEND_STATUS_UPDATES_CHANNEL, + friendConnectivityUpdateHandler(SHARED_CONTEXT, logger, db) + ) }, attachUser({ transport, address }) { transport.on('close', () => { diff --git a/src/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.ts b/src/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.ts new file mode 100644 index 0000000..61c6390 --- /dev/null +++ b/src/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.ts @@ -0,0 +1,46 @@ +import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' +import { SubscriptionEventsEmitter, RpcServerContext, RPCServiceContext } from '../../../types' +import { + FriendConnectivityUpdate, + ConnectivityStatus +} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' +import { parseEmittedUpdateToFriendConnectivityUpdate } from '../../../logic/friendships' +import { parseProfilesToFriends } from '../../../logic/friends' +import { handleSubscriptionUpdates } from '../../../logic/updates' + +export async function subscribeToFriendConnectivityUpdatesService({ + components: { logs, db, archipelagoStats, config, catalystClient } +}: RPCServiceContext<'logs' | 'db' | 'archipelagoStats' | 'config' | 'catalystClient'>) { + const logger = logs.getLogger('subscribe-to-friend-connectivity-updates-service') + const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL') + + return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator { + try { + const onlinePeers = await archipelagoStats.getPeersFromCache() + const onlineFriends = await db.getOnlineFriends(context.address, onlinePeers) + + const profiles = await catalystClient.getEntitiesByPointers(onlineFriends.map((friend) => friend.address)) + const parsedProfiles = parseProfilesToFriends(profiles, profileImagesUrl).map((friend) => ({ + friend, + status: ConnectivityStatus.ONLINE + })) + + yield* parsedProfiles + + yield* handleSubscriptionUpdates({ + rpcContext: context, + eventName: 'friendConnectivityUpdate', + components: { + catalystClient, + logger + }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendConnectivityUpdate']) => update.address, + parser: parseEmittedUpdateToFriendConnectivityUpdate, + parseArgs: [profileImagesUrl] + }) + } catch (error: any) { + logger.error('Error in friend updates subscription:', error) + throw error + } + } +} diff --git a/src/adapters/rpc-server/services/subscribe-to-friend-updates.ts b/src/adapters/rpc-server/services/subscribe-to-friend-updates.ts deleted file mode 100644 index 625ea2f..0000000 --- a/src/adapters/rpc-server/services/subscribe-to-friend-updates.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' -import { SubscriptionEventsEmitter, RpcServerContext, RPCServiceContext } from '../../../types' -import { - FriendUpdate, - ConnectivityStatus -} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' -import mitt from 'mitt' -import emitterToAsyncGenerator from '../../../utils/emitterToGenerator' -import { parseEmittedUpdateToFriendStatusUpdate } from '../../../logic/friendships' - -export function subscribeToFriendUpdatesService({ - components: { logs, db, archipelagoStats } -}: RPCServiceContext<'logs' | 'db' | 'archipelagoStats'>) { - const logger = logs.getLogger('subscribe-to-friend-updates-service') - - return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator { - try { - const eventEmitter = context.subscribers[context.address] || mitt() - - if (!context.subscribers[context.address]) { - context.subscribers[context.address] = eventEmitter - } - - const onlinePeers = await archipelagoStats.getPeersFromCache() - const onlineFriends = db.streamOnlineFriends(context.address, onlinePeers) - - for await (const friend of onlineFriends) { - yield { - user: { address: friend.address }, - status: ConnectivityStatus.ONLINE - } - } - - const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'friendStatusUpdate') - for await (const update of updatesGenerator) { - logger.debug('Friend status update received:', { update: JSON.stringify(update) }) - const updateToResponse = parseEmittedUpdateToFriendStatusUpdate(update) - if (updateToResponse) { - yield updateToResponse - } else { - logger.error('Unable to parse friend status update: ', { update: JSON.stringify(update) }) - } - } - } catch (error: any) { - logger.error('Error in friend updates subscription:', error) - throw error - } - } -} diff --git a/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts b/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts index 3a5ab42..9c1ff5e 100644 --- a/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts +++ b/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts @@ -1,30 +1,26 @@ import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' import { RpcServerContext, RPCServiceContext, SubscriptionEventsEmitter } from '../../../types' import { FriendshipUpdate } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' -import mitt from 'mitt' import { parseEmittedUpdateToFriendshipUpdate } from '../../../logic/friendships' -import emitterToAsyncGenerator from '../../../utils/emitterToGenerator' +import { handleSubscriptionUpdates } from '../../../logic/updates' -export function subscribeToFriendshipUpdatesService({ components: { logs } }: RPCServiceContext<'logs'>) { +export async function subscribeToFriendshipUpdatesService({ + components: { logs, config, catalystClient } +}: RPCServiceContext<'logs' | 'config' | 'catalystClient'>) { const logger = logs.getLogger('subscribe-to-friendship-updates-service') + const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL') return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator { - const eventEmitter = context.subscribers[context.address] || mitt() - - if (!context.subscribers[context.address]) { - context.subscribers[context.address] = eventEmitter - } - - const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'friendshipUpdate') - - for await (const update of updatesGenerator) { - logger.debug('Friendship update received:', { update: JSON.stringify(update) }) - const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update) - if (updateToResponse) { - yield updateToResponse - } else { - logger.error('Unable to parse friendship update: ', { update: JSON.stringify(update) }) - } - } + yield* handleSubscriptionUpdates({ + rpcContext: context, + eventName: 'friendshipUpdate', + components: { + logger, + catalystClient + }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to, + parser: parseEmittedUpdateToFriendshipUpdate, + parseArgs: [profileImagesUrl] + }) } } diff --git a/src/adapters/rpc-server/services/upsert-friendship.ts b/src/adapters/rpc-server/services/upsert-friendship.ts index 5edfc67..2775237 100644 --- a/src/adapters/rpc-server/services/upsert-friendship.ts +++ b/src/adapters/rpc-server/services/upsert-friendship.ts @@ -6,20 +6,23 @@ import { import { parseUpsertFriendshipRequest, validateNewFriendshipAction, - getNewFriendshipStatus + getNewFriendshipStatus, + parseFriendshipRequestToFriendshipRequestResponse } from '../../../logic/friendships' import { FRIENDSHIP_UPDATES_CHANNEL } from '../../pubsub' -export function upsertFriendshipService({ - components: { logs, db, pubsub } -}: RPCServiceContext<'logs' | 'db' | 'pubsub'>) { +export async function upsertFriendshipService({ + components: { logs, db, pubsub, config, catalystClient } +}: RPCServiceContext<'logs' | 'db' | 'pubsub' | 'config' | 'catalystClient'>) { const logger = logs.getLogger('upsert-friendship-service') + const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL') return async function ( request: UpsertFriendshipPayload, context: RpcServerContext ): Promise { const parsedRequest = parseUpsertFriendshipRequest(request) + if (!parsedRequest) { logger.error('upsert friendship received unknown message: ', request as any) return { @@ -33,20 +36,9 @@ export function upsertFriendshipService({ logger.debug(`upsert friendship > `, parsedRequest as Record) try { - const friendship = await db.getFriendship([context.address, parsedRequest.user!]) - let lastAction = undefined - if (friendship) { - const lastRecordedAction = await db.getLastFriendshipAction(friendship.id) - lastAction = lastRecordedAction - } + const lastAction = await db.getLastFriendshipActionByUsers(context.address, parsedRequest.user!) - if ( - !validateNewFriendshipAction( - context.address, - { action: parsedRequest.action, user: parsedRequest.user }, - lastAction - ) - ) { + if (!validateNewFriendshipAction(context.address, parsedRequest, lastAction)) { logger.error('invalid action for a friendship') return { response: { @@ -62,12 +54,12 @@ export function upsertFriendshipService({ logger.debug('friendship status > ', { isActive: JSON.stringify(isActive), friendshipStatus }) const { id, actionId, createdAt } = await db.executeTx(async (tx) => { - let id: string, createdAt: number + let id: string, createdAt: Date - if (friendship) { - await db.updateFriendshipStatus(friendship.id, isActive, tx) - id = friendship.id - createdAt = new Date(friendship.created_at).getTime() + if (lastAction) { + const { created_at } = await db.updateFriendshipStatus(lastAction.friendship_id, isActive, tx) + id = lastAction.friendship_id + createdAt = created_at } else { const { id: newFriendshipId, created_at } = await db.createFriendship( [context.address, parsedRequest.user!], @@ -75,7 +67,7 @@ export function upsertFriendshipService({ tx ) id = newFriendshipId - createdAt = new Date(created_at).getTime() + createdAt = created_at } const actionId = await db.recordFriendshipAction( @@ -91,27 +83,30 @@ export function upsertFriendshipService({ logger.debug(`${id} friendship was upsert successfully`) - await pubsub.publishInChannel(FRIENDSHIP_UPDATES_CHANNEL, { - id: actionId, - from: context.address, - to: parsedRequest.user, - action: parsedRequest.action, - timestamp: Date.now(), - metadata: - parsedRequest.action === Action.REQUEST - ? parsedRequest.metadata - ? parsedRequest.metadata - : undefined - : undefined - }) + const metadata = + parsedRequest.action === Action.REQUEST && parsedRequest.metadata ? parsedRequest.metadata : undefined + + const [_, profile] = await Promise.all([ + await pubsub.publishInChannel(FRIENDSHIP_UPDATES_CHANNEL, { + id: actionId, + from: context.address, + to: parsedRequest.user, + action: parsedRequest.action, + timestamp: Date.now(), + metadata + }), + catalystClient.getEntityByPointer(parsedRequest.user) + ]) + const friendshipRequest = { + id, + timestamp: createdAt.toString(), + metadata: metadata || null + } return { response: { $case: 'accepted', - accepted: { - id: id, - createdAt - } + accepted: parseFriendshipRequestToFriendshipRequestResponse(friendshipRequest, profile, profileImagesUrl) } } } catch (error: any) { diff --git a/src/components.ts b/src/components.ts index 15a8147..84cd77a 100644 --- a/src/components.ts +++ b/src/components.ts @@ -19,7 +19,7 @@ import { createCatalystClient } from './adapters/catalyst-client' // Initialize all the components of the app export async function initComponents(): Promise { - const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] }) + const config = await createDotEnvConfigComponent({ path: ['.env.test', '.env.default', '.env'] }) const metrics = await createMetricsComponent(metricDeclarations, { config }) const logs = await createLogComponent({ metrics }) diff --git a/src/logic/friends.ts b/src/logic/friends.ts index 46acdfe..9631e11 100644 --- a/src/logic/friends.ts +++ b/src/logic/friends.ts @@ -3,17 +3,17 @@ import { Entity } from '@dcl/schemas' import { getProfileAvatar, getProfilePictureUrl } from './profiles' import { normalizeAddress } from '../utils/address' -export function parseProfileToFriend(profile: Entity, contentServerUrl: string): FriendProfile { +export function parseProfileToFriend(profile: Entity, profileImagesUrl: string): FriendProfile { const { userId, name, hasClaimedName } = getProfileAvatar(profile) return { address: normalizeAddress(userId), name, hasClaimedName, - profilePictureUrl: getProfilePictureUrl(contentServerUrl, profile) + profilePictureUrl: getProfilePictureUrl(profileImagesUrl, profile) } } -export function parseProfilesToFriends(profiles: Entity[], contentServerUrl: string): FriendProfile[] { - return profiles.map((profile) => parseProfileToFriend(profile, contentServerUrl)) +export function parseProfilesToFriends(profiles: Entity[], profileImagesUrl: string): FriendProfile[] { + return profiles.map((profile) => parseProfileToFriend(profile, profileImagesUrl)) } diff --git a/src/logic/friendships.ts b/src/logic/friendships.ts index 6d50108..8a2649d 100644 --- a/src/logic/friendships.ts +++ b/src/logic/friendships.ts @@ -2,7 +2,7 @@ import { FriendshipUpdate, UpsertFriendshipPayload, FriendshipStatus as FriendshipRequestStatus, - FriendUpdate, + FriendConnectivityUpdate, FriendshipRequestResponse } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { @@ -129,7 +129,9 @@ export function parseUpsertFriendshipRequest(request: UpsertFriendshipPayload): } export function parseEmittedUpdateToFriendshipUpdate( - update: SubscriptionEventsEmitter['friendshipUpdate'] + update: SubscriptionEventsEmitter['friendshipUpdate'], + profile: Entity, + profileImagesUrl: string ): FriendshipUpdate | null { switch (update.action) { case Action.REQUEST: @@ -139,9 +141,7 @@ export function parseEmittedUpdateToFriendshipUpdate( request: { id: update.id, createdAt: update.timestamp, - user: { - address: update.from - }, + friend: parseProfileToFriend(profile, profileImagesUrl), message: update.metadata?.message } } @@ -195,26 +195,29 @@ export function parseEmittedUpdateToFriendshipUpdate( } } -export function parseEmittedUpdateToFriendStatusUpdate({ - address, - status -}: SubscriptionEventsEmitter['friendStatusUpdate']): FriendUpdate | null { +export function parseEmittedUpdateToFriendConnectivityUpdate( + update: Pick, + profile: Entity, + profileImagesUrl: string +): FriendConnectivityUpdate | null { + const { status } = update return { - user: { address }, + friend: parseProfileToFriend(profile, profileImagesUrl), status: status } } export function getFriendshipRequestStatus( - { action, acting_user }: FriendshipAction, + friendshipAction: Pick, loggedUserAddress: string ): FriendshipRequestStatus { + const { action, acting_user } = friendshipAction const statusResolver = FRIENDSHIP_STATUS_BY_ACTION[action] return statusResolver?.(acting_user, loggedUserAddress) ?? FriendshipRequestStatus.UNRECOGNIZED } export function parseFriendshipRequestToFriendshipRequestResponse( - request: FriendshipRequest, + request: Pick, profile: Entity, profileImagesUrl: string ): FriendshipRequestResponse { diff --git a/src/logic/updates.ts b/src/logic/updates.ts new file mode 100644 index 0000000..09bb240 --- /dev/null +++ b/src/logic/updates.ts @@ -0,0 +1,92 @@ +import { ILoggerComponent } from '@well-known-components/interfaces' +import { ICatalystClientComponent, IDatabaseComponent, RpcServerContext, SubscriptionEventsEmitter } from '../types' +import mitt from 'mitt' +import emitterToAsyncGenerator from '../utils/emitterToGenerator' + +export type ILogger = ILoggerComponent.ILogger + +type SharedContext = Pick +type UpdateHandler = ( + update: SubscriptionEventsEmitter[T] +) => void | Promise + +type UpdateParser = (update: U, ...args: any[]) => T | null + +interface SubscriptionHandlerParams { + rpcContext: RpcServerContext + eventName: keyof SubscriptionEventsEmitter + components: { + logger: ILogger + catalystClient: ICatalystClientComponent + } + getAddressFromUpdate: (update: U) => string + parser: UpdateParser + parseArgs: any[] +} + +function handleUpdate(handler: UpdateHandler, logger: ILogger) { + return async (message: string) => { + try { + const update = JSON.parse(message) as SubscriptionEventsEmitter[T] + await handler(update) + } catch (error: any) { + logger.error(`Error handling update: ${error.message}`, { + error, + message + }) + } + } +} + +export function friendshipUpdateHandler(sharedContext: SharedContext, logger: ILogger) { + return handleUpdate<'friendshipUpdate'>((update) => { + const updateEmitter = sharedContext.subscribers[update.to] + if (updateEmitter) { + updateEmitter.emit('friendshipUpdate', update) + } + }, logger) +} + +export function friendConnectivityUpdateHandler(sharedContext: SharedContext, logger: ILogger, db: IDatabaseComponent) { + return handleUpdate<'friendConnectivityUpdate'>(async (update) => { + const friends = await db.getOnlineFriends(update.address, Object.keys(sharedContext.subscribers)) + + friends.forEach(({ address: friendAddress }) => { + const emitter = sharedContext.subscribers[friendAddress] + if (emitter) { + emitter.emit('friendConnectivityUpdate', update) + } + }) + }, logger) +} + +export async function* handleSubscriptionUpdates({ + rpcContext, + eventName, + components: { catalystClient, logger }, + getAddressFromUpdate, + parser, + parseArgs +}: SubscriptionHandlerParams): AsyncGenerator { + const eventEmitter = rpcContext.subscribers[rpcContext.address] || mitt() + + if (!rpcContext.subscribers[rpcContext.address]) { + rpcContext.subscribers[rpcContext.address] = eventEmitter + } + + const updatesGenerator = emitterToAsyncGenerator(eventEmitter, eventName) + + for await (const update of updatesGenerator) { + const eventNameString = String(eventName) + logger.debug(`${eventNameString} received:`, { update: JSON.stringify(update) }) + + const profile = await catalystClient.getEntityByPointer(getAddressFromUpdate(update as U)) + const parsedUpdate = await parser(update as U, profile, ...parseArgs) + + if (parsedUpdate) { + yield parsedUpdate + } else { + logger.error(`Unable to parse ${eventNameString}:`, { update: JSON.stringify(update) }) + } + } +} diff --git a/src/service.ts b/src/service.ts index c02ffd1..1400056 100644 --- a/src/service.ts +++ b/src/service.ts @@ -9,4 +9,6 @@ export async function main(program: Lifecycle.EntryPointParameters - updateFriendshipStatus(friendshipId: string, isActive: boolean, txClient?: PoolClient): Promise + updateFriendshipStatus( + friendshipId: string, + isActive: boolean, + txClient?: PoolClient + ): Promise<{ + id: string + created_at: Date + }> getFriends( userAddress: string, options?: { @@ -118,6 +125,7 @@ export type IArchipelagoStatsComponent = IBaseComponent & { export type IPeersSynchronizer = IBaseComponent export type IPeerTrackingComponent = IBaseComponent & { getSubscriptions(): Map + subscribeToPeerStatusUpdates(): Promise } export type ICatalystClientRequestOptions = { @@ -126,8 +134,9 @@ export type ICatalystClientRequestOptions = { contentServerUrl?: string } -export type ICatalystClient = { +export type ICatalystClientComponent = { getEntitiesByPointers(pointers: string[], options?: ICatalystClientRequestOptions): Promise + getEntityByPointer(pointer: string, options?: ICatalystClientRequestOptions): Promise } // this type simplifies the typings of http handlers @@ -191,7 +200,7 @@ export type SubscriptionEventsEmitter = { timestamp: number metadata?: { message: string } } - friendStatusUpdate: { + friendConnectivityUpdate: { address: string status: ConnectivityStatus } diff --git a/test/mocks/components/catalyst-client.ts b/test/mocks/components/catalyst-client.ts index 4a885a9..7bcba49 100644 --- a/test/mocks/components/catalyst-client.ts +++ b/test/mocks/components/catalyst-client.ts @@ -1,5 +1,6 @@ -import { ICatalystClient } from '../../../src/types' +import { ICatalystClientComponent } from '../../../src/types' -export const mockCatalystClient: jest.Mocked = { - getEntitiesByPointers: jest.fn() +export const mockCatalystClient: jest.Mocked = { + getEntitiesByPointers: jest.fn(), + getEntityByPointer: jest.fn() } diff --git a/test/mocks/profile.ts b/test/mocks/profile.ts index b17b10a..bde68d3 100644 --- a/test/mocks/profile.ts +++ b/test/mocks/profile.ts @@ -1,5 +1,7 @@ import { Entity, EntityType } from '@dcl/schemas' +export const PROFILE_IMAGES_URL = 'https://profile-images.decentraland.org' + export const mockProfile: Entity = { version: '1', id: 'profile-id', diff --git a/test/unit/adapters/archipelago-stats.spec.ts b/test/unit/adapters/archipelago-stats.spec.ts index 347c6f6..9ef52fa 100644 --- a/test/unit/adapters/archipelago-stats.spec.ts +++ b/test/unit/adapters/archipelago-stats.spec.ts @@ -1,4 +1,3 @@ -import { json } from 'stream/consumers' import { createArchipelagoStatsComponent } from '../../../src/adapters/archipelago-stats' import { IArchipelagoStatsComponent } from '../../../src/types' import { mockConfig, mockFetcher, mockLogs, mockRedis } from '../../mocks/components' @@ -27,6 +26,14 @@ describe('ArchipelagoStatsComponent', () => { expect(result).toEqual(['0x123', '0x456']) }) + it('should throw an error when fetch response is not ok', async () => { + mockFetcher.fetch.mockResolvedValue({ + ok: false, + statusText: 'Not Found' + } as any) + await expect(archipelagoStats.getPeers()).rejects.toThrow('Error fetching peers: Not Found') + }) + it('should throw an error when the fetch fails', async () => { mockFetcher.fetch.mockRejectedValue(new Error('Fetch failed')) await expect(archipelagoStats.getPeers()).rejects.toThrow('Fetch failed') diff --git a/test/unit/adapters/catalyst-client.spec.ts b/test/unit/adapters/catalyst-client.spec.ts index f799454..1bf487b 100644 --- a/test/unit/adapters/catalyst-client.spec.ts +++ b/test/unit/adapters/catalyst-client.spec.ts @@ -1,6 +1,6 @@ import { Entity } from '@dcl/schemas' import { createCatalystClient } from '../../../src/adapters/catalyst-client' -import { ICatalystClient } from '../../../src/types' +import { ICatalystClientComponent } from '../../../src/types' import { ContentClient, createContentClient } from 'dcl-catalyst-client' import { mockConfig, mockFetcher } from '../../mocks/components' @@ -32,7 +32,7 @@ jest.mock('../../../src/utils/timer', () => ({ const LOAD_BALANCER_URL = 'http://catalyst-server.com' describe('Catalyst client', () => { - let catalystClient: ICatalystClient + let catalystClient: ICatalystClientComponent let contentClientMock: ContentClient beforeEach(async () => { @@ -45,10 +45,6 @@ describe('Catalyst client', () => { contentClientMock = createContentClient({ fetcher: mockFetcher, url: LOAD_BALANCER_URL }) }) - afterEach(() => { - jest.clearAllMocks() - }) - describe('getEntitiesByPointers', () => { let pointers: string[] let entities: Pick[] @@ -130,6 +126,21 @@ describe('Catalyst client', () => { }) }) + describe('getEntityByPointer', () => { + it('should throw an error if the entity is not found', async () => { + contentClientMock.fetchEntitiesByPointers = jest.fn().mockResolvedValue([]) + await expect(catalystClient.getEntityByPointer('pointer')).rejects.toThrow('Entity not found for pointer pointer') + }) + + it('should return the entity if it is found', async () => { + contentClientMock.fetchEntitiesByPointers = jest.fn().mockResolvedValue([{ id: 'entity1' }]) + + const result = await catalystClient.getEntityByPointer('pointer') + + expect(result).toEqual({ id: 'entity1' }) + }) + }) + // Helpers function expectContentClientToHaveBeenCalledWithUrl(url: string) { expect(createContentClient).toHaveBeenCalledWith( diff --git a/test/unit/adapters/db.spec.ts b/test/unit/adapters/db.spec.ts index da72bed..3306c3a 100644 --- a/test/unit/adapters/db.spec.ts +++ b/test/unit/adapters/db.spec.ts @@ -1,6 +1,6 @@ import { createDBComponent } from '../../../src/adapters/db' import { Action } from '../../../src/types' -import SQL, { SQLStatement } from 'sql-template-strings' +import SQL from 'sql-template-strings' import { mockLogs, mockPg } from '../../mocks/components' jest.mock('node:crypto', () => ({ @@ -189,22 +189,20 @@ describe('db', () => { describe('updateFriendshipStatus', () => { it('should update friendship status', async () => { - mockPg.query.mockResolvedValueOnce({ rowCount: 1, rows: [{ updated_at: '2025-01-01T00:00:00.000Z' }] }) + mockPg.query.mockResolvedValueOnce({ + rowCount: 1, + rows: [{ id: 'friendship-id', created_at: '2025-01-01T00:00:00.000Z' }] + }) const result = await dbComponent.updateFriendshipStatus('friendship-id', false) - expect(result).toBe(true) expect(mockPg.query).toHaveBeenCalledWith( - SQL`UPDATE friendships SET is_active = ${false}, updated_at = now() WHERE id = ${'friendship-id'}` + SQL`UPDATE friendships SET is_active = ${false}, updated_at = now() WHERE id = ${'friendship-id'} RETURNING id, created_at` ) - }) - - it('should return false if no rows were updated', async () => { - mockPg.query.mockResolvedValueOnce({ rowCount: 0, rows: [] }) - - const result = await dbComponent.updateFriendshipStatus('friendship-id', false) - - expect(result).toBe(false) + expect(result).toEqual({ + id: 'friendship-id', + created_at: '2025-01-01T00:00:00.000Z' + }) }) }) diff --git a/test/unit/adapters/rpc-server/services/get-friends.spec.ts b/test/unit/adapters/rpc-server/services/get-friends.spec.ts index db3128a..863c958 100644 --- a/test/unit/adapters/rpc-server/services/get-friends.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-friends.spec.ts @@ -1,21 +1,19 @@ import { mockCatalystClient, mockConfig, mockDb, mockLogs } from '../../../../mocks/components' import { getFriendsService } from '../../../../../src/adapters/rpc-server/services/get-friends' import { RpcServerContext } from '../../../../../src/types' -import { createMockProfile } from '../../../../mocks/profile' +import { createMockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' import { createMockFriend, parseExpectedFriends } from '../../../../mocks/friend' describe('getFriendsService', () => { let getFriends: Awaited> - const profileImagesUrl = 'https://profile-images.decentraland.org' - const rpcContext: RpcServerContext = { address: '0x123', subscribers: undefined } beforeEach(async () => { - mockConfig.requireString.mockResolvedValueOnce(profileImagesUrl) + mockConfig.requireString.mockResolvedValueOnce(PROFILE_IMAGES_URL) getFriends = await getFriendsService({ components: { db: mockDb, logs: mockLogs, catalystClient: mockCatalystClient, config: mockConfig } @@ -35,7 +33,7 @@ describe('getFriendsService', () => { const response = await getFriends({ pagination: { limit: 10, offset: 0 } }, rpcContext) expect(response).toEqual({ - friends: mockProfiles.map(parseExpectedFriends(profileImagesUrl)), + friends: mockProfiles.map(parseExpectedFriends(PROFILE_IMAGES_URL)), paginationData: { total: totalFriends, page: 1 diff --git a/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts b/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts index e799e81..61ddabc 100644 --- a/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts @@ -2,14 +2,12 @@ import { mockCatalystClient, mockConfig, mockDb, mockLogs } from '../../../../mo import { getMutualFriendsService } from '../../../../../src/adapters/rpc-server/services/get-mutual-friends' import { GetMutualFriendsPayload } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { RpcServerContext } from '../../../../../src/types' -import { createMockProfile } from '../../../../mocks/profile' +import { createMockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' import { createMockFriend, parseExpectedFriends } from '../../../../mocks/friend' describe('getMutualFriendsService', () => { let getMutualFriends: Awaited> - const profileImagesUrl = 'https://profile-images.decentraland.org' - const rpcContext: RpcServerContext = { address: '0x123', subscribers: undefined @@ -21,7 +19,7 @@ describe('getMutualFriendsService', () => { } beforeEach(async () => { - mockConfig.requireString.mockResolvedValueOnce(profileImagesUrl) + mockConfig.requireString.mockResolvedValueOnce(PROFILE_IMAGES_URL) getMutualFriends = await getMutualFriendsService({ components: { db: mockDb, logs: mockLogs, catalystClient: mockCatalystClient, config: mockConfig } }) @@ -40,7 +38,7 @@ describe('getMutualFriendsService', () => { const response = await getMutualFriends(mutualFriendsRequest, rpcContext) expect(response).toEqual({ - friends: mockMutualFriendsProfiles.map(parseExpectedFriends(profileImagesUrl)), + friends: mockMutualFriendsProfiles.map(parseExpectedFriends(PROFILE_IMAGES_URL)), paginationData: { total: totalMutualFriends, page: 1 diff --git a/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts b/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts index 04eac48..7793795 100644 --- a/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts @@ -4,7 +4,7 @@ import { RpcServerContext } from '../../../../../src/types' import { PaginatedFriendshipRequestsResponse } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { emptyRequest } from '../../../../mocks/empty-request' import { createMockFriendshipRequest, createMockExpectedFriendshipRequest } from '../../../../mocks/friendship-request' -import { createMockProfile } from '../../../../mocks/profile' +import { createMockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' describe('getPendingFriendshipRequestsService', () => { let getPendingRequests: Awaited> @@ -14,10 +14,9 @@ describe('getPendingFriendshipRequestsService', () => { subscribers: undefined } - const profileImagesUrl = 'https://profile-images.decentraland.org' - beforeEach(async () => { - mockConfig.requireString.mockResolvedValueOnce(profileImagesUrl) + mockConfig.requireString.mockResolvedValueOnce(PROFILE_IMAGES_URL) + getPendingRequests = await getPendingFriendshipRequestsService({ components: { db: mockDb, logs: mockLogs, config: mockConfig, catalystClient: mockCatalystClient } }) diff --git a/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts b/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts index 29a826c..4951d9f 100644 --- a/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts +++ b/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts @@ -4,7 +4,7 @@ import { RpcServerContext } from '../../../../../src/types' import { emptyRequest } from '../../../../mocks/empty-request' import { createMockFriendshipRequest, createMockExpectedFriendshipRequest } from '../../../../mocks/friendship-request' import { PaginatedFriendshipRequestsResponse } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' -import { createMockProfile } from '../../../../mocks/profile' +import { createMockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' describe('getSentFriendshipRequestsService', () => { let getSentRequests: Awaited> @@ -14,10 +14,9 @@ describe('getSentFriendshipRequestsService', () => { subscribers: undefined } - const profileImagesUrl = 'https://profile-images.decentraland.org' - beforeEach(async () => { - mockConfig.requireString.mockResolvedValueOnce(profileImagesUrl) + mockConfig.requireString.mockResolvedValueOnce(PROFILE_IMAGES_URL) + getSentRequests = await getSentFriendshipRequestsService({ components: { db: mockDb, logs: mockLogs, config: mockConfig, catalystClient: mockCatalystClient } }) diff --git a/test/unit/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.spec.ts b/test/unit/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.spec.ts new file mode 100644 index 0000000..8b4b9ed --- /dev/null +++ b/test/unit/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.spec.ts @@ -0,0 +1,105 @@ +import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' +import { RpcServerContext } from '../../../../../src/types' +import { mockLogs, mockArchipelagoStats, mockDb, mockConfig, mockCatalystClient } from '../../../../mocks/components' +import { subscribeToFriendConnectivityUpdatesService } from '../../../../../src/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates' +import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' +import { createMockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' +import { parseProfileToFriend } from '../../../../../src/logic/friends' +import { handleSubscriptionUpdates } from '../../../../../src/logic/updates' + +jest.mock('../../../../../src/logic/updates') + +describe('subscribeToFriendConnectivityUpdatesService', () => { + let subscribeToFriendConnectivityUpdates: Awaited> + let rpcContext: RpcServerContext + const mockFriendProfile = createMockProfile('0x456') + const mockHandler = handleSubscriptionUpdates as jest.Mock + const friend = { + address: '0x456' + } + + beforeEach(async () => { + mockConfig.requireString.mockResolvedValue(PROFILE_IMAGES_URL) + + subscribeToFriendConnectivityUpdates = await subscribeToFriendConnectivityUpdatesService({ + components: { + logs: mockLogs, + db: mockDb, + archipelagoStats: mockArchipelagoStats, + config: mockConfig, + catalystClient: mockCatalystClient + } + }) + + rpcContext = { + address: '0x123', + subscribers: {} + } + }) + + it('should get initial online friends from archipelago stats and then receive updates', async () => { + mockDb.getOnlineFriends.mockResolvedValueOnce([friend]) + mockCatalystClient.getEntitiesByPointers.mockResolvedValueOnce([mockFriendProfile]) + mockArchipelagoStats.getPeers.mockResolvedValue(['0x456', '0x789']) + mockHandler.mockImplementationOnce(async function* () { + yield { + friend: parseProfileToFriend(mockFriendProfile, PROFILE_IMAGES_URL), + status: ConnectivityStatus.ONLINE + } + }) + + const generator = subscribeToFriendConnectivityUpdates({} as Empty, rpcContext) + const result = await generator.next() + + expect(mockArchipelagoStats.getPeersFromCache).toHaveBeenCalled() + expect(result.value).toEqual({ + friend: parseProfileToFriend(mockFriendProfile, PROFILE_IMAGES_URL), + status: ConnectivityStatus.ONLINE + }) + + const result2 = await generator.next() + expect(result2.done).toBe(false) + }) + + it('should handle empty online friends list and then receive updates', async () => { + mockDb.getOnlineFriends.mockResolvedValueOnce([]) + mockCatalystClient.getEntitiesByPointers.mockResolvedValueOnce([]) + mockHandler.mockImplementationOnce(async function* () { + yield { + friend: parseProfileToFriend(mockFriendProfile, PROFILE_IMAGES_URL), + status: ConnectivityStatus.ONLINE + } + }) + + const generator = subscribeToFriendConnectivityUpdates({} as Empty, rpcContext) + + const result = await generator.next() + expect(mockCatalystClient.getEntitiesByPointers).toHaveBeenCalledWith([]) + expect(result.done).toBe(false) + + const result2 = await generator.next() + expect(result2.done).toBe(true) + }) + + it('should handle errors during subscription', async () => { + const testError = new Error('Test error') + mockDb.getOnlineFriends.mockRejectedValue(testError) + + const generator = subscribeToFriendConnectivityUpdates({} as Empty, rpcContext) + + await expect(generator.next()).rejects.toThrow(testError) + }) + + it('should properly clean up subscription on return', async () => { + mockHandler.mockImplementationOnce(async function* () { + while (true) { + yield undefined + } + }) + + const generator = subscribeToFriendConnectivityUpdates({} as Empty, rpcContext) + const result = await generator.return(undefined) + + expect(result.done).toBe(true) + }) +}) diff --git a/test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts b/test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts deleted file mode 100644 index e85c304..0000000 --- a/test/unit/adapters/rpc-server/services/subscribe-to-friend-updates.spec.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' -import { Friend, RpcServerContext } from '../../../../../src/types' -import { mockLogs, mockArchipelagoStats, mockDb } from '../../../../mocks/components' -import { subscribeToFriendUpdatesService } from '../../../../../src/adapters/rpc-server/services/subscribe-to-friend-updates' -import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' - -describe('subscribeToFriendStatusService', () => { - let subscribeToFriendUpdates: ReturnType - let rpcContext: RpcServerContext - - beforeEach(() => { - subscribeToFriendUpdates = subscribeToFriendUpdatesService({ - components: { - logs: mockLogs, - db: mockDb, - archipelagoStats: mockArchipelagoStats - } - }) - - rpcContext = { - address: '0x123', - subscribers: {} - } - - mockDb.streamOnlineFriends.mockImplementationOnce(async function* () { - yield { address: '0x456' } - }) - }) - - it('should get initial online friends from archipelago stats', async () => { - mockArchipelagoStats.getPeers.mockResolvedValue(['0x456', '0x789']) - - const generator = subscribeToFriendUpdates({} as Empty, rpcContext) - const result = await generator.next() - - expect(mockArchipelagoStats.getPeersFromCache).toHaveBeenCalled() - expect(result.value).toEqual({ - user: { address: '0x456' }, - status: ConnectivityStatus.ONLINE - }) - }) - - it('should add the status subscriber to context', async () => { - const generator = subscribeToFriendUpdates({} as Empty, rpcContext) - generator.next() - - expect(rpcContext.subscribers['0x123']).toBeDefined() - generator.return(undefined) - }) - - it.todo('should yield parsed updates when an update is emitted') - it.todo('should skip unparsable updates') -}) diff --git a/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts b/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts index 5ce171e..c4958e8 100644 --- a/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts +++ b/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts @@ -1,16 +1,29 @@ import { subscribeToFriendshipUpdatesService } from '../../../../../src/adapters/rpc-server/services/subscribe-to-friendship-updates' import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen' -import { RpcServerContext, AppComponents } from '../../../../../src/types' -import { mockLogs } from '../../../../mocks/components' +import { Action, RpcServerContext } from '../../../../../src/types' +import { mockCatalystClient, mockConfig, mockLogs } from '../../../../mocks/components' +import { createMockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' +import { handleSubscriptionUpdates } from '../../../../../src/logic/updates' +import { parseProfileToFriend } from '../../../../../src/logic/friends' + +jest.mock('../../../../../src/logic/updates') describe('subscribeToFriendshipUpdatesService', () => { - let components: Pick - let subscribeToUpdates: ReturnType + let subscribeToFriendshipUpdates: Awaited> let rpcContext: RpcServerContext + const mockFriendProfile = createMockProfile('0x456') + const mockHandler = handleSubscriptionUpdates as jest.Mock + + beforeEach(async () => { + mockConfig.requireString.mockResolvedValue(PROFILE_IMAGES_URL) - beforeEach(() => { - components = { logs: mockLogs } - subscribeToUpdates = subscribeToFriendshipUpdatesService({ components }) + subscribeToFriendshipUpdates = await subscribeToFriendshipUpdatesService({ + components: { + logs: mockLogs, + config: mockConfig, + catalystClient: mockCatalystClient + } + }) rpcContext = { address: '0x123', @@ -18,16 +31,54 @@ describe('subscribeToFriendshipUpdatesService', () => { } }) - it('should add the subscriber to the context', async () => { - const generator = subscribeToUpdates({} as Empty, rpcContext) - generator.next() + it('should handle subscription updates', async () => { + const mockUpdate = { + id: '1', + from: '0x123', + to: '0x456', + action: Action.REQUEST, + timestamp: Date.now() + } + + mockHandler.mockImplementationOnce(async function* () { + yield { + friend: parseProfileToFriend(mockFriendProfile, PROFILE_IMAGES_URL), + action: mockUpdate.action, + createdAt: mockUpdate.timestamp + } + }) + + const generator = subscribeToFriendshipUpdates({} as Empty, rpcContext) + const result = await generator.next() - expect(rpcContext.subscribers['0x123']).toBeDefined() + expect(result.value).toEqual({ + friend: parseProfileToFriend(mockFriendProfile, PROFILE_IMAGES_URL), + action: mockUpdate.action, + createdAt: mockUpdate.timestamp + }) + expect(result.done).toBe(false) + }) + + it('should handle errors during subscription', async () => { + const testError = new Error('Test error') + mockHandler.mockImplementationOnce(async function* () { + throw testError + }) - // Properly clean up the generator - generator.return(undefined) + const generator = subscribeToFriendshipUpdates({} as Empty, rpcContext) + await expect(generator.next()).rejects.toThrow(testError) }) - it.todo('should yield parsed updates when an update is emitted') - it.todo('should skip unparsable updates') + it('should properly clean up subscription on return', async () => { + mockHandler.mockImplementationOnce(async function* () { + while (true) { + yield undefined + } + }) + + const generator = subscribeToFriendshipUpdates({} as Empty, rpcContext) + const result = await generator.return(undefined) + + expect(result.done).toBe(true) + }) }) diff --git a/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts b/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts index d3ac367..2718d57 100644 --- a/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts +++ b/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts @@ -1,19 +1,22 @@ -import { mockDb, mockLogs, mockPubSub } from '../../../../mocks/components' +import { mockCatalystClient, mockConfig, mockDb, mockLogs, mockPubSub } from '../../../../mocks/components' import { upsertFriendshipService } from '../../../../../src/adapters/rpc-server/services/upsert-friendship' -import { Action, FriendshipStatus, RpcServerContext, AppComponents } from '../../../../../src/types' +import { Action, FriendshipStatus, RpcServerContext } from '../../../../../src/types' import * as FriendshipsLogic from '../../../../../src/logic/friendships' import { UpsertFriendshipPayload, UpsertFriendshipResponse } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' -import { ParsedUpsertFriendshipRequest } from '../../../../../src/logic/friendships' +import { + ParsedUpsertFriendshipRequest, + parseFriendshipRequestToFriendshipRequestResponse +} from '../../../../../src/logic/friendships' import { FRIENDSHIP_UPDATES_CHANNEL } from '../../../../../src/adapters/pubsub' +import { mockProfile, PROFILE_IMAGES_URL } from '../../../../mocks/profile' jest.mock('../../../../../src/logic/friendships') describe('upsertFriendshipService', () => { - let components: jest.Mocked> - let upsertFriendship: ReturnType + let upsertFriendship: Awaited> const rpcContext: RpcServerContext = { address: '0x123', subscribers: undefined } const userAddress = '0x456' @@ -49,9 +52,18 @@ describe('upsertFriendshipService', () => { timestamp: Date.now().toString() } - beforeEach(() => { - components = { db: mockDb, logs: mockLogs, pubsub: mockPubSub } - upsertFriendship = upsertFriendshipService({ components }) + beforeEach(async () => { + mockConfig.requireString.mockResolvedValue(PROFILE_IMAGES_URL) + + upsertFriendship = await upsertFriendshipService({ + components: { + db: mockDb, + logs: mockLogs, + pubsub: mockPubSub, + config: mockConfig, + catalystClient: mockCatalystClient + } + }) mockDb.executeTx.mockImplementation(async (cb) => await cb({} as any)) }) @@ -87,8 +99,12 @@ describe('upsertFriendshipService', () => { jest.spyOn(FriendshipsLogic, 'validateNewFriendshipAction').mockReturnValueOnce(true) jest.spyOn(FriendshipsLogic, 'getNewFriendshipStatus').mockReturnValueOnce(FriendshipStatus.Friends) - mockDb.getFriendship.mockResolvedValueOnce(existingFriendship) - mockDb.getLastFriendshipAction.mockResolvedValueOnce(lastFriendshipAction) + mockDb.getLastFriendshipActionByUsers.mockResolvedValueOnce(lastFriendshipAction) + mockDb.updateFriendshipStatus.mockResolvedValueOnce({ + id: existingFriendship.id, + created_at: new Date(existingFriendship.created_at) + }) + mockCatalystClient.getEntityByPointer.mockResolvedValueOnce(mockProfile) const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) @@ -103,10 +119,15 @@ describe('upsertFriendshipService', () => { expect(result).toEqual({ response: { $case: 'accepted', - accepted: { - id: existingFriendship.id, - createdAt: expect.any(Number) - } + accepted: parseFriendshipRequestToFriendshipRequestResponse( + { + id: lastFriendshipAction.id, + timestamp: lastFriendshipAction.timestamp, + metadata: mockParsedRequest.metadata + }, + mockProfile, + PROFILE_IMAGES_URL + ) } }) }) @@ -121,6 +142,7 @@ describe('upsertFriendshipService', () => { id: 'new-friendship-id', created_at: new Date() }) + mockCatalystClient.getEntityByPointer.mockResolvedValueOnce(mockProfile) const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) @@ -135,10 +157,15 @@ describe('upsertFriendshipService', () => { expect(result).toEqual({ response: { $case: 'accepted', - accepted: { - id: 'new-friendship-id', - createdAt: expect.any(Number) - } + accepted: parseFriendshipRequestToFriendshipRequestResponse( + { + id: lastFriendshipAction.id, + timestamp: lastFriendshipAction.timestamp, + metadata: mockParsedRequest.metadata + }, + mockProfile, + PROFILE_IMAGES_URL + ) } }) }) @@ -148,13 +175,16 @@ describe('upsertFriendshipService', () => { jest.spyOn(FriendshipsLogic, 'validateNewFriendshipAction').mockReturnValueOnce(true) jest.spyOn(FriendshipsLogic, 'getNewFriendshipStatus').mockReturnValueOnce(FriendshipStatus.Friends) - mockDb.getFriendship.mockResolvedValueOnce(existingFriendship) - mockDb.getLastFriendshipAction.mockResolvedValueOnce(lastFriendshipAction) + mockDb.getLastFriendshipActionByUsers.mockResolvedValueOnce(lastFriendshipAction) + mockDb.updateFriendshipStatus.mockResolvedValueOnce({ + id: existingFriendship.id, + created_at: new Date(existingFriendship.created_at) + }) mockDb.recordFriendshipAction.mockResolvedValueOnce(lastFriendshipAction.id) const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) - expect(components.pubsub.publishInChannel).toHaveBeenCalledWith(FRIENDSHIP_UPDATES_CHANNEL, { + expect(mockPubSub.publishInChannel).toHaveBeenCalledWith(FRIENDSHIP_UPDATES_CHANNEL, { id: lastFriendshipAction.id, from: rpcContext.address, to: userAddress, @@ -167,7 +197,7 @@ describe('upsertFriendshipService', () => { it('should handle errors gracefully', async () => { jest.spyOn(FriendshipsLogic, 'parseUpsertFriendshipRequest').mockReturnValueOnce(mockParsedRequest) - mockDb.getFriendship.mockImplementationOnce(() => { + mockDb.getLastFriendshipActionByUsers.mockImplementationOnce(() => { throw new Error('Database error') }) diff --git a/test/unit/logic/friendships.spec.ts b/test/unit/logic/friendships.spec.ts index 793414c..209bc74 100644 --- a/test/unit/logic/friendships.spec.ts +++ b/test/unit/logic/friendships.spec.ts @@ -4,7 +4,7 @@ import { isFriendshipActionValid, isUserActionValid, parseEmittedUpdateToFriendshipUpdate, - parseEmittedUpdateToFriendStatusUpdate, + parseEmittedUpdateToFriendConnectivityUpdate, parseFriendshipRequestsToFriendshipRequestResponses, parseFriendshipRequestToFriendshipRequestResponse, parseUpsertFriendshipRequest, @@ -16,7 +16,8 @@ import { FriendshipStatus as FriendshipRequestStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' import { createMockExpectedFriendshipRequest, createMockFriendshipRequest } from '../../mocks/friendship-request' -import { createMockProfile } from '../../mocks/profile' +import { createMockProfile, mockProfile, PROFILE_IMAGES_URL } from '../../mocks/profile' +import { parseProfileToFriend } from '../../../src/logic/friends' describe('isFriendshipActionValid()', () => { test('it should be valid if from is null and to is REQUEST ', () => { @@ -382,47 +383,51 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { test('it should parse REQUEST update properly', () => { const now = Date.now() expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: Action.REQUEST, - timestamp: now, - from: '0xA', - to: '0xB' - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: Action.REQUEST, + timestamp: now, + from: '0xA', + to: '0xB' + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toEqual({ update: { $case: 'request', request: { id, createdAt: now, - user: { - address: '0xA' - }, + friend: parseProfileToFriend(mockProfile, PROFILE_IMAGES_URL), message: undefined } } }) expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: Action.REQUEST, - timestamp: now, - from: '0xA', - to: '0xB', - metadata: { - message: 'Hi!' - } - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: Action.REQUEST, + timestamp: now, + from: '0xA', + to: '0xB', + metadata: { + message: 'Hi!' + } + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toEqual({ update: { $case: 'request', request: { id, createdAt: now, - user: { - address: '0xA' - }, + friend: parseProfileToFriend(mockProfile, PROFILE_IMAGES_URL), message: 'Hi!' } } @@ -432,13 +437,17 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { test('it should parse CANCEL update properly', () => { const now = Date.now() expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: Action.CANCEL, - timestamp: now, - from: '0xA', - to: '0xB' - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: Action.CANCEL, + timestamp: now, + from: '0xA', + to: '0xB' + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toEqual({ update: { $case: 'cancel', @@ -454,13 +463,17 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { test('it should parse DELETE update properly', () => { const now = Date.now() expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: Action.DELETE, - timestamp: now, - from: '0xA', - to: '0xB' - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: Action.DELETE, + timestamp: now, + from: '0xA', + to: '0xB' + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toEqual({ update: { $case: 'delete', @@ -476,13 +489,17 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { test('it should parse REJECT update properly', () => { const now = Date.now() expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: Action.REJECT, - timestamp: now, - from: '0xA', - to: '0xB' - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: Action.REJECT, + timestamp: now, + from: '0xA', + to: '0xB' + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toEqual({ update: { $case: 'reject', @@ -498,13 +515,17 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { test('it should parse ACCEPT update properly', () => { const now = Date.now() expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: Action.ACCEPT, - timestamp: now, - from: '0xA', - to: '0xB' - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: Action.ACCEPT, + timestamp: now, + from: '0xA', + to: '0xB' + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toEqual({ update: { $case: 'accept', @@ -520,13 +541,17 @@ describe('parseEmittedUpdateToFriendshipUpdate()', () => { test('it should return null', () => { const now = Date.now() expect( - parseEmittedUpdateToFriendshipUpdate({ - id, - action: 'whaterver' as Action, - timestamp: now, - from: '0xA', - to: '0xB' - }) + parseEmittedUpdateToFriendshipUpdate( + { + id, + action: 'whaterver' as Action, + timestamp: now, + from: '0xA', + to: '0xB' + }, + mockProfile, + PROFILE_IMAGES_URL + ) ).toBe(null) }) }) @@ -560,11 +585,15 @@ describe('getFriendshipRequestStatus()', () => { }) }) -describe('parseEmittedUpdateToFriendStatusUpdate()', () => { - test('it should parse ONLINE update properly', () => { - expect(parseEmittedUpdateToFriendStatusUpdate({ address: '0x123', status: ConnectivityStatus.ONLINE })).toEqual({ - user: { address: '0x123' }, - status: ConnectivityStatus.ONLINE +describe('parseEmittedUpdateToFriendConnectivityUpdate()', () => { + test.each([ + [ConnectivityStatus.OFFLINE, 'offline'], + [ConnectivityStatus.ONLINE, 'online'] + ])('it should parse status %s update properly', (status) => { + const update = { address: '0x123', status } + expect(parseEmittedUpdateToFriendConnectivityUpdate(update, mockProfile, PROFILE_IMAGES_URL)).toEqual({ + friend: parseProfileToFriend(mockProfile, PROFILE_IMAGES_URL), + status }) }) }) @@ -573,9 +602,8 @@ describe('parseFriendshipRequestToFriendshipRequestResponse()', () => { test('it should parse friendship request to friendship request response', () => { const request = createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z') const profile = createMockProfile('0x456') - const profileImagesUrl = 'https://profile-images.decentraland.org' - expect(parseFriendshipRequestToFriendshipRequestResponse(request, profile, profileImagesUrl)).toEqual( + expect(parseFriendshipRequestToFriendshipRequestResponse(request, profile, PROFILE_IMAGES_URL)).toEqual( createMockExpectedFriendshipRequest('id1', '0x456', profile, '2025-01-01T00:00:00Z', '') ) }) @@ -585,9 +613,8 @@ describe('parseFriendshipRequestsToFriendshipRequestResponses()', () => { test('it should parse friendship requests to friendship request responses', () => { const requests = [createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z')] const profiles = [createMockProfile('0x456')] - const profileImagesUrl = 'https://profile-images.decentraland.org' - expect(parseFriendshipRequestsToFriendshipRequestResponses(requests, profiles, profileImagesUrl)).toEqual([ + expect(parseFriendshipRequestsToFriendshipRequestResponses(requests, profiles, PROFILE_IMAGES_URL)).toEqual([ createMockExpectedFriendshipRequest('id1', '0x456', profiles[0], '2025-01-01T00:00:00Z', '') ]) }) @@ -595,8 +622,7 @@ describe('parseFriendshipRequestsToFriendshipRequestResponses()', () => { test('it should return an empty array if the requester/requested address is not found in the profiles', () => { const requests = [createMockFriendshipRequest('id1', '0x456', '2025-01-01T00:00:00Z')] const profiles = [createMockProfile('0x789')] - const profileImagesUrl = 'https://profile-images.decentraland.org' - expect(parseFriendshipRequestsToFriendshipRequestResponses(requests, profiles, profileImagesUrl)).toEqual([]) + expect(parseFriendshipRequestsToFriendshipRequestResponses(requests, profiles, PROFILE_IMAGES_URL)).toEqual([]) }) }) diff --git a/test/unit/logic/updates.spec.ts b/test/unit/logic/updates.spec.ts new file mode 100644 index 0000000..d5dfa51 --- /dev/null +++ b/test/unit/logic/updates.spec.ts @@ -0,0 +1,290 @@ +import { + friendshipUpdateHandler, + friendConnectivityUpdateHandler, + handleSubscriptionUpdates, + ILogger +} from '../../../src/logic/updates' +import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen' +import { mockCatalystClient, mockDb, mockLogs } from '../../mocks/components' +import mitt, { Emitter } from 'mitt' +import { Action, RpcServerContext, SubscriptionEventsEmitter } from '../../../src/types' +import { sleep } from '../../../src/utils/timer' +import { mockProfile, PROFILE_IMAGES_URL } from '../../mocks/profile' + +describe('updates handlers', () => { + const logger = mockLogs.getLogger('test') + let sharedContext: RpcServerContext + + beforeEach(() => { + sharedContext = { + address: '0x123', + subscribers: { + '0x456': mitt(), + '0x789': mitt() + } + } + }) + + describe('friendshipUpdateHandler', () => { + it('should emit friendship update to the correct subscriber', () => { + const handler = friendshipUpdateHandler(sharedContext, logger) + const emitSpy = jest.spyOn(sharedContext.subscribers['0x456'], 'emit') + + const update = { + id: 'update-1', + from: '0x123', + to: '0x456', + action: Action.REQUEST, + timestamp: Date.now(), + metadata: { message: 'Hello!' } + } + + handler(JSON.stringify(update)) + + expect(emitSpy).toHaveBeenCalledWith('friendshipUpdate', update) + }) + + it('should not emit if subscriber does not exist', () => { + const handler = friendshipUpdateHandler(sharedContext, logger) + const nonExistentUpdate = { + id: 'update-1', + from: '0x123', + to: '0xNONEXISTENT', + action: Action.REQUEST, + timestamp: Date.now() + } + + expect(handler(JSON.stringify(nonExistentUpdate))).resolves.toBeUndefined() + }) + + it('should log error on invalid JSON', () => { + const handler = friendshipUpdateHandler(sharedContext, logger) + const errorSpy = jest.spyOn(logger, 'error') + + handler('invalid json') + + expect(errorSpy).toHaveBeenCalledWith( + expect.stringContaining('Error handling update:'), + expect.objectContaining({ message: 'invalid json' }) + ) + }) + }) + + describe('friendConnectivityUpdateHandler', () => { + it('should emit status update to all online friends', async () => { + const handler = friendConnectivityUpdateHandler(sharedContext, logger, mockDb) + const emitSpy456 = jest.spyOn(sharedContext.subscribers['0x456'], 'emit') + const emitSpy789 = jest.spyOn(sharedContext.subscribers['0x789'], 'emit') + + const onlineFriends = [{ address: '0x456' }, { address: '0x789' }] + mockDb.getOnlineFriends.mockResolvedValueOnce(onlineFriends) + + const update = { + address: '0x123', + status: ConnectivityStatus.ONLINE + } + + await handler(JSON.stringify(update)) + + expect(mockDb.getOnlineFriends).toHaveBeenCalledWith('0x123', ['0x456', '0x789']) + expect(emitSpy456).toHaveBeenCalledWith('friendConnectivityUpdate', update) + expect(emitSpy789).toHaveBeenCalledWith('friendConnectivityUpdate', update) + }) + + it('should handle empty online friends list', async () => { + const handler = friendConnectivityUpdateHandler(sharedContext, logger, mockDb) + mockDb.getOnlineFriends.mockResolvedValueOnce([]) + + const update = { + address: '0x123', + status: ConnectivityStatus.ONLINE + } + + await handler(JSON.stringify(update)) + + expect(mockDb.getOnlineFriends).toHaveBeenCalled() + }) + + it('should log error on invalid JSON', async () => { + const handler = friendConnectivityUpdateHandler(sharedContext, logger, mockDb) + const errorSpy = jest.spyOn(logger, 'error') + + await handler('invalid json') + + expect(errorSpy).toHaveBeenCalledWith( + expect.stringContaining('Error handling update:'), + expect.objectContaining({ message: 'invalid json' }) + ) + }) + + it('should handle database errors gracefully', async () => { + const handler = friendConnectivityUpdateHandler(sharedContext, logger, mockDb) + const errorSpy = jest.spyOn(logger, 'error') + const error = new Error('Database error') + + mockDb.getOnlineFriends.mockRejectedValueOnce(error) + + const update = { + address: '0x123', + status: ConnectivityStatus.ONLINE + } + + await handler(JSON.stringify(update)) + + expect(errorSpy).toHaveBeenCalledWith( + expect.stringContaining('Error handling update:'), + expect.objectContaining({ + error, + message: JSON.stringify(update) + }) + ) + }) + }) + + describe('handleSubscriptionUpdates', () => { + let eventEmitter: Emitter + let logger: ILogger + let parser: jest.Mock + let rpcContext: RpcServerContext + + const friendshipUpdate = { id: '1', to: '0x456', from: '0x123', action: Action.REQUEST, timestamp: Date.now() } + + beforeEach(() => { + eventEmitter = mitt() + logger = mockLogs.getLogger('test') + parser = jest.fn() + mockCatalystClient.getEntityByPointer.mockResolvedValue(mockProfile) + + rpcContext = { + address: '0x123', + subscribers: {} + } + }) + + it('should create and store emitter in context if not exists', async () => { + parser.mockResolvedValueOnce({ parsed: true }) + + const generator = handleSubscriptionUpdates({ + rpcContext, + eventName: 'friendshipUpdate', + components: { + catalystClient: mockCatalystClient, + logger + }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to, + parser, + parseArgs: [PROFILE_IMAGES_URL] + }) + + // Start consuming the generator + const resultPromise = generator.next() + + // Verify emitter was created and stored + expect(rpcContext.subscribers['0x123']).toBeDefined() + expect(rpcContext.subscribers['0x123'].all).toBeDefined() + + // Emit event using the stored emitter + rpcContext.subscribers['0x123'].emit('friendshipUpdate', friendshipUpdate) + + const result = await resultPromise + expect(result.value).toEqual({ parsed: true }) + }) + + it('should reuse existing emitter from context', async () => { + const existingEmitter = mitt() + rpcContext.subscribers['0x123'] = existingEmitter + parser.mockResolvedValueOnce({ parsed: true }) + + const generator = handleSubscriptionUpdates({ + rpcContext, + eventName: 'friendshipUpdate', + components: { + catalystClient: mockCatalystClient, + logger + }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to, + parser, + parseArgs: [PROFILE_IMAGES_URL] + }) + + // Start consuming the generator + const resultPromise = generator.next() + + // Verify the existing emitter is being used + expect(rpcContext.subscribers['0x123']).toBe(existingEmitter) + + // Emit event using the existing emitter + existingEmitter.emit('friendshipUpdate', friendshipUpdate) + + const result = await resultPromise + expect(result.value).toEqual({ parsed: true }) + }) + + it('should yield parsed updates', async () => { + parser.mockResolvedValueOnce({ parsed: true }) + + const generator = handleSubscriptionUpdates({ + rpcContext, + eventName: 'friendshipUpdate', + components: { + catalystClient: mockCatalystClient, + logger + }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to, + parser, + parseArgs: [PROFILE_IMAGES_URL] + }) + + const resultPromise = generator.next() + rpcContext.subscribers['0x123'].emit('friendshipUpdate', friendshipUpdate) + + const result = await resultPromise + expect(result.value).toEqual({ parsed: true }) + expect(parser).toHaveBeenCalledWith(friendshipUpdate, mockProfile, PROFILE_IMAGES_URL) + }) + + it('should yield multiple updates', async () => { + const generator = handleSubscriptionUpdates({ + rpcContext, + eventName: 'friendshipUpdate', + components: { + catalystClient: mockCatalystClient, + logger + }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to, + parser, + parseArgs: [PROFILE_IMAGES_URL] + }) + + for (let i = 0; i < 2; i++) { + parser.mockResolvedValueOnce({ parsed: i }) + const resultPromise = generator.next() + rpcContext.subscribers['0x123'].emit('friendshipUpdate', friendshipUpdate) + const result = await resultPromise + expect(result.value).toEqual({ parsed: i }) + expect(parser).toHaveBeenCalledWith(friendshipUpdate, mockProfile, PROFILE_IMAGES_URL) + } + }) + + it('should log error if parser returns null', async () => { + parser.mockResolvedValueOnce(null) + const generator = handleSubscriptionUpdates({ + rpcContext, + eventName: 'friendshipUpdate', + components: { catalystClient: mockCatalystClient, logger }, + getAddressFromUpdate: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to, + parser, + parseArgs: [PROFILE_IMAGES_URL] + }) + + const resultPromise = generator.next() + rpcContext.subscribers['0x123'].emit('friendshipUpdate', friendshipUpdate) + + await sleep(100) // could be flaky + + expect(logger.error).toHaveBeenCalledWith('Unable to parse friendshipUpdate:', { + update: JSON.stringify(friendshipUpdate) + }) + }) + }) +}) diff --git a/test/unit/peer-tracking.spec.ts b/test/unit/peer-tracking.spec.ts index 5e36530..938108d 100644 --- a/test/unit/peer-tracking.spec.ts +++ b/test/unit/peer-tracking.spec.ts @@ -7,13 +7,12 @@ describe('PeerTrackingComponent', () => { let peerTracking: IPeerTrackingComponent beforeEach(() => { - jest.clearAllMocks() peerTracking = createPeerTrackingComponent({ logs: mockLogs, nats: mockNats, pubsub: mockPubSub }) }) describe('start', () => { it('should subscribe to all peer status patterns', async () => { - await peerTracking.start({} as any) + await peerTracking.subscribeToPeerStatusUpdates() const subscriptions = peerTracking.getSubscriptions() expect(subscriptions.size).toBe(PEER_STATUS_HANDLERS.length) @@ -27,7 +26,7 @@ describe('PeerTrackingComponent', () => { describe('stop', () => { it('should unsubscribe and clear all subscriptions', async () => { - await peerTracking.start({} as any) + await peerTracking.subscribeToPeerStatusUpdates() await peerTracking.stop() const subscriptions = peerTracking.getSubscriptions() @@ -38,7 +37,7 @@ describe('PeerTrackingComponent', () => { describe('message handling', () => { PEER_STATUS_HANDLERS.forEach((handler) => { it(`should handle ${handler.event} messages correctly`, async () => { - await peerTracking.start({} as any) + await peerTracking.subscribeToPeerStatusUpdates() const messageHandler = mockNats.subscribe.mock.calls.find((call) => call[0] === handler.pattern)?.[1] @@ -56,7 +55,7 @@ describe('PeerTrackingComponent', () => { }) it(`should handle ${handler.event} message errors`, async () => { - await peerTracking.start({} as any) + await peerTracking.subscribeToPeerStatusUpdates() const messageHandler = mockNats.subscribe.mock.calls.find((call) => call[0] === handler.pattern)?.[1] diff --git a/test/unit/utils/retrier.spec.ts b/test/unit/utils/retrier.spec.ts index b5b0597..e56690a 100644 --- a/test/unit/utils/retrier.spec.ts +++ b/test/unit/utils/retrier.spec.ts @@ -9,10 +9,6 @@ describe('retry', () => { const mockAction = jest.fn() const mockSleep = sleep as jest.MockedFunction - beforeEach(() => { - jest.clearAllMocks() - }) - it('should return result on the first attempt without retrying', async () => { mockAction.mockResolvedValue('success') diff --git a/test/unit/utils/wsUserData.spec.ts b/test/unit/utils/wsUserData.spec.ts index fca803e..1e5a067 100644 --- a/test/unit/utils/wsUserData.spec.ts +++ b/test/unit/utils/wsUserData.spec.ts @@ -1,4 +1,4 @@ -import { WsUserData, WsNotAuthenticatedUserData } from '../../../src/types' +import { WsUserData } from '../../../src/types' import { isNotAuthenticated } from '../../../src/utils/wsUserData' import { IUWebSocketEventMap } from '../../../src/utils/UWebSocketTransport' import { Emitter } from 'mitt' diff --git a/yarn.lock b/yarn.lock index af9232a..d4fced5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -354,18 +354,18 @@ "@well-known-components/fetch-component" "^2.0.2" "@well-known-components/interfaces" "^1.4.2" -"@dcl/protocol@https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916006744.commit-ba473a8.tgz": - version "1.0.0-12916006744.commit-ba473a8" - resolved "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916006744.commit-ba473a8.tgz#23f50b793eee33ee1c43d433b5f5d63d90f1d4c5" - dependencies: - "@dcl/ts-proto" "1.154.0" - "@dcl/protocol@https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916692077.commit-190ed21.tgz": version "1.0.0-12916692077.commit-190ed21" resolved "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916692077.commit-190ed21.tgz#32741f275d43b610db3ffcc702381672cfa9acbc" dependencies: "@dcl/ts-proto" "1.154.0" +"@dcl/protocol@https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12933824416.commit-ed8c3aa.tgz": + version "1.0.0-12933824416.commit-ed8c3aa" + resolved "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12933824416.commit-ed8c3aa.tgz#68fcc57917b601b0d9f5d257215a7bf51a1dc9a6" + dependencies: + "@dcl/ts-proto" "1.154.0" + "@dcl/rpc@^1.1.2": version "1.1.2" resolved "https://registry.yarnpkg.com/@dcl/rpc/-/rpc-1.1.2.tgz#789f4f24c8d432a48df3e786b77d017883dda11a"