Skip to content

Commit

Permalink
feat: Get friends and Get Mutual optimizations and improved responses (
Browse files Browse the repository at this point in the history
…#37)

* refactor: Change logs and return empty lists on get friends errors

* feat: Add Catalyst Client adapter implementation with catalyst rotation and utils

* feat: Get profiles from catalyst and map to friends

* feat: GetMutualFriends also retrieves profiles and improves the response

* fix: Wrong usage of table alias

* feat: Get sent and received friendship requests optimizations (#38)

* chore: seq diag improvements

* feat: Use the profile image url to create the full url

* feat: Get Sent and Received Friendship Requests respond with profile data

* chore: Remove unused imports

* refactor: Try with filter boolean but doesn't work

* refactor: Pick specific props from entity

* chore: Use Avatar type from schemas
  • Loading branch information
kevinszuchet authored Jan 23, 2025
1 parent 19b23c0 commit 0958107
Show file tree
Hide file tree
Showing 39 changed files with 999 additions and 200 deletions.
3 changes: 3 additions & 0 deletions .env.default
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ HTTP_SERVER_HOST=0.0.0.0
WKC_METRICS_RESET_AT_NIGHT=false

NATS_URL=localhost:4222

#CATALYST_CONTENT_URL_LOADBALANCER=https://peer.decentraland.org/
#PROFILE_IMAGES_URL=https://profile-images.decentraland.org
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,12 @@ sequenceDiagram
Note over RPC Server: (accept/cancel/reject/delete)
Note over Client,DB: Friends Lifecycle
NATS-->>Redis: Peer Heartbeat
Redis-->>RPC Server: Friend Status Update
RPC Server->>Redis: Request Cached Peers
NATS-->>Redis: Publish Peer Connection Update Event
Redis-->>RPC Server: Broadcast Friend Status Update Event
RPC Server->>Redis: Get Cached Peers
Redis-->>RPC Server: Cached Peers
RPC Server->>DB: Request Online Friends
DB-->>RPC Server: Online Friends
RPC Server->>DB: Query Online Friends
DB-->>RPC Server: Online Friends
RPC Server-->>Client: Stream Friend Status Updates
Note over RPC Server: (online/offline)
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
},
"dependencies": {
"@dcl/platform-crypto-middleware": "^1.1.0",
"@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12890706635.commit-a7e4210.tgz",
"@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12916692077.commit-190ed21.tgz",
"@dcl/rpc": "^1.1.2",
"@dcl/schemas": "^15.6.0",
"@well-known-components/env-config-provider": "^1.2.0",
"@well-known-components/fetch-component": "^2.0.2",
"@well-known-components/http-server": "^2.1.0",
Expand All @@ -41,6 +42,7 @@
"@well-known-components/nats-component": "^2.0.0",
"@well-known-components/pg-component": "^0.2.2",
"@well-known-components/uws-http-server": "^0.0.2",
"dcl-catalyst-client": "^21.7.0",
"fp-future": "^1.0.1",
"lru-cache": "^10.4.3",
"mitt": "^3.0.1",
Expand Down
57 changes: 57 additions & 0 deletions src/adapters/catalyst-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Entity } from '@dcl/schemas'
import { createContentClient, ContentClient } from 'dcl-catalyst-client'
import { getCatalystServersFromCache } from 'dcl-catalyst-client/dist/contracts-snapshots'
import { AppComponents, ICatalystClient, ICatalystClientRequestOptions } from '../types'
import { retry } from '../utils/retrier'
import { shuffleArray } from '../utils/array'

const L1_MAINNET = 'mainnet'
const L1_TESTNET = 'sepolia'

export async function createCatalystClient({
fetcher,
config
}: Pick<AppComponents, 'fetcher' | 'config'>): Promise<ICatalystClient> {
const loadBalancer = await config.requireString('CATALYST_CONTENT_URL_LOADBALANCER')
const contractNetwork = (await config.getString('ENV')) === 'prod' ? L1_MAINNET : L1_TESTNET

function getContentClientOrDefault(contentServerUrl?: string): ContentClient {
return contentServerUrl
? createContentClient({ fetcher, url: contentServerUrl })
: createContentClient({
fetcher,
url: loadBalancer
})
}

function rotateContentServerClient<T>(
executeClientRequest: (client: ContentClient) => Promise<T>,
contentServerUrl?: string
) {
const catalystServers = shuffleArray(getCatalystServersFromCache(contractNetwork)).map((server) => server.address)
let contentClientToUse: ContentClient = getContentClientOrDefault(contentServerUrl)

return (attempt: number): Promise<T> => {
if (attempt > 1 && catalystServers.length > 0) {
const [catalystServerUrl] = catalystServers.splice(attempt % catalystServers.length, 1)
contentClientToUse = getContentClientOrDefault(catalystServerUrl)
}

return executeClientRequest(contentClientToUse)
}
}

async function getEntitiesByPointers(
pointers: string[],
options: ICatalystClientRequestOptions = {}
): Promise<Entity[]> {
const { retries = 3, waitTime = 300, contentServerUrl } = options
const executeClientRequest = rotateContentServerClient(
(contentClientToUse) => contentClientToUse.fetchEntitiesByPointers(pointers),
contentServerUrl
)
return retry(executeClientRequest, retries, waitTime)
}

return { getEntitiesByPointers }
}
62 changes: 28 additions & 34 deletions src/adapters/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,21 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
end as address
FROM
(
SELECT
f_a.*
from
friendships f_a
where
SELECT f_a.*
FROM friendships f_a
WHERE
(
f_a.address_requester = ${userAddress1}
or f_a.address_requested = ${userAddress1}
) and f_a.is_active = true
) as friends_a
)
SELECT
address
f_b.address
FROM
friendsA f_b
WHERE
address IN (
f_b.address IN (
SELECT
CASE
WHEN address_requester = ${userAddress2} then address_requested
Expand All @@ -134,18 +132,18 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
(
SELECT
f_b.*
from
FROM
friendships f_b
where
WHERE
(
f_b.address_requester = ${userAddress2}
or f_b.address_requested = ${userAddress2}
) and f_b.is_active = true
) as friends_b
)
ORDER BY f_a.address
ORDER BY f_b.address
LIMIT ${limit}
OFFSET ${offset};`
OFFSET ${offset}`
)

return result.rows
Expand All @@ -155,20 +153,18 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
SQL`WITH friendsA as (
SELECT
CASE
WHEN address_requester = ${userAddress1} then address_requested
else address_requester
end as address
WHEN address_requester = ${userAddress1} THEN address_requested
ELSE address_requester
END as address
FROM
(
SELECT
f_a.*
from
friendships f_a
where
SELECT f_a.*
FROM friendships f_a
WHERE
(
f_a.address_requester = ${userAddress1}
or f_a.address_requested = ${userAddress1}
) and f_a.is_active = true
OR f_a.address_requested = ${userAddress1}
) AND f_a.is_active = true
) as friends_a
)
SELECT
Expand All @@ -179,22 +175,20 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
address IN (
SELECT
CASE
WHEN address_requester = ${userAddress2} then address_requested
else address_requester
end as address_a
WHEN address_requester = ${userAddress2} THEN address_requested
ELSE address_requester
END as address_a
FROM
(
SELECT
f_b.*
from
friendships f_b
where
SELECT f_b.*
FROM friendships f_b
WHERE
(
f_b.address_requester = ${userAddress2}
or f_b.address_requested = ${userAddress2}
) and f_b.is_active = true
OR f_b.address_requested = ${userAddress2}
) AND f_b.is_active = true
) as friends_b
);`
)`
)

return result.rows[0].count
Expand Down Expand Up @@ -321,8 +315,8 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
const res = await cb(client)
await client.query('COMMIT')
return res
} catch (error) {
logger.error(error as any)
} catch (error: any) {
logger.error(`Error executing transaction: ${error.message}`)
await client.query('ROLLBACK')
client.release()
throw error
Expand Down
17 changes: 11 additions & 6 deletions src/adapters/rpc-server/rpc-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ export async function createRpcServerComponent({
pubsub,
config,
server,
archipelagoStats
archipelagoStats,
catalystClient
}: Pick<
AppComponents,
'logs' | 'db' | 'pubsub' | 'config' | 'server' | 'nats' | 'archipelagoStats' | 'redis'
'logs' | 'db' | 'pubsub' | 'config' | 'server' | 'nats' | 'archipelagoStats' | 'redis' | 'catalystClient'
>): Promise<IRPCServerComponent> {
// TODO: this should be a redis if we want to have more than one instance of the server
const SHARED_CONTEXT: Pick<RpcServerContext, 'subscribers'> = {
Expand All @@ -36,10 +37,14 @@ export async function createRpcServerComponent({

const rpcServerPort = (await config.getNumber('RPC_SERVER_PORT')) || 8085

const getFriends = getFriendsService({ components: { logs, db } })
const getMutualFriends = getMutualFriendsService({ components: { logs, db } })
const getPendingFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } })
const getSentFriendshipRequests = getSentFriendshipRequestsService({ components: { logs, db } })
const getFriends = await getFriendsService({ components: { logs, db, catalystClient, config } })
const getMutualFriends = await getMutualFriendsService({ components: { logs, db, catalystClient, config } })
const getPendingFriendshipRequests = await getPendingFriendshipRequestsService({
components: { logs, db, catalystClient, config }
})
const getSentFriendshipRequests = await getSentFriendshipRequestsService({
components: { logs, db, catalystClient, config }
})
const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } })
const getFriendshipStatus = getFriendshipStatusService({ components: { logs, db } })
const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } })
Expand Down
31 changes: 23 additions & 8 deletions src/adapters/rpc-server/services/get-friends.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import { parseProfilesToFriends } from '../../../logic/friends'
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { getPage } from '../../../utils/pagination'
import { FRIENDSHIPS_PER_PAGE, INTERNAL_SERVER_ERROR } from '../constants'
import { FRIENDSHIPS_PER_PAGE } from '../constants'
import {
GetFriendsPayload,
PaginatedUsersResponse
PaginatedFriendsProfilesResponse
} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen'

export function getFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
export async function getFriendsService({
components: { logs, db, catalystClient, config }
}: RPCServiceContext<'logs' | 'db' | 'catalystClient' | 'config'>) {
const logger = logs.getLogger('get-friends-service')
const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL')

return async function (request: GetFriendsPayload, context: RpcServerContext): Promise<PaginatedUsersResponse> {
return async function (
request: GetFriendsPayload,
context: RpcServerContext
): Promise<PaginatedFriendsProfilesResponse> {
const { pagination } = request
const { address: loggedUserAddress } = context

Expand All @@ -20,16 +27,24 @@ export function getFriendsService({ components: { logs, db } }: RPCServiceContex
db.getFriendsCount(loggedUserAddress)
])

const profiles = await catalystClient.getEntitiesByPointers(friends.map((friend) => friend.address))

return {
users: friends,
friends: parseProfilesToFriends(profiles, profileImagesUrl),
paginationData: {
total,
page: getPage(pagination?.limit || FRIENDSHIPS_PER_PAGE, pagination?.offset)
}
}
} catch (error) {
logger.error(error as any)
throw new Error(INTERNAL_SERVER_ERROR)
} catch (error: any) {
logger.error(`Error getting friends: ${error.message}`)
return {
friends: [],
paginationData: {
total: 0,
page: 1
}
}
}
}
}
4 changes: 2 additions & 2 deletions src/adapters/rpc-server/services/get-friendship-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ export function getFriendshipStatusService({ components: { logs, db } }: RPCServ
}
}
}
} catch (error) {
logger.error(error as any)
} catch (error: any) {
logger.error(`Error getting friendship status: ${error.message}`)
return {
response: {
$case: 'internalServerError',
Expand Down
38 changes: 27 additions & 11 deletions src/adapters/rpc-server/services/get-mutual-friends.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,53 @@
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_PER_PAGE } from '../constants'
import { FRIENDSHIPS_PER_PAGE } from '../constants'
import {
GetMutualFriendsPayload,
PaginatedUsersResponse
PaginatedFriendsProfilesResponse
} from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen'
import { normalizeAddress } from '../../../utils/address'
import { getPage } from '../../../utils/pagination'
import { parseProfilesToFriends } from '../../../logic/friends'

export function getMutualFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
export async function getMutualFriendsService({
components: { logs, db, catalystClient, config }
}: RPCServiceContext<'logs' | 'db' | 'catalystClient' | 'config'>) {
const logger = logs.getLogger('get-mutual-friends-service')
const profileImagesUrl = await config.requireString('PROFILE_IMAGES_URL')

return async function (
request: GetMutualFriendsPayload,
context: RpcServerContext
): Promise<PaginatedFriendsProfilesResponse> {
logger.debug(`Getting mutual friends ${context.address}<>${request.user!.address}`)

return async function (request: GetMutualFriendsPayload, context: RpcServerContext): Promise<PaginatedUsersResponse> {
logger.debug(`getting mutual friends ${context.address}<>${request.user!.address}`)
try {
const { address: requester } = context
const { pagination, user } = request
const requested = normalizeAddress(user!.address)

const [mutualFriends, total] = await Promise.all([
db.getMutualFriends(requester, requested, pagination),
db.getMutualFriendsCount(requester, requested)
])

const profiles = await catalystClient.getEntitiesByPointers(mutualFriends.map((friend) => friend.address))

return {
users: mutualFriends,
friends: parseProfilesToFriends(profiles, profileImagesUrl),
paginationData: {
total,
page: getPage(pagination?.limit || FRIENDSHIPS_PER_PAGE, pagination?.offset)
}
}
} catch (error) {
logger.error(error as any)
// throw an error bc there is no sense to create a generator to send an error
// as it's done in the previous Social Service
throw new Error(INTERNAL_SERVER_ERROR)
} catch (error: any) {
logger.error(`Error getting mutual friends: ${error.message}`)
return {
friends: [],
paginationData: {
total: 0,
page: 1
}
}
}
}
}
Loading

0 comments on commit 0958107

Please sign in to comment.