Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Send profile info on upsert and friendship updates #39

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
NATS_URL=localhost:4222
CATALYST_CONTENT_URL_LOADBALANCER=http://peer.decentraland.local/
PROFILE_IMAGES_URL=https://profile-images.decentraland.local
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
},
"dependencies": {
"@dcl/platform-crypto-middleware": "^1.1.0",
"@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916692077.commit-190ed21.tgz",
"@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12933824416.commit-ed8c3aa.tgz",
"@dcl/rpc": "^1.1.2",
"@dcl/schemas": "^15.6.0",
"@well-known-components/env-config-provider": "^1.2.0",
Expand Down
16 changes: 13 additions & 3 deletions src/adapters/catalyst-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Entity } from '@dcl/schemas'
import { createContentClient, ContentClient } from 'dcl-catalyst-client'
import { getCatalystServersFromCache } from 'dcl-catalyst-client/dist/contracts-snapshots'
import { AppComponents, ICatalystClient, ICatalystClientRequestOptions } from '../types'
import { AppComponents, ICatalystClientComponent, ICatalystClientRequestOptions } from '../types'
import { retry } from '../utils/retrier'
import { shuffleArray } from '../utils/array'

Expand All @@ -11,7 +11,7 @@ const L1_TESTNET = 'sepolia'
export async function createCatalystClient({
fetcher,
config
}: Pick<AppComponents, 'fetcher' | 'config'>): Promise<ICatalystClient> {
}: Pick<AppComponents, 'fetcher' | 'config'>): Promise<ICatalystClientComponent> {
const loadBalancer = await config.requireString('CATALYST_CONTENT_URL_LOADBALANCER')
const contractNetwork = (await config.getString('ENV')) === 'prod' ? L1_MAINNET : L1_TESTNET

Expand Down Expand Up @@ -53,5 +53,15 @@ export async function createCatalystClient({
return retry(executeClientRequest, retries, waitTime)
}

return { getEntitiesByPointers }
async function getEntityByPointer(pointer: string, options: ICatalystClientRequestOptions = {}): Promise<Entity> {
const [entity] = await getEntitiesByPointers([pointer], options)

if (!entity) {
throw new Error(`Entity not found for pointer ${pointer}`)
}

return entity
}

return { getEntitiesByPointers, getEntityByPointer }
}
20 changes: 15 additions & 5 deletions src/adapters/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
},
async getLastFriendshipActionByUsers(loggedUser: string, friendUser: string) {
const query = SQL`
SELECT fa.action, fa.acting_user as by
SELECT fa.*
FROM friendships f
INNER JOIN friendship_actions fa ON f.id = fa.friendship_id
WHERE (f.address_requester, f.address_requested) IN ((${loggedUser}, ${friendUser}), (${friendUser}, ${loggedUser}))
Expand All @@ -235,7 +235,9 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>

const {
rows: [{ id, created_at }]
} = txClient ? await txClient.query(query) : await pg.query(query)
} = txClient
? await txClient.query<{ id: string; created_at: Date }>(query)
: await pg.query<{ id: string; created_at: Date }>(query)

return {
id,
Expand All @@ -244,10 +246,18 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
},
async updateFriendshipStatus(friendshipId, isActive, txClient) {
logger.debug(`updating ${friendshipId} - ${isActive}`)
const query = SQL`UPDATE friendships SET is_active = ${isActive}, updated_at = now() WHERE id = ${friendshipId}`
const query = SQL`UPDATE friendships SET is_active = ${isActive}, updated_at = now() WHERE id = ${friendshipId} RETURNING id, created_at`

const results = txClient ? await txClient?.query(query) : await pg.query(query)
return results.rowCount === 1
const {
rows: [{ id, created_at }]
} = txClient
? await txClient.query<{ id: string; created_at: Date }>(query)
: await pg.query<{ id: string; created_at: Date }>(query)

return {
id,
created_at
}
},
async recordFriendshipAction(friendshipId, actingUser, action, metadata, txClient) {
const uuid = randomUUID()
Expand Down
12 changes: 9 additions & 3 deletions src/adapters/peer-tracking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ export function createPeerTrackingComponent({
}

return {
async start() {
async subscribeToPeerStatusUpdates() {
PEER_STATUS_HANDLERS.forEach((handler) => {
const subscription = nats.subscribe(handler.pattern, createMessageHandler(handler))
subscriptions.set(handler.event, subscription)
try {
const subscription = nats.subscribe(handler.pattern, createMessageHandler(handler))
subscriptions.set(handler.event, subscription)
} catch (error: any) {
logger.error(`Error subscribing to ${handler.pattern}`, {
error: error.message
})
}
})
},
async stop() {
Expand Down
55 changes: 14 additions & 41 deletions src/adapters/rpc-server/rpc-server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createRpcServer } from '@dcl/rpc'
import { registerService } from '@dcl/rpc/dist/codegen'
import { AppComponents, IRPCServerComponent, RpcServerContext, SubscriptionEventsEmitter } from '../../types'

Check warning on line 3 in src/adapters/rpc-server/rpc-server.ts

View workflow job for this annotation

GitHub Actions / build / validations

'SubscriptionEventsEmitter' is defined but never used. Allowed unused vars must match /^_/u
import { getFriendsService } from './services/get-friends'
import { getMutualFriendsService } from './services/get-mutual-friends'
import { getPendingFriendshipRequestsService } from './services/get-pending-friendship-requests'
Expand All @@ -9,8 +9,9 @@
import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen'
import { getSentFriendshipRequestsService } from './services/get-sent-friendship-requests'
import { getFriendshipStatusService } from './services/get-friendship-status'
import { subscribeToFriendUpdatesService } from './services/subscribe-to-friend-updates'
import { subscribeToFriendConnectivityUpdatesService } from './services/subscribe-to-friend-connectivity-updates'
import { FRIEND_STATUS_UPDATES_CHANNEL, FRIENDSHIP_UPDATES_CHANNEL } from '../pubsub'
import { friendshipUpdateHandler, friendConnectivityUpdateHandler } from '../../logic/updates'

export async function createRpcServerComponent({
logs,
Expand Down Expand Up @@ -45,45 +46,14 @@
const getSentFriendshipRequests = await getSentFriendshipRequestsService({
components: { logs, db, catalystClient, config }
})
const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } })
const upsertFriendship = await upsertFriendshipService({ components: { logs, db, pubsub, config, catalystClient } })
const getFriendshipStatus = getFriendshipStatusService({ components: { logs, db } })
const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } })
const subscribeToFriendUpdates = subscribeToFriendUpdatesService({
components: { logs, db, archipelagoStats }
const subscribeToFriendshipUpdates = await subscribeToFriendshipUpdatesService({
components: { logs, config, catalystClient }
})
const subscribeToFriendConnectivityUpdates = await subscribeToFriendConnectivityUpdatesService({
components: { logs, db, archipelagoStats, config, catalystClient }
})

function handleFriendshipUpdate(message: string) {
try {
const update = JSON.parse(message) as SubscriptionEventsEmitter['friendshipUpdate']
const updateEmitter = SHARED_CONTEXT.subscribers[update.to]
if (updateEmitter) {
updateEmitter.emit('friendshipUpdate', update)
}
} catch (error: any) {
logger.error(`Error handling friendship update: ${error.message}`, {
message
})
}
}

async function handleFriendStatusUpdate(message: string) {
try {
// TODO: this may be a problem if the user has a lot of friends or there are a lot of users online
const update = JSON.parse(message) as SubscriptionEventsEmitter['friendStatusUpdate']
const friends = await db.getOnlineFriends(update.address, Object.keys(SHARED_CONTEXT.subscribers))

friends.forEach(({ address: friendAddress }) => {
const emitter = SHARED_CONTEXT.subscribers[friendAddress]
if (emitter) {
emitter.emit('friendStatusUpdate', update)
}
})
} catch (error: any) {
logger.error(`Error handling friend status update: ${error.message}`, {
message
})
}
}

rpcServer.setHandler(async function handler(port) {
registerService(port, SocialServiceDefinition, async () => ({
Expand All @@ -94,7 +64,7 @@
getFriendshipStatus,
upsertFriendship,
subscribeToFriendshipUpdates,
subscribeToFriendUpdates
subscribeToFriendConnectivityUpdates
}))
})

Expand All @@ -104,8 +74,11 @@
logger.info(`[RPC] RPC Server listening on port ${rpcServerPort}`)
})

await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, handleFriendshipUpdate)
await pubsub.subscribeToChannel(FRIEND_STATUS_UPDATES_CHANNEL, handleFriendStatusUpdate)
await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, friendshipUpdateHandler(SHARED_CONTEXT, logger))
await pubsub.subscribeToChannel(
FRIEND_STATUS_UPDATES_CHANNEL,
friendConnectivityUpdateHandler(SHARED_CONTEXT, logger, db)
)
},
attachUser({ transport, address }) {
transport.on('close', () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Empty } from '@dcl/protocol/out-js/google/protobuf/empty.gen'
import { SubscriptionEventsEmitter, RpcServerContext, RPCServiceContext } from '../../../types'
import {
FriendConnectivityUpdate,
ConnectivityStatus
} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen'
import mitt from 'mitt'
import emitterToAsyncGenerator from '../../../utils/emitterToGenerator'

Check warning on line 8 in src/adapters/rpc-server/services/subscribe-to-friend-connectivity-updates.ts

View workflow job for this annotation

GitHub Actions / build / validations

'emitterToAsyncGenerator' is defined but never used. Allowed unused vars must match /^_/u
import { parseEmittedUpdateToFriendConnectivityUpdate } from '../../../logic/friendships'
import { parseProfileToFriend } from '../../../logic/friends'
import { handleSubscriptionUpdates } from '../../../logic/updates'

export async function subscribeToFriendConnectivityUpdatesService({
components: { logs, db, archipelagoStats, config, catalystClient }
}: RPCServiceContext<'logs' | 'db' | 'archipelagoStats' | 'config' | 'catalystClient'>) {
const logger = logs.getLogger('subscribe-to-friend-connectivity-updates-service')
const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<FriendConnectivityUpdate> {
try {
const eventEmitter = context.subscribers[context.address] || mitt<SubscriptionEventsEmitter>()

if (!context.subscribers[context.address]) {
context.subscribers[context.address] = eventEmitter
}

const onlinePeers = await archipelagoStats.getPeersFromCache()
const onlineFriends = db.streamOnlineFriends(context.address, onlinePeers)

for await (const friend of onlineFriends) {
// TODO: improve this to avoid fetching the profile for each friend
const profile = await catalystClient.getEntityByPointer(friend.address)
yield {
friend: parseProfileToFriend(profile, profileImagesUrl),
status: ConnectivityStatus.ONLINE
}
}

yield* handleSubscriptionUpdates({
eventEmitter,
eventName: 'friendConnectivityUpdate',
catalystClient,
logger,
parser: parseEmittedUpdateToFriendConnectivityUpdate,
addressGetter: (update: SubscriptionEventsEmitter['friendConnectivityUpdate']) => update.address,
parseArgs: [profileImagesUrl]
})
} catch (error: any) {
logger.error('Error in friend updates subscription:', error)
throw error
}
}
}
49 changes: 0 additions & 49 deletions src/adapters/rpc-server/services/subscribe-to-friend-updates.ts

This file was deleted.

27 changes: 14 additions & 13 deletions src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ import { RpcServerContext, RPCServiceContext, SubscriptionEventsEmitter } from '
import { FriendshipUpdate } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen'
import mitt from 'mitt'
import { parseEmittedUpdateToFriendshipUpdate } from '../../../logic/friendships'
import emitterToAsyncGenerator from '../../../utils/emitterToGenerator'
import { handleSubscriptionUpdates } from '../../../logic/updates'

export function subscribeToFriendshipUpdatesService({ components: { logs } }: RPCServiceContext<'logs'>) {
export async function subscribeToFriendshipUpdatesService({
components: { logs, config, catalystClient }
}: RPCServiceContext<'logs' | 'config' | 'catalystClient'>) {
const logger = logs.getLogger('subscribe-to-friendship-updates-service')
const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<FriendshipUpdate> {
const eventEmitter = context.subscribers[context.address] || mitt<SubscriptionEventsEmitter>()
Expand All @@ -15,16 +18,14 @@ export function subscribeToFriendshipUpdatesService({ components: { logs } }: RP
context.subscribers[context.address] = eventEmitter
}

const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'friendshipUpdate')

for await (const update of updatesGenerator) {
logger.debug('Friendship update received:', { update: JSON.stringify(update) })
const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update)
if (updateToResponse) {
yield updateToResponse
} else {
logger.error('Unable to parse friendship update: ', { update: JSON.stringify(update) })
}
}
yield* handleSubscriptionUpdates<FriendshipUpdate, SubscriptionEventsEmitter['friendshipUpdate']>({
eventEmitter,
eventName: 'friendshipUpdate',
logger,
catalystClient,
parser: parseEmittedUpdateToFriendshipUpdate,
addressGetter: (update: SubscriptionEventsEmitter['friendshipUpdate']) => update.to,
parseArgs: [profileImagesUrl]
})
}
}
Loading
Loading