diff --git a/yepcord/gateway/events.py b/yepcord/gateway/events.py index 379df0d..fc17668 100644 --- a/yepcord/gateway/events.py +++ b/yepcord/gateway/events.py @@ -24,7 +24,7 @@ from ..yepcord.config import Config from ..yepcord.enums import GatewayOp -from ..yepcord.models import Emoji, Application, Integration, ConnectedAccount +from ..yepcord.models import Emoji, Application, Integration, ConnectedAccount, Presence from ..yepcord.models.interaction import Interaction from ..yepcord.snowflake import Snowflake @@ -32,7 +32,6 @@ from ..yepcord.models import Channel, Invite, GuildMember, UserData, User, UserSettings from ..yepcord.core import Core from .gateway import GatewayClient - from .presences import Presence class Event: @@ -296,11 +295,8 @@ async def json(self) -> dict: "op": self.OP, "d": { "user": self.userdata.ds_json, - "status": self.presence.public_status, - "last_modified": int(time() * 1000), - "client_status": {} if self.presence.public_status == "offline" - else {"desktop": self.presence.public_status}, - "activities": [] if self.presence.public_status == "offline" else self.presence.activities + "last_modified": 0 if self.presence.is_offline else int(self.presence.updated_at * 1000), + **self.presence.ds_json(), } } @@ -508,41 +504,53 @@ class GuildMembersListUpdateEvent(DispatchEvent): def __init__(self, members: List[GuildMember], total_members: int, statuses: dict, guild_id: int): self.members = members self.total_members = total_members - self.statuses = statuses + self.statuses: dict[int, Presence] = statuses self.groups = {} for s in statuses.values(): - if s.public_status not in self.groups: - self.groups[s.public_status] = 0 - self.groups[s.public_status] += 1 + status = s.public_status if s is not None else "offline" + if status not in self.groups: + self.groups[status] = 0 + self.groups[status] += 1 self.guild_id = guild_id # noinspection PyShadowingNames async def json(self) -> dict: groups = [{"id": status, "count": count} for status, count in self.groups.items()] items = [] - for mem in self.members: - m = await mem.ds_json() - m["presence"] = { - "user": {"id": str(mem.user.id)}, - "status": self.statuses[mem.user.id].public_status, - "client_status": {} if self.statuses[mem.user.id].public_status == "offline" else { - "desktop": self.statuses[mem.user.id].public_status + online_count = 0 + for member in self.members: + status = self.statuses[member.user.id] + if status is not None and not status.is_offline: + online_count += 1 + + items.append({ + "member": { + **(await member.ds_json()), + "presence": { + "user": {"id": str(member.user.id)}, + **(status.ds_json(False) if status is not None else Presence.ds_json_offline(full=False)), + }, }, - "activities": self.statuses[mem.user.id].activities - } - items.append({"member": m}) + }) items.sort(key=lambda i: i["member"]["presence"]["status"]) - _ls = None - _ins: dict = {} - _offset = 0 - for idx, i in enumerate(items): - if (_s := i["member"]["presence"]["status"]) != _ls: - group = {"id": _s, "count": self.groups[_s]} - _ins[idx+_offset] = {"group": group} - _offset += 1 - _ls = _s - for idx, ins in _ins.items(): - items.insert(idx, ins) + + last_group = None + group_indexes: dict = {} + index_offset = 0 + for idx, item in enumerate(items): + if (group_name := item["member"]["presence"]["status"]) != last_group: + group_indexes[idx + index_offset] = { + "group": { + "id": group_name, + "count": self.groups[group_name], + } + } + index_offset += 1 + last_group = group_name + + for idx, group in group_indexes.items(): # TODO: insert in previous loop + items.insert(idx, group) + return { "t": self.NAME, "op": self.OP, @@ -552,7 +560,7 @@ async def json(self) -> dict: "op": "SYNC", "items": items }], - "online_count": self.statuses.get("online", 0), + "online_count": online_count, "member_count": self.total_members, "id": "everyone", "guild_id": str(self.guild_id), diff --git a/yepcord/gateway/gateway.py b/yepcord/gateway/gateway.py index cd44d54..8c4390e 100644 --- a/yepcord/gateway/gateway.py +++ b/yepcord/gateway/gateway.py @@ -20,19 +20,19 @@ import warnings from json import dumps as jdumps -from typing import Optional, Union +from typing import Union from quart import Websocket from redis.asyncio import Redis +from tortoise.expressions import Q from .events import * -from .presences import Presences, Presence from .utils import require_auth, get_token_type, TokenType, init_redis_pool from ..yepcord.classes.fakeredis import FakeRedis from ..yepcord.core import Core from ..yepcord.ctx import getCore from ..yepcord.enums import GatewayOp -from ..yepcord.models import Session, User, UserSettings, Bot, GuildMember +from ..yepcord.models import Session, User, UserSettings, Bot, GuildMember, Relationship, Presence from ..yepcord.mq_broker import getBroker @@ -45,9 +45,10 @@ def __init__(self, ws, gateway: Gateway): self._connected = True self.z = getattr(ws, "zlib", None) - self.id = self.user_id = None + self.user_id = None self.is_bot = False - self.cached_presence: Optional[Presence] = None + + self._user = None @property def connected(self): @@ -72,6 +73,12 @@ async def esend(self, event): def compress(self, json: dict): return self.z(jdumps(json).encode("utf8")) + async def get_user(self, reload: bool = False) -> User: + if self._user is None or reload: + self._user = await User.get(id=self.user_id) + + return self._user + async def handle_IDENTIFY(self, data: dict) -> None: if self.user_id is not None: return await self.ws.close(4005) @@ -82,15 +89,10 @@ async def handle_IDENTIFY(self, data: dict) -> None: if (session := await S.from_token(token)) is None: return await self.ws.close(4004) - self.id = self.user_id = session.user.id + self.user_id = session.user.id self.is_bot = session.user.is_bot - settings = await session.user.settings - self.cached_presence = await self.gateway.presences.get(self.user_id) or self.cached_presence - if self.cached_presence is None: - self.cached_presence = Presence(self.user_id, settings.status, settings.custom_status, []) - - await self.gateway.authenticated(self, self.cached_presence) + await self.gateway.authenticated(self) await self.esend(ReadyEvent(session.user, self, getCore())) if not session.user.is_bot: guild_ids = [guild.id for guild in await getCore().getUserGuilds(session.user)] @@ -106,7 +108,7 @@ async def handle_RESUME(self, data: dict, new_client: GatewayClient) -> None: return await new_client.ws.close(4004) S = Session if token_type == TokenType.USER else Bot - if (session := await S.from_token(token)) is None or self.user_id.id != session.user.id: + if (session := await S.from_token(token)) is None or self.user_id != session.user.id: return await self.ws.close(4004) self.z = new_client.z @@ -114,30 +116,30 @@ async def handle_RESUME(self, data: dict, new_client: GatewayClient) -> None: setattr(self.ws, "_yepcord_client", self) self.gateway.remove_client(new_client) - await self.gateway.authenticated(self, self.cached_presence) + await self.gateway.authenticated(self) await self.send({"op": GatewayOp.DISPATCH, "t": "READY"}) # noinspection PyUnusedLocal async def handle_HEARTBEAT(self, data: None) -> None: await self.send({"op": GatewayOp.HEARTBEAT_ACK, "t": None, "d": None}) - await self.gateway.presences.set_or_refresh(self.user_id, self.cached_presence) + if self.user_id is not None: + await Presence.filter(user__id=self.user_id).update(updated_at=int(time())) @require_auth async def handle_STATUS(self, data: dict) -> None: - self.cached_presence = await self.gateway.presences.get(self.user_id) or self.cached_presence - if self.cached_presence is None: - settings = await UserSettings.get(id=self.user_id) - self.cached_presence = Presence(self.user_id, settings.status, settings.custom_status, []) - - presence = self.cached_presence + presence, created = await Presence.get_or_create(id=self.user_id, user=await self.get_user()) + presence.updated_at = int(time()) + if created: + settings = await UserSettings.get(user__id=self.user_id) + presence.fill_from_settings(settings) - if (status := data.get("status")) and status in ["online", "idle", "offline", "dnd", "invisible"]: + if (status := data.get("status")) and status in {"online", "idle", "offline", "dnd", "invisible"}: presence.status = status if (activities := data.get("activities")) is not None: presence.activities = activities - await self.gateway.presences.set_or_refresh(self.user_id, presence, overwrite=True) + await presence.save() await self.gateway.ev.presence_update(self.user_id, presence) @require_auth @@ -150,10 +152,11 @@ async def handle_LAZY_REQUEST(self, data: dict) -> None: members = await getCore().getGuildMembers(guild) statuses = {} for member in members: - if presence := await self.gateway.presences.get(member.user.id): - statuses[member.user.id] = presence - else: - statuses[member.user.id] = Presence(member.user.id, "offline", None) + # TODO: rewrite with one query? + #if presence := await self.gateway.presences.get(member.user.id): + # statuses[member.user.id] = presence + # continue + statuses[member.user.id] = None await self.esend(GuildMembersListUpdateEvent( members, await getCore().getGuildMemberCount(guild), @@ -292,7 +295,6 @@ def __init__(self, core: Core): self.broker.subscriber("yepcord_events")(self.mcl_yepcordEventsCallback) self.broker.subscriber("yepcord_sys_events")(self.mcl_yepcordSysEventsCallback) self.store = WsStore() - self.presences = Presences(self) self.ev = GatewayEvents(self) self.redis: Union[Redis, FakeRedis, None] = None @@ -362,20 +364,29 @@ async def add_client(self, ws: Websocket) -> None: setattr(ws, "_yepcord_client", client) await client.send({"op": GatewayOp.HELLO, "t": None, "s": None, "d": {"heartbeat_interval": 45000}}) - async def authenticated(self, client: GatewayClient, presence: Presence) -> None: + async def authenticated(self, client: GatewayClient) -> None: if client.user_id not in self.store.by_user_id: self.store.by_user_id[client.user_id] = set() self.store.by_user_id[client.user_id].add(client) self.store.by_sess_id[client.sid] = client - for member in await GuildMember.filter(user__id=client.user_id).select_related("guild"): + user = await client.get_user() + + for member in await GuildMember.filter(user=user).select_related("guild"): self.store.subscribe(member.guild.id, None, client.user_id) for role in await member.roles.all(): self.store.subscribe(None, role.id, client.user_id) - if presence: + presence, created = await Presence.get_or_create(id=user.id, user=user) + if created or time() - presence.updated_at > Config.GATEWAY_KEEP_ALIVE_DELAY * 10: + settings = await user.settings + presence.fill_from_settings(settings) + + presence.updated_at = int(time()) + await presence.save() + + if presence.public_status != "offline": await self.ev.presence_update(client.user_id, presence) - await self.presences.set_or_refresh(client.user_id, presence) def remove_client(self, client: GatewayClient) -> None: if client in self.store.get(user_id=client.user_id): @@ -403,29 +414,21 @@ async def process(self, ws: Websocket, data: dict): print(f" Unknown op code: {op}") print(f" Data: {data}") - async def disconnect(self, ws: Websocket): + @staticmethod + async def disconnect(ws: Websocket): getattr(ws, "_yepcord_client").disconnect() - async def getFriendsPresences(self, uid: int) -> list[dict]: + @staticmethod + async def getFriendsPresences(user_id: int) -> list[dict[str, ...]]: presences = [] - user = await User.get(id=uid) - friends = await self.core.getRelationships(user) - friends = [int(u["user_id"]) for u in friends if u["type"] == 1] - for friend in friends: - if presence := await self.presences.get(friend): - presences.append({ - "user_id": str(friend), - "status": presence.public_status, - "last_modified": int(time()*1000), - "client_status": {"desktop": presence.public_status} if presence.public_status != "offline" else {}, - "activities": presence.activities - }) + async for rel in Relationship.filter(Q(type=1) & (Q(from_user__id=user_id) | Q(to_user__id=user_id))) \ + .select_related("from_user", "to_user"): + other = rel.other_user(user_id) + if presence := await Presence.get_or_none( + user=other, updated_at__gt=int(time() - Config.GATEWAY_KEEP_ALIVE_DELAY * 1.25) + ): + presences.append(presence.ds_json()) continue - presences.append({ - "user_id": str(friend), - "status": "offline", - "last_modified": int(time()), - "client_status": {}, - "activities": [] - }) + presences.append(Presence.ds_json_offline()) + return presences diff --git a/yepcord/gateway/main.py b/yepcord/gateway/main.py index 3050761..c29700a 100644 --- a/yepcord/gateway/main.py +++ b/yepcord/gateway/main.py @@ -65,8 +65,8 @@ async def ws_gateway(): await gw.add_client(ws) while True: try: - data = await ws.receive() - await gw.process(ws, jloads(data)) + data: dict = await ws.receive_json() + await gw.process(ws, data) except CancelledError: await gw.disconnect(ws) raise diff --git a/yepcord/gateway/presences.py b/yepcord/gateway/presences.py deleted file mode 100644 index 00be5ba..0000000 --- a/yepcord/gateway/presences.py +++ /dev/null @@ -1,84 +0,0 @@ -""" - YEPCord: Free open source selfhostable fully discord-compatible chat - Copyright (C) 2022-2024 RuslanUC - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -""" - -from __future__ import annotations - -from json import loads, dumps -from typing import Optional, TYPE_CHECKING - -from ..yepcord.config import Config - -if TYPE_CHECKING: # pragma: no cover - from .gateway import Gateway - - -class Presence: - def __init__(self, user_id: int, status: str, custom_status: dict = None, activities: list = None) -> None: - self.user_id = user_id - self.status = status - self.activities = activities or [] - if custom_status is not None: - self.activities.append({ - "name": "Custom Status", - "type": 4, - "state": custom_status.get("text"), - "emoji": None, - }) - if "expires_at_ms" in custom_status: - self.activities[-1]["timestamps"] = { - "end": custom_status.get("expires_at_ms"), - } - if "emoji_id" in custom_status or "emoji_name" in custom_status: - self.activities[-1]["emoji"] = { - "emoji_id": custom_status.get("emoji_id"), - "emoji_name": custom_status.get("emoji_name"), - } - - @property - def public_status(self) -> str: - return self.status if self.status != "invisible" else "offline" - - -class Presences: - def __init__(self, gateway: Gateway): - self._gateway = gateway - - #async def _expiration_handler(self, message: dict[str, str]) -> None: - # if "presence_" not in message["data"]: - # return - # user_id = int(message["data"][9:]) - # await self._gateway.ev.presence_update(user_id, Presence(user_id, "offline")) - - async def set_or_refresh(self, user_id: int, presence: Presence = None, overwrite=False): - pipe = self._gateway.redis.pipeline() - await pipe.set( - f"presence_{user_id}", - dumps({ - "status": presence.status if presence else "offline", - "activities": presence.activities if presence else [], - }), - ex=int(Config.GATEWAY_KEEP_ALIVE_DELAY * 1.25), - nx=not overwrite, - ) - await pipe.expire(f"presence_{user_id}", int(Config.GATEWAY_KEEP_ALIVE_DELAY * 1.25)) - await pipe.execute() - - async def get(self, user_id: int) -> Optional[Presence]: - if (presence := await self._gateway.redis.get(f"presence_{user_id}")) is None: - return - return Presence(user_id, **loads(presence)) diff --git a/yepcord/yepcord/models/__init__.py b/yepcord/yepcord/models/__init__.py index 7fd8bfb..de7afca 100644 --- a/yepcord/yepcord/models/__init__.py +++ b/yepcord/yepcord/models/__init__.py @@ -8,6 +8,7 @@ from .mfa_code import MfaCode from .relationship import Relationship from .remote_auth_session import RemoteAuthSession +from .presence import Presence from .channel import Channel from .hidden_dm_channel import HiddenDmChannel diff --git a/yepcord/yepcord/models/_utils.py b/yepcord/yepcord/models/_utils.py index 46313c1..9df3f89 100644 --- a/yepcord/yepcord/models/_utils.py +++ b/yepcord/yepcord/models/_utils.py @@ -43,6 +43,9 @@ def __init__(self, *args, **kwargs): class Model(tortoise.Model): + def update_no_save(self, **kwargs) -> None: + self.update_from_dict(kwargs) + async def update(self, **kwargs) -> None: - await self.update_from_dict(kwargs) + self.update_no_save(**kwargs) await self.save() diff --git a/yepcord/yepcord/models/presence.py b/yepcord/yepcord/models/presence.py new file mode 100644 index 0000000..bc9a997 --- /dev/null +++ b/yepcord/yepcord/models/presence.py @@ -0,0 +1,96 @@ +""" + YEPCord: Free open source selfhostable fully discord-compatible chat + Copyright (C) 2022-2024 RuslanUC + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +""" +from time import time +from typing import Optional + +from tortoise import fields +from tortoise.fields import BigIntField + +import yepcord.yepcord.models as models +from ._utils import Model, ChoicesValidator + + +class Presence(Model): + id: int = BigIntField(pk=True, generated=False) + user: models.User = fields.OneToOneField("models.User") + updated_at: int = fields.IntField(default=lambda: int(time())) + status: str = fields.CharField(max_length=12, default="online", validators=[ + ChoicesValidator({"online", "idle", "offline", "dnd", "invisible"}) + ]) + activities: list = fields.JSONField(default=list) + + def ds_json(self, full: bool = True) -> dict: + data = { + "status": self.public_status, + "client_status": {} if self.is_offline else {"desktop": self.public_status}, + "activities": self.activities, + } + if full: + data["user_id"] = str(self.id) + data["last_modified"] = 0 if self.is_offline else int(self.updated_at * 1000) + + return data + + @property + def is_offline(self) -> bool: + return self.status in {"offline", "invisible"} + + @property + def public_status(self) -> str: + return self.status if self.status != "invisible" else "offline" + + def fill_from_settings(self, settings: models.UserSettings) -> None: + self.status = settings.status + self.activities = [] + if (activity := Presence.activity_from_custom_status(settings.custom_status)) is not None: + self.activities.append(activity) + + @staticmethod + def activity_from_custom_status(custom_status: dict) -> Optional[dict]: + if custom_status is None: + return + activity = { + "name": "Custom Status", + "type": 4, + "state": custom_status.get("text"), + "emoji": None, + } + if "expires_at_ms" in custom_status: + activity["timestamps"] = { + "end": custom_status.get("expires_at_ms"), + } + if "emoji_id" in custom_status or "emoji_name" in custom_status: + activity["emoji"] = { + "emoji_id": custom_status.get("emoji_id"), + "emoji_name": custom_status.get("emoji_name"), + } + + return activity + + @staticmethod + def ds_json_offline(user_id: int = 0, full: bool = True) -> dict: + data = { + "status": "offline", + "client_status": {}, + "activities": [], + } + if full: + data["user_id"] = str(user_id) + data["last_modified"] = 0 + + return data diff --git a/yepcord/yepcord/models/relationship.py b/yepcord/yepcord/models/relationship.py index 1e85332..c20961c 100644 --- a/yepcord/yepcord/models/relationship.py +++ b/yepcord/yepcord/models/relationship.py @@ -17,7 +17,7 @@ """ from __future__ import annotations -from typing import Optional +from typing import Optional, Union from tortoise.expressions import Q from tortoise import fields @@ -128,8 +128,9 @@ class Relationship(Model): to_user: models.User = fields.ForeignKeyField("models.User", related_name="to_user") type: int = fields.IntField(validators=[ChoicesValidator({0, 1, 2})]) - def other_user(self, current_user: models.User) -> models.User: - return self.from_user if self.to_user == current_user else self.to_user + def other_user(self, current_user: Union[models.User, int]) -> models.User: + check = self.to_user.id if isinstance(current_user, int) else self.to_user + return self.from_user if check == current_user else self.to_user def discord_rel_type(self, current_user: models.User) -> Optional[int]: if self.type == RelationshipType.BLOCK and self.from_user.id != current_user.id: