From 03dab6d8299ccad819c6df622d67c916bc79c613 Mon Sep 17 00:00:00 2001 From: Andrei Zbikowski Date: Mon, 10 Apr 2017 22:23:09 -0700 Subject: [PATCH] Voice Send Support (#17) * First pass at voice sending * more voice * Refactor playables a bit, general fixes n stuff * Cleanup * Voice encryption, dep version bump, etc fixes * Remove debugging, don't open a pipe for stderr * Refactor playables This is still a very lose concept, need to think about what the actual differences between encoders and playables are. Also some rough edges in general with the frame/sample calculations. However, this still feels miles ahead of the previous iteration. * Properly reset state when resuming from a pause * rework playables/encoding/etc a bit * Add a proxy, allow for more pipin' * Cleanup, etc * Fix resuming from a pause lerping music timestamp * Fix some incorrect bounds checks, add MemoryBufferedPlayable --- .gitignore | 1 + disco/bot/bot.py | 6 +- disco/gateway/events.py | 2 + disco/voice/__init__.py | 3 + disco/voice/client.py | 135 +++++++++++----- disco/voice/opus.py | 149 +++++++++++++++++ disco/voice/playable.py | 347 ++++++++++++++++++++++++++++++++++++++++ disco/voice/player.py | 122 ++++++++++++++ examples/music.py | 52 ++++++ requirements.txt | 7 +- 10 files changed, 775 insertions(+), 49 deletions(-) create mode 100644 disco/voice/opus.py create mode 100644 disco/voice/playable.py create mode 100644 disco/voice/player.py create mode 100644 examples/music.py diff --git a/.gitignore b/.gitignore index 54cb9741..87a6c02a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ dist/ disco*.egg-info/ docs/_build storage.db +*.dca diff --git a/disco/bot/bot.py b/disco/bot/bot.py index 7693f507..8ad0fca1 100644 --- a/disco/bot/bot.py +++ b/disco/bot/bot.py @@ -355,10 +355,10 @@ def on_message_create(self, event): if event.message.author.id == self.client.state.me.id: return - if self.config.commands_allow_edit: - self.last_message_cache[event.message.channel_id] = (event.message, False) + result = self.handle_message(event.message) - self.handle_message(event.message) + if self.config.commands_allow_edit: + self.last_message_cache[event.message.channel_id] = (event.message, result) def on_message_update(self, event): if self.config.commands_allow_edit: diff --git a/disco/gateway/events.py b/disco/gateway/events.py index 5033b031..9d9d323e 100644 --- a/disco/gateway/events.py +++ b/disco/gateway/events.py @@ -49,6 +49,8 @@ def create(cls, obj, client): """ Create this GatewayEvent class from data and the client. """ + cls.raw_data = obj + # If this event is wrapping a model, pull its fields if hasattr(cls, '_wraps_model'): alias, model = cls._wraps_model diff --git a/disco/voice/__init__.py b/disco/voice/__init__.py index e69de29b..b4a7f6c5 100644 --- a/disco/voice/__init__.py +++ b/disco/voice/__init__.py @@ -0,0 +1,3 @@ +from disco.voice.client import * +from disco.voice.player import * +from disco.voice.playable import * diff --git a/disco/voice/client.py b/disco/voice/client.py index 69fe9d51..d0d3e092 100644 --- a/disco/voice/client.py +++ b/disco/voice/client.py @@ -3,6 +3,8 @@ import struct import time +import nacl.secret + from holster.enum import Enum from holster.emitter import Emitter @@ -22,11 +24,6 @@ VOICE_CONNECTED=6, ) -# TODO: -# - player implementation -# - encryption -# - cleanup - class VoiceException(Exception): def __init__(self, msg, client): @@ -38,12 +35,40 @@ class UDPVoiceClient(LoggingClass): def __init__(self, vc): super(UDPVoiceClient, self).__init__() self.vc = vc + + # The underlying UDP socket self.conn = None + + # Connection information self.ip = None self.port = None + self.run_task = None self.connected = False + def send_frame(self, frame, sequence=None, timestamp=None): + # Convert the frame to a bytearray + frame = bytearray(frame) + + # First, pack the header (TODO: reuse bytearray?) + header = bytearray(24) + header[0] = 0x80 + header[1] = 0x78 + struct.pack_into('>H', header, 2, sequence or self.vc.sequence) + struct.pack_into('>I', header, 4, timestamp or self.vc.timestamp) + struct.pack_into('>i', header, 8, self.vc.ssrc) + + # Now encrypt the payload with the nonce as a header + raw = self.vc.secret_box.encrypt(bytes(frame), bytes(header)).ciphertext + + # Send the header (sans nonce padding) plus the payload + self.send(header[:12] + raw) + + # Increment our sequence counter + self.vc.sequence += 1 + if self.vc.sequence >= 65535: + self.vc.sequence = 0 + def run(self): while True: self.conn.recvfrom(4096) @@ -54,26 +79,29 @@ def send(self, data): def disconnect(self): self.run_task.kill() - def connect(self, host, port, timeout=10): + def connect(self, host, port, timeout=10, addrinfo=None): self.ip = socket.gethostbyname(host) self.port = port self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Send discovery packet - packet = bytearray(70) - struct.pack_into('>I', packet, 0, self.vc.ssrc) - self.send(packet) + if addrinfo: + ip, port = addrinfo + else: + # Send discovery packet + packet = bytearray(70) + struct.pack_into('>I', packet, 0, self.vc.ssrc) + self.send(packet) - # Wait for a response - try: - data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout) - except gevent.Timeout: - return (None, None) + # Wait for a response + try: + data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout) + except gevent.Timeout: + return (None, None) - # Read IP and port - ip = str(data[4:]).split('\x00', 1)[0] - port = struct.unpack(' OpusPlayable +# FFMpegInput.youtube_dl('youtube.com/yolo').pipe(DCADOpusEncoder) => OpusPlayable +# FFMpegInput.youtube_dl('youtube.com/yolo').pipe(OpusEncoder).pipe(DuplexStream, open('cache_file.opus', 'w')) => OpusPlayable + + +class AbstractOpus(object): + def __init__(self, sampling_rate=48000, frame_length=20, channels=2): + self.sampling_rate = sampling_rate + self.frame_length = frame_length + self.channels = 2 + self.sample_size = 2 * self.channels + self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length) + self.frame_size = self.samples_per_frame * self.sample_size + + +class BaseUtil(object): + def pipe(self, other, *args, **kwargs): + child = other(self, *args, **kwargs) + setattr(child, 'metadata', self.metadata) + setattr(child, '_parent', self) + return child + + @property + def metadata(self): + return self._metadata + + @metadata.setter + def metadata(self, value): + self._metadata = value + + +@six.add_metaclass(abc.ABCMeta) +class BasePlayable(BaseUtil): + @abc.abstractmethod + def next_frame(self): + raise NotImplementedError + + +@six.add_metaclass(abc.ABCMeta) +class BaseInput(BaseUtil): + @abc.abstractmethod + def read(self, size): + raise NotImplementedError + + @abc.abstractmethod + def fileobj(self): + raise NotImplementedError + + +class OpusFilePlayable(BasePlayable, AbstractOpus): + """ + An input which reads opus data from a file or file-like object. + """ + def __init__(self, fobj, *args, **kwargs): + super(OpusFilePlayable, self).__init__(*args, **kwargs) + self.fobj = fobj + self.done = False + + def next_frame(self): + if self.done: + return None + + header = self.fobj.read(OPUS_HEADER_SIZE) + if len(header) < OPUS_HEADER_SIZE: + self.done = True + return None + + data_size = struct.unpack('0]/bestaudio/best'}) + + if self._url: + obj = ydl.extract_info(self._url, download=False, process=False) + if 'entries' in obj: + self._ie_info = obj['entries'] + else: + self._ie_info = [obj] + + self._info = ydl.process_ie_result(self._ie_info, download=False) + return self._info + + @property + def _metadata(self): + return self.info + + @classmethod + def many(cls, url, *args, **kwargs): + import youtube_dl + + ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'}) + info = ydl.extract_info(url, download=False, process=False) + + if 'entries' not in info: + yield cls(ie_info=info, *args, **kwargs) + raise StopIteration + + for item in info['entries']: + yield cls(ie_info=item, *args, **kwargs) + + @property + def source(self): + return self.info['url'] + + +class BufferedOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder): + def __init__(self, source, *args, **kwargs): + self.source = source + self.frames = Queue(kwargs.pop('queue_size', 4096)) + super(BufferedOpusEncoderPlayable, self).__init__(*args, **kwargs) + gevent.spawn(self._encoder_loop) + + def _encoder_loop(self): + while self.source: + raw = self.source.read(self.frame_size) + if len(raw) < self.frame_size: + break + + self.frames.put(self.encode(raw, self.samples_per_frame)) + gevent.idle() + self.source = None + + def next_frame(self): + if not self.source: + return None + return self.frames.get() + + +class DCADOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder): + def __init__(self, source, *args, **kwargs): + self.source = source + self.command = kwargs.pop('command', 'dcad') + super(DCADOpusEncoderPlayable, self).__init__(*args, **kwargs) + + self._done = False + self._proc = None + + @property + def proc(self): + if not self._proc: + source = obj = self.source.fileobj() + if not hasattr(obj, 'fileno'): + source = subprocess.PIPE + + self._proc = subprocess.Popen([ + self.command, + '--channels', str(self.channels), + '--rate', str(self.sampling_rate), + '--size', str(self.samples_per_frame), + '--bitrate', '128', + '--fec', + '--packet-loss-percent', '30', + '--input', 'pipe:0', + '--output', 'pipe:1', + ], stdin=source, stdout=subprocess.PIPE) + + def writer(): + while True: + data = obj.read(2048) + if len(data) > 0: + self._proc.stdin.write(data) + if len(data) < 2048: + break + + if source == subprocess.PIPE: + gevent.spawn(writer) + return self._proc + + def next_frame(self): + if self._done: + return None + + header = self.proc.stdout.read(OPUS_HEADER_SIZE) + if len(header) < OPUS_HEADER_SIZE: + self._done = True + return + + size = struct.unpack('') + def on_play(self, event, url): + item = FFmpegInput.youtube_dl(url).pipe(DCADOpusEncoderPlayable) + self.get_player(event.guild.id).queue.put(item) + + @Plugin.command('pause') + def on_pause(self, event): + self.get_player(event.guild.id).pause() + + @Plugin.command('resume') + def on_resume(self, event): + self.get_player(event.guild.id).resume() diff --git a/requirements.txt b/requirements.txt index 3b0894b0..ed9a25c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ -gevent==1.1.2 +gevent==1.2.1 holster==1.0.11 inflection==0.3.1 -requests==2.11.1 +requests==2.13.0 six==1.10.0 -websocket-client==0.37.0 +websocket-client==0.40.0 +pynacl==1.1.2