Skip to content

Commit

Permalink
store presences in database instead of redis
Browse files Browse the repository at this point in the history
  • Loading branch information
RuslanUC committed Sep 1, 2024
1 parent 9805f9f commit a1c9a9c
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 176 deletions.
74 changes: 41 additions & 33 deletions yepcord/gateway/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@

from ..yepcord.config import Config
from ..yepcord.enums import GatewayOp
from ..yepcord.models import Emoji, Application, Integration, ConnectedAccount
from ..yepcord.models import Emoji, Application, Integration, ConnectedAccount, Presence
from ..yepcord.models.interaction import Interaction
from ..yepcord.snowflake import Snowflake

if TYPE_CHECKING: # pragma: no cover
from ..yepcord.models import Channel, Invite, GuildMember, UserData, User, UserSettings
from ..yepcord.core import Core
from .gateway import GatewayClient
from .presences import Presence


class Event:
Expand Down Expand Up @@ -296,11 +295,8 @@ async def json(self) -> dict:
"op": self.OP,
"d": {
"user": self.userdata.ds_json,
"status": self.presence.public_status,
"last_modified": int(time() * 1000),
"client_status": {} if self.presence.public_status == "offline"
else {"desktop": self.presence.public_status},
"activities": [] if self.presence.public_status == "offline" else self.presence.activities
"last_modified": 0 if self.presence.is_offline else int(self.presence.updated_at * 1000),
**self.presence.ds_json(),
}
}

Expand Down Expand Up @@ -508,41 +504,53 @@ class GuildMembersListUpdateEvent(DispatchEvent):
def __init__(self, members: List[GuildMember], total_members: int, statuses: dict, guild_id: int):
self.members = members
self.total_members = total_members
self.statuses = statuses
self.statuses: dict[int, Presence] = statuses
self.groups = {}
for s in statuses.values():
if s.public_status not in self.groups:
self.groups[s.public_status] = 0
self.groups[s.public_status] += 1
status = s.public_status if s is not None else "offline"
if status not in self.groups:
self.groups[status] = 0
self.groups[status] += 1
self.guild_id = guild_id

# noinspection PyShadowingNames
async def json(self) -> dict:
groups = [{"id": status, "count": count} for status, count in self.groups.items()]
items = []
for mem in self.members:
m = await mem.ds_json()
m["presence"] = {
"user": {"id": str(mem.user.id)},
"status": self.statuses[mem.user.id].public_status,
"client_status": {} if self.statuses[mem.user.id].public_status == "offline" else {
"desktop": self.statuses[mem.user.id].public_status
online_count = 0
for member in self.members:
status = self.statuses[member.user.id]
if status is not None and not status.is_offline:
online_count += 1

items.append({
"member": {
**(await member.ds_json()),
"presence": {
"user": {"id": str(member.user.id)},
**(status.ds_json(False) if status is not None else Presence.ds_json_offline(full=False)),
},
},
"activities": self.statuses[mem.user.id].activities
}
items.append({"member": m})
})
items.sort(key=lambda i: i["member"]["presence"]["status"])
_ls = None
_ins: dict = {}
_offset = 0
for idx, i in enumerate(items):
if (_s := i["member"]["presence"]["status"]) != _ls:
group = {"id": _s, "count": self.groups[_s]}
_ins[idx+_offset] = {"group": group}
_offset += 1
_ls = _s
for idx, ins in _ins.items():
items.insert(idx, ins)

last_group = None
group_indexes: dict = {}
index_offset = 0
for idx, item in enumerate(items):
if (group_name := item["member"]["presence"]["status"]) != last_group:
group_indexes[idx + index_offset] = {
"group": {
"id": group_name,
"count": self.groups[group_name],
}
}
index_offset += 1
last_group = group_name

for idx, group in group_indexes.items(): # TODO: insert in previous loop
items.insert(idx, group)

return {
"t": self.NAME,
"op": self.OP,
Expand All @@ -552,7 +560,7 @@ async def json(self) -> dict:
"op": "SYNC",
"items": items
}],
"online_count": self.statuses.get("online", 0),
"online_count": online_count,
"member_count": self.total_members,
"id": "everyone",
"guild_id": str(self.guild_id),
Expand Down
109 changes: 56 additions & 53 deletions yepcord/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@

import warnings
from json import dumps as jdumps
from typing import Optional, Union
from typing import Union

from quart import Websocket
from redis.asyncio import Redis
from tortoise.expressions import Q

from .events import *
from .presences import Presences, Presence
from .utils import require_auth, get_token_type, TokenType, init_redis_pool
from ..yepcord.classes.fakeredis import FakeRedis
from ..yepcord.core import Core
from ..yepcord.ctx import getCore
from ..yepcord.enums import GatewayOp
from ..yepcord.models import Session, User, UserSettings, Bot, GuildMember
from ..yepcord.models import Session, User, UserSettings, Bot, GuildMember, Relationship, Presence
from ..yepcord.mq_broker import getBroker


Expand All @@ -45,9 +45,10 @@ def __init__(self, ws, gateway: Gateway):
self._connected = True

self.z = getattr(ws, "zlib", None)
self.id = self.user_id = None
self.user_id = None
self.is_bot = False
self.cached_presence: Optional[Presence] = None

self._user = None

@property
def connected(self):
Expand All @@ -72,6 +73,12 @@ async def esend(self, event):
def compress(self, json: dict):
return self.z(jdumps(json).encode("utf8"))

async def get_user(self, reload: bool = False) -> User:
if self._user is None or reload:
self._user = await User.get(id=self.user_id)

return self._user

async def handle_IDENTIFY(self, data: dict) -> None:
if self.user_id is not None:
return await self.ws.close(4005)
Expand All @@ -82,15 +89,10 @@ async def handle_IDENTIFY(self, data: dict) -> None:
if (session := await S.from_token(token)) is None:
return await self.ws.close(4004)

self.id = self.user_id = session.user.id
self.user_id = session.user.id
self.is_bot = session.user.is_bot

settings = await session.user.settings
self.cached_presence = await self.gateway.presences.get(self.user_id) or self.cached_presence
if self.cached_presence is None:
self.cached_presence = Presence(self.user_id, settings.status, settings.custom_status, [])

await self.gateway.authenticated(self, self.cached_presence)
await self.gateway.authenticated(self)
await self.esend(ReadyEvent(session.user, self, getCore()))
if not session.user.is_bot:
guild_ids = [guild.id for guild in await getCore().getUserGuilds(session.user)]
Expand All @@ -106,38 +108,38 @@ async def handle_RESUME(self, data: dict, new_client: GatewayClient) -> None:
return await new_client.ws.close(4004)

S = Session if token_type == TokenType.USER else Bot
if (session := await S.from_token(token)) is None or self.user_id.id != session.user.id:
if (session := await S.from_token(token)) is None or self.user_id != session.user.id:
return await self.ws.close(4004)

self.z = new_client.z
self.ws = new_client.ws
setattr(self.ws, "_yepcord_client", self)

self.gateway.remove_client(new_client)
await self.gateway.authenticated(self, self.cached_presence)
await self.gateway.authenticated(self)

await self.send({"op": GatewayOp.DISPATCH, "t": "READY"})

# noinspection PyUnusedLocal
async def handle_HEARTBEAT(self, data: None) -> None:
await self.send({"op": GatewayOp.HEARTBEAT_ACK, "t": None, "d": None})
await self.gateway.presences.set_or_refresh(self.user_id, self.cached_presence)
if self.user_id is not None:
await Presence.filter(user__id=self.user_id).update(updated_at=int(time()))

@require_auth
async def handle_STATUS(self, data: dict) -> None:
self.cached_presence = await self.gateway.presences.get(self.user_id) or self.cached_presence
if self.cached_presence is None:
settings = await UserSettings.get(id=self.user_id)
self.cached_presence = Presence(self.user_id, settings.status, settings.custom_status, [])

presence = self.cached_presence
presence, created = await Presence.get_or_create(id=self.user_id, user=await self.get_user())
presence.updated_at = int(time())
if created:
settings = await UserSettings.get(user__id=self.user_id)
presence.fill_from_settings(settings)

if (status := data.get("status")) and status in ["online", "idle", "offline", "dnd", "invisible"]:
if (status := data.get("status")) and status in {"online", "idle", "offline", "dnd", "invisible"}:
presence.status = status
if (activities := data.get("activities")) is not None:
presence.activities = activities

await self.gateway.presences.set_or_refresh(self.user_id, presence, overwrite=True)
await presence.save()
await self.gateway.ev.presence_update(self.user_id, presence)

@require_auth
Expand All @@ -150,10 +152,11 @@ async def handle_LAZY_REQUEST(self, data: dict) -> None:
members = await getCore().getGuildMembers(guild)
statuses = {}
for member in members:
if presence := await self.gateway.presences.get(member.user.id):
statuses[member.user.id] = presence
else:
statuses[member.user.id] = Presence(member.user.id, "offline", None)
# TODO: rewrite with one query?
#if presence := await self.gateway.presences.get(member.user.id):
# statuses[member.user.id] = presence
# continue
statuses[member.user.id] = None
await self.esend(GuildMembersListUpdateEvent(
members,
await getCore().getGuildMemberCount(guild),
Expand Down Expand Up @@ -292,7 +295,6 @@ def __init__(self, core: Core):
self.broker.subscriber("yepcord_events")(self.mcl_yepcordEventsCallback)
self.broker.subscriber("yepcord_sys_events")(self.mcl_yepcordSysEventsCallback)
self.store = WsStore()
self.presences = Presences(self)
self.ev = GatewayEvents(self)

self.redis: Union[Redis, FakeRedis, None] = None
Expand Down Expand Up @@ -362,20 +364,29 @@ async def add_client(self, ws: Websocket) -> None:
setattr(ws, "_yepcord_client", client)
await client.send({"op": GatewayOp.HELLO, "t": None, "s": None, "d": {"heartbeat_interval": 45000}})

async def authenticated(self, client: GatewayClient, presence: Presence) -> None:
async def authenticated(self, client: GatewayClient) -> None:
if client.user_id not in self.store.by_user_id:
self.store.by_user_id[client.user_id] = set()
self.store.by_user_id[client.user_id].add(client)
self.store.by_sess_id[client.sid] = client

for member in await GuildMember.filter(user__id=client.user_id).select_related("guild"):
user = await client.get_user()

for member in await GuildMember.filter(user=user).select_related("guild"):
self.store.subscribe(member.guild.id, None, client.user_id)
for role in await member.roles.all():
self.store.subscribe(None, role.id, client.user_id)

if presence:
presence, created = await Presence.get_or_create(id=user.id, user=user)
if created or time() - presence.updated_at > Config.GATEWAY_KEEP_ALIVE_DELAY * 10:
settings = await user.settings
presence.fill_from_settings(settings)

presence.updated_at = int(time())
await presence.save()

if presence.public_status != "offline":
await self.ev.presence_update(client.user_id, presence)
await self.presences.set_or_refresh(client.user_id, presence)

def remove_client(self, client: GatewayClient) -> None:
if client in self.store.get(user_id=client.user_id):
Expand Down Expand Up @@ -403,29 +414,21 @@ async def process(self, ws: Websocket, data: dict):
print(f" Unknown op code: {op}")
print(f" Data: {data}")

async def disconnect(self, ws: Websocket):
@staticmethod
async def disconnect(ws: Websocket):
getattr(ws, "_yepcord_client").disconnect()

async def getFriendsPresences(self, uid: int) -> list[dict]:
@staticmethod
async def getFriendsPresences(user_id: int) -> list[dict[str, ...]]:
presences = []
user = await User.get(id=uid)
friends = await self.core.getRelationships(user)
friends = [int(u["user_id"]) for u in friends if u["type"] == 1]
for friend in friends:
if presence := await self.presences.get(friend):
presences.append({
"user_id": str(friend),
"status": presence.public_status,
"last_modified": int(time()*1000),
"client_status": {"desktop": presence.public_status} if presence.public_status != "offline" else {},
"activities": presence.activities
})
async for rel in Relationship.filter(Q(type=1) & (Q(from_user__id=user_id) | Q(to_user__id=user_id))) \
.select_related("from_user", "to_user"):
other = rel.other_user(user_id)
if presence := await Presence.get_or_none(
user=other, updated_at__gt=int(time() - Config.GATEWAY_KEEP_ALIVE_DELAY * 1.25)
):
presences.append(presence.ds_json())
continue
presences.append({
"user_id": str(friend),
"status": "offline",
"last_modified": int(time()),
"client_status": {},
"activities": []
})
presences.append(Presence.ds_json_offline())

return presences
4 changes: 2 additions & 2 deletions yepcord/gateway/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ async def ws_gateway():
await gw.add_client(ws)
while True:
try:
data = await ws.receive()
await gw.process(ws, jloads(data))
data: dict = await ws.receive_json()
await gw.process(ws, data)
except CancelledError:
await gw.disconnect(ws)
raise
Expand Down
Loading

0 comments on commit a1c9a9c

Please sign in to comment.