Skip to content

Commit

Permalink
Users Cards and Management (#15)
Browse files Browse the repository at this point in the history
* Fill users page, select TTS voice

* Session management page, fix threaded event triggers

* party card view

* Task queue loop fix

* fix queue reset on start

* fix queue number reset

* fix for queue reset
  • Loading branch information
WolfwithSword authored Jan 9, 2025
1 parent 10aac7a commit edae251
Show file tree
Hide file tree
Showing 17 changed files with 495 additions and 62 deletions.
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ twitchAPI==4.4.0
customtkinter==5.2.2
Quart==0.20.0
SQLAlchemy==2.0.36
aiosqlite==0.20.0
aiosqlite==0.20.0
pillow==11.1.0
requests==2.32.3
3 changes: 2 additions & 1 deletion src/chatdnd/events/chat_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
chat_on_channel_fetch = Event()
chat_bot_on_connect = Event()

chat_say_command = Event()
chat_say_command = Event()
chat_on_join_queue = Event()
16 changes: 11 additions & 5 deletions src/chatdnd/session_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random

from data import Session, Member

from data import Session, Member, SessionState
from custom_logger.logger import logger

class SessionManager():
Expand All @@ -12,10 +12,16 @@ def join_queue(self, member: Member):

def start_session(self, party_size: int = 4) -> bool:
if len(self.session.queue) < party_size:
return False
return False
self.session.state = SessionState.STARTED
self.session.party.clear()
self.session.party.update(random.sample(self.session.queue, party_size))
self.session.party.update(random.sample(sorted(self.session.queue), party_size))
self.session.queue.clear()
return True

def end(self):
self.session.clear()
self.session.clear()
self.session.state = SessionState.NONE

def open(self):
self.session.state = SessionState.OPEN
2 changes: 1 addition & 1 deletion src/custom_logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CustomLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
debug_mode = os.environ['TCDND_DEBUG_MODE'] == '1'
self.logger.setLevel(logging.DEBUG if debug_mode else logging.INFO) # TODO env var
self.logger.setLevel(logging.DEBUG if debug_mode else logging.INFO)

self.log_queue = Queue()

Expand Down
4 changes: 2 additions & 2 deletions src/data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from data.member import Member
from data.session import Session
from data.session import Session, SessionState

__all__ = [Member, Session]
__all__ = [Member, Session, SessionState]
45 changes: 42 additions & 3 deletions src/data/member.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column
from sqlalchemy import String, Integer, JSON
from sqlalchemy import String, Integer, JSON, asc
from sqlalchemy.future import select

from data.base import Base

Expand Down Expand Up @@ -37,7 +38,13 @@ def __hash__(self):


def __repr__(self):
return f"Member(name='{self.name})"
return f"Member(name='{self.name}')"

def __lt__(self, other):
return self.name < other.name

def __gt__(self, other):
return self.name > other.name


async def create_or_get_member(name: str, pfp_url: str = "") -> Member:
Expand All @@ -56,16 +63,48 @@ async def _upsert_member(name: str, pfp_url: str) -> Member:
# Update existing member
if member.pfp_url != pfp_url:
member.pfp_url = pfp_url
await session.commit()
return member
else:
# Create new member
new_member = Member(name=name, pfp_url=pfp_url)
session.add(new_member)
return new_member

async def update_tts(member: Member, preferred_tts: str = ""):
async with async_session() as session:
async with session.begin():
member_in_db = await session.get(Member, member.id)
if member_in_db:
member_in_db.preferred_tts = preferred_tts
await session.commit()


async def fetch_member(name: str) -> Member | None:
name = name.lower()
async with async_session() as session:
query = select(Member).where(Member.name == name)
result = await session.execute(query)
return result.scalars().first()
return result.scalars().first()


async def fetch_paginated_members(page: int, per_page: int=20,
exclude_names: list[str] = None,
name_filter: str = None) -> list[Member]:
if not exclude_names:
exclude_names = []

async with async_session() as session:
query = select(Member).order_by(asc(Member.name))

if exclude_names:
query = query.where(Member.name.notin_([name.lower()] for name in exclude_names))

if name_filter:
query = query.where(Member.name.like(f"%{name_filter.lower()}%"))

offset = (page -1) * per_page
query = query.offset(offset).limit(per_page)

result = await session.execute(query)
return result.scalars().all()
9 changes: 9 additions & 0 deletions src/data/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,24 @@

from data.member import Member

from enum import Enum, auto

class SessionState(Enum):
NONE = auto()
OPEN = auto()
STARTED = auto()

class Session():

def __init__(self):
self.queue: Set[Member] = set()
self.party: Set[Member] = set()
self.state: SessionState = SessionState.NONE

def join_queue(self, member: Member):
self.queue.add(member)

def clear(self):
self.queue.clear()
self.party.clear()
self.state: SessionState = SessionState.NONE
7 changes: 6 additions & 1 deletion src/helpers/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import threading

from custom_logger.logger import logger

# Main sets this at startup. It's cursed, but it works.
_task_queue = None

class Event:
def __init__(self):
self.__listeners = []
Expand Down Expand Up @@ -45,13 +49,14 @@ def trigger(self, args = None):
func(*args)
except RuntimeError as e:
if "main thread is not in main loop" in str(e):
logger.error(f"Error: {e}. This is likely to occur when a trigger is fired from a separate thread to the UI thread.")
_task_queue.put((func, *args))
continue
else:
raise
except Exception as e:
raise


async def _run_async_func(self, func, *args):
try:
await func(*args)
Expand Down
30 changes: 28 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def parse_args():
args = parse_args()
os.environ['TCDND_DEBUG_MODE'] = '1' if args.debug else '0'

from queue import Queue
_tasks = Queue()
import helpers.event as _event_module
setattr(sys.modules[_event_module.__name__], '_task_queue', _tasks)

from twitch.utils import TwitchUtils
from twitch.chat import ChatController
from twitchAPI.type import TwitchAuthorizationException
Expand Down Expand Up @@ -45,6 +50,7 @@ def parse_args():

APP_RUNNING = True


async def run_twitch():

async def try_setup():
Expand Down Expand Up @@ -131,13 +137,33 @@ async def run_ui():
sys.exit(0)
# app.mainloop()

async def run_queued_tasks():
while APP_RUNNING:
try:
callback = None
args = None
if _tasks.empty():
await asyncio.sleep(0.5)
else:
items = _tasks.get(False)
callback = items[0]
if len(items) > 1:
args = items[1:]
callback(*args)
else:
callback()
except Exception as e:
logger.error(f"Error in queued task: {callback} ({args}) - {e}")


async def run_all():
tasks = [
asyncio.create_task(initialize_database()),
asyncio.create_task(initialize_database(), name="DB-Setup"),
asyncio.create_task(run_server(), name="Server"),
asyncio.create_task(run_twitch(), name="Twitch"),
asyncio.create_task(run_ui(), name="UI"),
asyncio.create_task(run_twitch_bot(), name="Twitch-Bot")
asyncio.create_task(run_twitch_bot(), name="Twitch-Bot"),
asyncio.create_task(run_queued_tasks(), name="Task-Queue")
]

try:
Expand Down
15 changes: 8 additions & 7 deletions src/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from quart import Quart, redirect, request, jsonify, websocket, render_template, send_from_directory, Response
import os, sys

from data import Member
from tts import LocalTTS
from helpers import TCDNDConfig as Config
from custom_logger.logger import logger
Expand Down Expand Up @@ -40,10 +41,10 @@ async def audio_stream():
logger.debug("ws opened")
while True:
if not message_queue.empty():
message = await message_queue.get()
logger.info(f"saying {message}")
member, message = await message_queue.get()
logger.info(f"saying {message} from {member}")

async for chunk in self.tts.get_stream(message):
async for chunk in self.tts.get_stream(message, '' if not member else member.preferred_tts):
await asyncio.wait_for(websocket.send(chunk), timeout=10)
await asyncio.sleep(0.1)
except Exception as e:
Expand All @@ -64,7 +65,7 @@ async def trigger_tts():
return jsonify({"error": "Text is required"}), 400

if clients:
await message_queue.put(text)
await message_queue.put((None, text))
return jsonify({"status": "success", "message": "Text sent to WebSocket for TTS."})
else:
return jsonify({"error": "No active WebSocket connection."}), 400
Expand All @@ -73,11 +74,11 @@ async def trigger_tts():
async def overlay():
return await send_from_directory(STATIC_DIR, 'overlay.html')

async def chat_say(self, username: str, text: str):
logger.info(f"TTS saying '{text}' from {username}")
async def chat_say(self, member: Member, text: str):
logger.info(f"TTS saying '{text}' from {'UNKNOWN' if not member else member.name}")
# If client was connected but dc'd, this can revive connection when it runs. But also, we don't want things to queue up forever... Might not be a problem, needs hard testing later
# But cannot simply do an if check here for clients
await message_queue.put(text)
await message_queue.put((member, text))


async def run_task(self, host="0.0.0.0", **kwargs):
Expand Down
40 changes: 33 additions & 7 deletions src/tts/local_tts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import threading
import base64, io, struct
import pyttsx4

from tts.tts import TTS

from helpers import TCDNDConfig as Config
from custom_logger.logger import logger

Expand Down Expand Up @@ -32,37 +35,60 @@ def create_wav_header(sample_rate, bits_per_sample, num_channels, data_size):
)
return header

class LocalTTS(): # TODO Refactor with some inheritance from a TTS class, so we can abstract both Local and Cloud TTS later on
class LocalTTS(TTS): # TODO Refactor with some inheritance from a TTS class, so we can abstract both Local and Cloud TTS later on
def __init__(self, config: Config):
self.config = config
super().__init__(config=None)

self.sample_rate = 22050 # PyTTS default
self.bits_per_sample = 16
self.num_channels = 1

self.max_chunk_size = 1024*8*8*2 # 128kb

def audio_stream_generator(self, text="Hello World!"):
engine = pyttsx4.init()
for v in engine.getProperty('voices'):
self.voices.setdefault(v.name, v.id)


def audio_stream_generator(self, text="Hello World!", voice: str = ''):
engine = pyttsx4.init() # We are using the fork for x4 as it works with outputting to bytesIO
output = io.BytesIO()

if voice and voice in self.get_voices().keys():
engine.setProperty('voice', self.get_voices()[voice])

# TODO Uses default tts on at the moment. Can configure later
engine.setProperty('rate', 150) # Speed of speech
engine.setProperty('volume', 1) # Volume level (0.0 to 1.0)

engine.save_to_file(text, output)
engine.runAndWait()
_th = threading.Thread(target=engine.runAndWait)
_th.daemon = True
_th.start()
_th.join()

output.seek(0)

return output

async def get_stream(self, text="Hello World!"):
output = self.audio_stream_generator(text)
async def get_stream(self, text="Hello World!", voice: str = ''):
output = self.audio_stream_generator(text, voice)
header = create_wav_header(self.sample_rate, self.bits_per_sample, self.num_channels, len(output.getvalue()))
chunk_size = min(self.max_chunk_size, len(output.getvalue()))
chunk = output.read(chunk_size)

while chunk:
await asyncio.sleep((len(chunk) / (self.sample_rate * self.num_channels * (self.bits_per_sample // 8))))
yield header + chunk
chunk = output.read(chunk_size)
chunk = output.read(chunk_size)

def test_speak(self, text:str ="Hello there. How are you?", voice:str = None):
def _run(text, voice):
engine = pyttsx4.init()
if voice and voice in self.voices.keys():
engine.setProperty('voice', self.voices.get(voice))
engine.say(text)
engine.runAndWait()
thread = threading.Thread(target=_run, args=(text, voice))
thread.daemon = True
thread.start()
12 changes: 12 additions & 0 deletions src/tts/tts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from helpers import TCDNDConfig as Config
from custom_logger.logger import logger

class TTS():

voices: dict = dict()

def __init__(self, config: Config):
self.config = config

def get_voices(self) -> dict:
return self.voices
Loading

0 comments on commit edae251

Please sign in to comment.