Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Filter friends by connectivity status #35

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ module.exports = {
coverageDirectory: 'coverage',
collectCoverageFrom: ['src/**/*.ts', 'src/**/*.js', '!src/migrations/**'],
testMatch: ['**/*.spec.(ts)'],
testEnvironment: 'node'
testEnvironment: 'node',
setupFilesAfterEnv: ['<rootDir>/test/setupTests.ts']
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@
},
"dependencies": {
"@dcl/platform-crypto-middleware": "^1.1.0",
"@dcl/protocol": "^1.0.0-12815643167.commit-c4162c4",
"@dcl/protocol": "https://sdk-team-cdn.decentraland.org/@dcl/protocol/branch//dcl-protocol-1.0.0-12890706635.commit-a7e4210.tgz",
"@dcl/rpc": "^1.1.2",
"@well-known-components/env-config-provider": "^1.2.0",
"@well-known-components/fetch-component": "^2.0.2",
"@well-known-components/http-server": "^2.1.0",
"@well-known-components/interfaces": "^1.4.3",
"@well-known-components/logger": "^3.1.3",
"@well-known-components/metrics": "^2.1.0",
"@well-known-components/nats-component": "^2.0.0",
"@well-known-components/pg-component": "^0.2.2",
"@well-known-components/uws-http-server": "^0.0.2",
"fp-future": "^1.0.1",
"lru-cache": "^10.4.3",
"mitt": "^3.0.1",
"redis": "^4.6.13",
"sql-template-strings": "^2.2.2",
Expand Down
34 changes: 34 additions & 0 deletions src/adapters/archipelago-stats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { AppComponents, IArchipelagoStatsComponent } from '../types'
import { PEERS_CACHE_KEY } from '../utils/peers'

export async function createArchipelagoStatsComponent({
logs,
config,
fetcher,
redis
}: Pick<AppComponents, 'logs' | 'config' | 'fetcher' | 'redis'>): Promise<IArchipelagoStatsComponent> {
const logger = logs.getLogger('archipelago-stats-component')
const url = await config.getString('ARCHIPELAGO_STATS_URL')

return {
async getPeers() {
try {
const response = await fetcher.fetch(`${url}/comms/peers`)

if (!response.ok) {
throw new Error(`Error fetching peers: ${response.statusText}`)
}

const { peers } = await response.json()

return peers.map((peer: { id: string }) => peer.id)
} catch (error) {
logger.error(error as any)
return []
}
},
async getPeersFromCache() {
return (await redis.get<string[]>(PEERS_CACHE_KEY)) || []
}
}
}
99 changes: 52 additions & 47 deletions src/adapters/db.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,9 @@
import SQL, { SQLStatement } from 'sql-template-strings'
import { randomUUID } from 'node:crypto'
import { PoolClient } from 'pg'
import { Action, AppComponents, Friendship, FriendshipAction, FriendshipRequest, Mutual, Pagination } from '../types'
import { AppComponents, Friendship, FriendshipAction, FriendshipRequest, IDatabaseComponent, Friend } from '../types'
import { FRIENDSHIPS_PER_PAGE } from './rpc-server/constants'

export interface IDatabaseComponent {
createFriendship(
users: [string, string],
isActive: boolean,
txClient?: PoolClient
): Promise<{
id: string
created_at: Date
}>
updateFriendshipStatus(friendshipId: string, isActive: boolean, txClient?: PoolClient): Promise<boolean>
getFriends(
userAddress: string,
options?: {
pagination?: Pagination
onlyActive?: boolean
}
): Promise<Friendship[]>
getFriendsCount(
userAddress: string,
options?: {
onlyActive?: boolean
}
): Promise<number>
getMutualFriends(userAddress1: string, userAddress2: string, pagination?: Pagination): Promise<Mutual[]>
getMutualFriendsCount(userAddress1: string, userAddress2: string): Promise<number>
getFriendship(userAddresses: [string, string]): Promise<Friendship | undefined>
getLastFriendshipAction(friendshipId: string): Promise<FriendshipAction | undefined>
getLastFriendshipActionByUsers(loggedUser: string, friendUser: string): Promise<FriendshipAction | undefined>
recordFriendshipAction(
friendshipId: string,
actingUser: string,
action: Action,
metadata: Record<string, any> | null,
txClient?: PoolClient
): Promise<boolean>
getReceivedFriendshipRequests(userAddress: string, pagination?: Pagination): Promise<FriendshipRequest[]>
getSentFriendshipRequests(userAddress: string, pagination?: Pagination): Promise<FriendshipRequest[]>
executeTx<T>(cb: (client: PoolClient) => Promise<T>): Promise<T>
}

export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>): IDatabaseComponent {
const { pg, logs } = components

Expand All @@ -53,15 +13,22 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
async getFriends(userAddress, { onlyActive = true, pagination = { limit: FRIENDSHIPS_PER_PAGE, offset: 0 } } = {}) {
const { limit, offset } = pagination

const query: SQLStatement = SQL`SELECT * FROM friendships WHERE (address_requester = ${userAddress} OR address_requested = ${userAddress})`
const query: SQLStatement = SQL`
SELECT
CASE
WHEN address_requester = ${userAddress} THEN address_requested
ELSE address_requester
END as address
FROM friendships
WHERE (address_requester = ${userAddress} OR address_requested = ${userAddress})`

if (onlyActive) {
query.append(SQL` AND is_active = true`)
}

query.append(SQL` ORDER BY created_at DESC OFFSET ${offset} LIMIT ${limit}`)

const result = await pg.query<Friendship>(query)
const result = await pg.query<Friend>(query)
return result.rows
},
async getFriendsCount(userAddress, { onlyActive } = { onlyActive: true }) {
Expand All @@ -76,7 +43,7 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
},
async getMutualFriends(userAddress1, userAddress2, pagination = { limit: FRIENDSHIPS_PER_PAGE, offset: 0 }) {
const { limit, offset } = pagination
const result = await pg.query<Mutual>(
const result = await pg.query<Friend>(
SQL`WITH friendsA as (
SELECT
CASE
Expand Down Expand Up @@ -244,7 +211,7 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
await pg.query(query)
}

return true
return uuid
},
async getReceivedFriendshipRequests(userAddress, pagination) {
const { limit, offset } = pagination || {}
Expand All @@ -269,7 +236,7 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
query.append(SQL` LIMIT ${limit}`)
}

if (!!offset) {
if (offset) {
query.append(SQL` OFFSET ${offset}`)
}

Expand All @@ -280,7 +247,7 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
async getSentFriendshipRequests(userAddress, pagination) {
const { limit, offset } = pagination || {}
const query = SQL`
SELECT f.address_requested as address, fa.timestamp, fa.metadata
SELECT fa.id, f.address_requested as address, fa.timestamp, fa.metadata
FROM friendships f
INNER JOIN friendship_actions fa ON f.id = fa.friendship_id
WHERE
Expand All @@ -307,6 +274,44 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>

return results.rows
},
getOnlineFriends(userAddress: string, onlinePeers: string[]) {
const query: SQLStatement = SQL`
SELECT
CASE
WHEN address_requester = ${userAddress} THEN address_requested
ELSE address_requester
END as address
FROM friendships
WHERE (
(address_requester = ${userAddress} AND address_requested = ANY(${onlinePeers}))
OR
(address_requested = ${userAddress} AND address_requester = ANY(${onlinePeers}))
)
AND is_active = true
`

return pg.streamQuery<Friend>(query)
},
async areFriendsOf(userAddress: string, potentialFriends: string[]) {
if (potentialFriends.length === 0) return []

const query = SQL`
SELECT DISTINCT
CASE
WHEN address_requester = ${userAddress} THEN address_requested
ELSE address_requester
END as address
FROM friendships
WHERE (
(address_requester = ${userAddress} AND address_requested = ANY(${potentialFriends}))
OR
(address_requested = ${userAddress} AND address_requester = ANY(${potentialFriends}))
)
AND is_active = true
`
const results = await pg.query<Friend>(query)
return results.rows
},
async executeTx<T>(cb: (client: PoolClient) => Promise<T>): Promise<T> {
const pool = pg.getPool()
const client = await pool.connect()
Expand Down
22 changes: 22 additions & 0 deletions src/adapters/memory-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { ICacheComponent } from '@well-known-components/interfaces'
import { LRUCache } from 'lru-cache'

export function createInMemoryCacheComponent(): ICacheComponent {
const cache = new LRUCache<string, any>({
max: 1000,
ttl: 1000 * 60 * 60 * 2 // 2 hours
})

async function get(key: string): Promise<any> {
return cache.get(key)
}

async function put(key: string, value: any): Promise<void> {
cache.set(key, value)
}

return {
get,
put
}
}
56 changes: 56 additions & 0 deletions src/adapters/peer-tracking.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Subscription } from '@well-known-components/nats-component'
import { IPeerTrackingComponent } from '../types'
import { AppComponents } from '../types'
import { ConnectivityStatus } from '@dcl/protocol/out-js/decentraland/social_service/v2/social_service_v2.gen'
import { NatsMsg } from '@well-known-components/nats-component/dist/types'
import { FRIEND_STATUS_UPDATES_CHANNEL } from './pubsub'

export function createPeerTrackingComponent({
logs,
pubsub,
nats
}: Pick<AppComponents, 'logs' | 'pubsub' | 'nats'>): IPeerTrackingComponent {
const logger = logs.getLogger('peer-tracking-component')
let natsSubscriptions: Subscription[] = []

async function notifyPeerStatusChange(peerId: string, status: ConnectivityStatus) {
try {
await pubsub.publishInChannel(FRIEND_STATUS_UPDATES_CHANNEL, {
address: peerId,
status
})
} catch (error: any) {
logger.error('Error notifying peer status change:', {
error: error.message,
peerId,
status
})
}
}

function handleNatsMessage(event: string, status: ConnectivityStatus) {
return async (err: Error | null, message: NatsMsg) => {
if (err) {
logger.error(`Error processing peer ${event} message: ${err.message}`)
return
}

const peerId = message.subject.split('.')[1]
await notifyPeerStatusChange(peerId, status)
}
}

return {
async start() {
natsSubscriptions.push(
nats.subscribe('peer.*.connect', handleNatsMessage('connect', ConnectivityStatus.OFFLINE)),
nats.subscribe('peer.*.disconnect', handleNatsMessage('disconnect', ConnectivityStatus.OFFLINE)),
nats.subscribe('peer.*.heartbeat', handleNatsMessage('heartbeat', ConnectivityStatus.ONLINE))
)
},
async stop() {
natsSubscriptions.forEach((subscription) => subscription.unsubscribe())
natsSubscriptions = []
}
}
}
50 changes: 50 additions & 0 deletions src/adapters/peers-synchronizer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { AppComponents, IPeersSynchronizer } from '../types'
import { PEERS_CACHE_KEY } from '../utils/peers'

export const FIVE_SECS_IN_MS = 5000
export const TEN_SECS_IN_MS = 10000

export async function createPeersSynchronizerComponent({
logs,
archipelagoStats,
redis,
config
}: Pick<AppComponents, 'logs' | 'archipelagoStats' | 'redis' | 'config'>): Promise<IPeersSynchronizer> {
const logger = logs.getLogger('peers-synchronizer-component')
let intervalId: NodeJS.Timeout | null = null
const syncIntervalMs = (await config.getNumber('PEER_SYNC_INTERVAL_MS')) || FIVE_SECS_IN_MS
const cacheTTLInSeconds = Math.floor(((await config.getNumber('PEERS_SYNC_CACHE_TTL_MS')) || TEN_SECS_IN_MS) / 1000)

async function syncPeers() {
try {
const currentPeers = await archipelagoStats.getPeers()

await redis.put(PEERS_CACHE_KEY, JSON.stringify(currentPeers), {
EX: cacheTTLInSeconds
})

logger.debug('Synced peers to Redis', {
peersCount: Object.keys(currentPeers).length,
timestamp: Date.now()
})
} catch (error: any) {
logger.error('Error syncing peers:', error)
}
}

return {
async start() {
logger.info('Starting scheduler component', { syncIntervalMs })
await syncPeers()
intervalId = setInterval(syncPeers, syncIntervalMs)
},

async stop() {
logger.info('Stopping scheduler component')
if (intervalId) {
clearInterval(intervalId)
intervalId = null
}
}
}
}
Loading
Loading