diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml index 2beef9e..37cc953 100644 --- a/.github/workflows/python-test.yml +++ b/.github/workflows/python-test.yml @@ -8,17 +8,22 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9] + python-version: ["3.8", "3.9", "3.10"] steps: - - uses: actions/checkout@v2 + - name: Install APT dependencies + run: | + sudo apt-get install -y gir1.2-gst-plugins-bad-1.0 gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-nice gobject-introspection libgirepository1.0-dev - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v3 with: python-version: ${{ matrix.python-version }} - - name: Install dependencies + - name: Install Python dependencies run: | python -m pip install --upgrade pip pip install tox + - uses: actions/checkout@v3 - name: Lint run: tox -e linters + - name: Test + run: tox -e tests diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index a4694e5..0000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,51 +0,0 @@ -stages: - - quality-assurance - - testing - - deploy - -# Cache pip -variables: - PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip" - -cache: - paths: - - .cache/pip - -linters: - image: python:3.9 - stage: quality-assurance - before_script: - - pip install tox - script: - - tox -e linters - - # Be nice to new contributors, but please use `tox` before commit - allow_failure: true - -tests-debian-10: - image: debian:10 - stage: testing - before_script: - - apt-get update && apt-get install -y tox python3-gi python3-gi-cairo python3-websockets gir1.2-gst-plugins-bad-1.0 gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-nice - script: - - tox -e tests - -tests-archlinux: - image: archlinux:latest - stage: testing - before_script: - - pacman -Sy --noconfirm python-setuptools python-tox python-websockets python-gobject gobject-introspection gst-python gst-plugins-base gst-plugins-bad gst-plugins-ugly gst-libav - script: - - tox -e tests - -deploy: - image: python:3.9 - stage: deploy - before_script: - - pip install build twine - script: - - python -m build - - TWINE_PASSWORD=${PIPY_TOKEN} TWINE_USERNAME=__token__ python -m twine upload --repository pypi dist/* - only: - refs: - - tags diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index 81972db..0000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,12 +0,0 @@ -Contributing ------------- - -1. Fork the repository and clone it locally. -2. Install `tox` testing framework: `pip install tox`. -3. Go to the root of project and run: `tox`. - It should be successful. If it is not, something is wrong. -4. Make changes to code, documentation, etc. -5. Run `tox` to apply checks. If it is not successful, please fix it or ask - advices. -6. Commit changes to your forked repository. -7. Submit a Pull Request on GitHub. diff --git a/MANIFEST.in b/MANIFEST.in index 4883011..b3ba2ec 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,5 +1,5 @@ # Include documentation recursive-include docs *.png *.conf Vagrantfile* -# Include metadata files -include .gitlab-ci.yml CONTRIBUTING.md +# Include entrypoint +include galene-stream.py diff --git a/README.md b/README.md index cbdcfba..82ced9a 100644 --- a/README.md +++ b/README.md @@ -4,78 +4,47 @@ Gateway to send streams such as RTMP or SRT to [Galène videoconference server](https://galene.org/). It is based on Gstreamer and implements the Galène protocol. -Tested on Debian Bullseye, Ubuntu 20.04, Ubuntu 20.10, ArchLinux and NixOS 20.09. - **This project is still not production ready, and you might experience jittering and crashes.** ![Streaming from OBS to Galène, video background from KaMy Video Stock](./docs/demo.png) -## User guide +## Installation Real-time video conversion requires resources. If many users are going to use this gateway simultaneously, you should scale your machine resources accordingly. -### Installation on Debian/Ubuntu - -```bash -sudo apt install python3-pip python3-gi python3-gi-cairo python3-websockets gir1.2-gst-plugins-bad-1.0 gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-nice -pip3 install --user galene-stream -``` - -### Installation on ArchLinux - -```bash -sudo pacman -S python-setuptools python-pip python-websockets python-gobject gobject-introspection gst-python gst-plugins-base gst-plugins-bad gst-plugins-ugly gst-libav -pip install --user galene-stream -``` +Installation works on Ubuntu 20.10 and Debian Bullseye or any later version. -### Installation from source code using Python Virtualenv +For Windows users, we recommend to use +[Windows Subsystem for Linux](https://learn.microsoft.com/en-us/windows/wsl/install). -Start by cloning the source code, +### Dependencies ```bash -git clone https://github.com/erdnaxe/galene-stream -cd galene-stream -``` +# On Debian/Ubuntu-based distributions +sudo apt install python3-gi python3-gi-cairo python3-websockets gir1.2-gst-plugins-bad-1.0 gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-nice -Then create a Python VirtualEnv and install galene-stream inside, +# On ArchLinux-based distributions +sudo pacman -S python-setuptools python-pip python-websockets python-gobject gobject-introspection gst-python gst-plugins-base gst-plugins-bad gst-plugins-ugly gst-libav -```bash -python -m venv venv --system-site-packages -source venv/bin/activate -pip install -e . +# On NixOS +nix-shell -p gobject-introspection -p gst_all_1.gst-libav -p gst_all_1.gst-plugins-bad -p gst_all_1.gst-plugins-base -p gst_all_1.gst-plugins-good -p gst_all_1.gst-plugins-ugly -p libnice -p python3 -p python3Packages.gst-python -p python3Packages.pygobject3 -p python3Packages.websockets ``` -### Installation on Windows - -*Running the gateway on Windows is not tested and not recommended.* - -Go to and follow the instructions to set up a MSYS2 -environment. Then run `C:\msys64\mingw64.exe`, you should have a terminal -window. Then execute, - -```bash -# Update MSYS2 -pacman -Suy - -# Install Python3 and GStreamer -pacman -S mingw-w64-x86_64-python mingw-w64-x86_64-gcc mingw-w64-x86_64-python-pip mingw-w64-x86_64-python-gobject mingw-w64-x86_64-gst-python mingw-w64-x86_64-gst-plugins-base mingw-w64-x86_64-gst-plugins-good mingw-w64-x86_64-gst-plugins-bad mingw-w64-x86_64-gst-plugins-ugly mingw-w64-x86_64-gst-libav -pip install galene-stream - -python -m galene_stream --help -``` +Then you should be able to either run `./galene-stream.py` in this repository, +or install it using pip. ### Configuration for UDP streaming Launch the gateway using: ``` -galene-stream --input "udp://localhost:8888" --output "wss://galene.example.com/ws" --group test --username bot +galene-stream --input "udp://127.0.0.1:8888" --output "wss://galene.example.com/ws" --group test --username bot ``` -Then you can stream to `udp://localhost:8888` with no stream key. +Then you can stream to `udp://127.0.0.1:8888` with no stream key. ### Configuration for RTMP streaming @@ -96,7 +65,7 @@ nginx -c nginx.conf -p $PWD You may launch the gateway after the NGINX server using: ``` -galene-stream --input "rtmp://localhost:1935/live/test" --output "wss://galene.example.com/ws" --group test --username bot +galene-stream --input "rtmp://127.0.0.1:1935/live/test" --output "wss://galene.example.com/ws" --group test --username bot ``` Then you can stream to `rtmp://127.0.0.1:1935/live` with stream key `test`. @@ -113,10 +82,10 @@ On Windows and MacOS, OBS comes with his own FFMpeg that will work. Launch the gateway using: ``` -galene-stream --input "srt://localhost:9710?mode=listener" --output "wss://galene.example.com/ws" --group test --username bot +galene-stream --input "srt://127.0.0.1:9710?mode=listener" --output "wss://galene.example.com/ws" --group test --username bot ``` -Then you can stream to `srt://localhost:9710` with no stream key. +Then you can stream to `srt://127.0.0.1:9710` with no stream key. More information on [OBS Wiki, Streaming With SRT Or RIST Protocols](https://obsproject.com/wiki/Streaming-With-SRT-Or-RIST-Protocols). @@ -130,7 +99,8 @@ galene-stream --input "file://source.webm" --output "wss://galene.example.com/ws ## Contributing -See [contributing guidelines](./CONTRIBUTING.md). +We welcome contributions that stays in the scope of this project. +Please format your code using `black` and test it using `pytest`. ### Collecting statistics about GStreamer WebRTC element @@ -159,18 +129,11 @@ For example, `export GST_DEBUG_DUMP_DOT_DIR=.`. Then you can use GraphViz to generate an image from the dot file: `dot -Tpng pipeline.dot > pipeline.png`. -## Authors +## License -This gateway is currently developed by members from -[Crans](https://www.crans.org/) +This gateway is developed by former members of [Crans](https://www.crans.org/) and [Aurore](https://auro.re/) network organizations to build a self-hosted -free and open-source streaming server. - -Main contributors: - -- Alexandre Iooss - -## License +free and open-source streaming server based on [Galène](https://galene.org/). We believe in open source software. This project is licensed under [MIT](./LICENSE.txt). diff --git a/galene-stream.py b/galene-stream.py new file mode 100755 index 0000000..fa2989e --- /dev/null +++ b/galene-stream.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +# Copyright (C) 2022 Alexandre Iooss +# SPDX-License-Identifier: MIT + +""" +Entrypoint for Python module +""" + +from galene_stream.cli import main + +if __name__ == "__main__": + main() diff --git a/galene_stream/__main__.py b/galene_stream/__main__.py index 33b929f..c2b5bce 100644 --- a/galene_stream/__main__.py +++ b/galene_stream/__main__.py @@ -1,105 +1,11 @@ -# Copyright (C) 2021 Alexandre Iooss +# Copyright (C) 2021-2022 Alexandre Iooss # SPDX-License-Identifier: MIT """ -Main script for Galène stream gateway. +Entrypoint for Python module """ -import argparse -import asyncio -import logging -import sys - -from galene_stream.galene import GaleneClient - - -def start(opt: argparse.Namespace): - """Init Galène client and start gateway - - :param opt: program options - :type opt: argparse.Namespace - """ - client = GaleneClient( - opt.input, opt.output, opt.bitrate, opt.group, opt.username, opt.password - ) - - # Connect and run main even loop - event_loop = asyncio.get_event_loop() - event_loop.run_until_complete(client.connect()) - try: - event_loop.run_until_complete(client.loop(event_loop)) - event_loop.run_until_complete(client.close()) - except KeyboardInterrupt: - event_loop.run_until_complete(client.close()) - sys.exit(1) - - -def main(): - """Entrypoint.""" - # Arguments parser - parser = argparse.ArgumentParser( - prog="galene-stream", - description="Galène stream gateway.", - ) - parser.add_argument( - "--debug", - action="store_true", - default=False, - help="debug mode: show debug messages", - ) - parser.add_argument( - "-i", - "--input", - required=True, - help=( - 'URI to use as GStreamer "uridecodebin" module input, ' - 'e.g. "rtmp://localhost:1935/live/test"' - ), - ) - parser.add_argument( - "-o", - "--output", - required=True, - help='Galène server to connect to, e.g. "wss://galene.example.com/ws"', - ) - parser.add_argument( - "-b", - "--bitrate", - default=1048576, - help="VP8 encoder bitrate in bit/s, you should adapt this to your network, default to 1048576", - ) - parser.add_argument( - "-g", - "--group", - required=True, - help="Join this group", - ) - parser.add_argument( - "-u", - "--username", - required=True, - help="Group username", - ) - parser.add_argument( - "-p", - "--password", - help="Group password", - ) - options = parser.parse_args() - - # Configure logging - level = logging.DEBUG if options.debug else logging.INFO - logging.addLevelName(logging.INFO, "\033[1;36mINFO\033[1;0m") - logging.addLevelName(logging.WARNING, "\033[1;33mWARNING\033[1;0m") - logging.addLevelName(logging.ERROR, "\033[1;91mERROR\033[1;0m") - logging.addLevelName(logging.DEBUG, "\033[1;30mDEBUG") - logging.basicConfig( - level=level, - format="\033[90m%(asctime)s\033[1;0m [%(name)s] %(levelname)s %(message)s\033[1;0m", - ) - - start(options) - +from .cli import main if __name__ == "__main__": main() diff --git a/galene_stream/cli.py b/galene_stream/cli.py new file mode 100644 index 0000000..ccc0271 --- /dev/null +++ b/galene_stream/cli.py @@ -0,0 +1,101 @@ +# Copyright (C) 2021 Alexandre Iooss +# SPDX-License-Identifier: MIT + +""" +Main command-line script for Galène stream gateway. +""" + +import argparse +import asyncio +import logging +import sys + +from galene_stream.galene import GaleneClient + + +def start(opt: argparse.Namespace): + """Init Galène client and start gateway + + :param opt: program options + :type opt: argparse.Namespace + """ + client = GaleneClient( + opt.input, opt.output, opt.bitrate, opt.group, opt.username, opt.password + ) + + # Connect and run main even loop + event_loop = asyncio.get_event_loop() + event_loop.run_until_complete(client.connect()) + try: + event_loop.run_until_complete(client.loop(event_loop)) + event_loop.run_until_complete(client.close()) + except KeyboardInterrupt: + event_loop.run_until_complete(client.close()) + sys.exit(1) + + +def main(): + """Entrypoint.""" + # Arguments parser + parser = argparse.ArgumentParser( + prog="galene-stream", + description="Galène stream gateway.", + ) + parser.add_argument( + "--debug", + action="store_true", + default=False, + help="debug mode: show debug messages", + ) + parser.add_argument( + "-i", + "--input", + required=True, + help=( + 'URI to use as GStreamer "uridecodebin" module input, ' + 'e.g. "rtmp://localhost:1935/live/test"' + ), + ) + parser.add_argument( + "-o", + "--output", + required=True, + help='Galène server to connect to, e.g. "wss://galene.example.com/ws"', + ) + parser.add_argument( + "-b", + "--bitrate", + default=1048576, + help="VP8 encoder bitrate in bit/s, you should adapt this to your network, default to 1048576", + ) + parser.add_argument( + "-g", + "--group", + required=True, + help="Join this group", + ) + parser.add_argument( + "-u", + "--username", + required=True, + help="Group username", + ) + parser.add_argument( + "-p", + "--password", + help="Group password", + ) + options = parser.parse_args() + + # Configure logging + level = logging.DEBUG if options.debug else logging.INFO + logging.addLevelName(logging.INFO, "\033[1;36mINFO\033[1;0m") + logging.addLevelName(logging.WARNING, "\033[1;33mWARNING\033[1;0m") + logging.addLevelName(logging.ERROR, "\033[1;91mERROR\033[1;0m") + logging.addLevelName(logging.DEBUG, "\033[1;30mDEBUG") + logging.basicConfig( + level=level, + format="\033[90m%(asctime)s\033[1;0m [%(name)s] %(levelname)s %(message)s\033[1;0m", + ) + + start(options) diff --git a/galene_stream/galene.py b/galene_stream/galene.py index 4676b4f..84f1041 100644 --- a/galene_stream/galene.py +++ b/galene_stream/galene.py @@ -8,6 +8,7 @@ import json import logging import secrets +from typing import List import websockets @@ -26,10 +27,8 @@ def __init__( bitrate: int, group: str, username: str, - password=None, - identifier=None, - ice_servers=[], - ): + password: str = "", + ) -> None: """Create GaleneClient :param input_uri: URI for GStreamer uridecodebin @@ -44,38 +43,32 @@ def __init__( :type username: str :param password: group user password if required :type password: str, optional - :param identifier: client id, defaults to random - :type identifier: str, optional - :param ice_servers: TURN/STUN servers to use, default to those announced - by the server - :type ice_servers: [str] """ - super().__init__() - if identifier is None: - # Create random client id - identifier = secrets.token_bytes(16).hex() - self.server = server self.group = group self.username = username self.password = password - self.client_id = identifier + self.conn = None - self.ice_servers = None + self.ice_servers: List[str] = [] + self.client_id = secrets.token_bytes(16).hex() self.webrtc = WebRTCClient( input_uri, bitrate, self.send_sdp_offer, self.send_ice_candidate ) - async def send(self, message: dict): + async def send(self, message: dict) -> None: """Send message to remote. :param message: message to send :type message: dict """ - message = json.dumps(message) - await self.conn.send(message) + msg = json.dumps(message) + if self.conn is None: + log.error("Connection is closed, cannot send message") + return + await self.conn.send(msg) - async def send_sdp_offer(self, sdp): + async def send_sdp_offer(self, sdp: str) -> None: """Send SDP offer to remote. :param sdp: session description @@ -93,7 +86,7 @@ async def send_sdp_offer(self, sdp): } await self.send(msg) - async def send_ice_candidate(self, candidate: dict): + async def send_ice_candidate(self, candidate: dict) -> None: """Send ICE candidate to remote. :param canditate: ICE candidate @@ -103,7 +96,7 @@ async def send_ice_candidate(self, candidate: dict): msg = {"type": "ice", "id": self.client_id, "candidate": candidate} await self.send(msg) - async def send_chat(self, message): + async def send_chat(self, message: str) -> None: """Send chat message. :param message: content of the message @@ -119,7 +112,7 @@ async def send_chat(self, message): } ) - async def connect(self): + async def connect(self) -> None: """Connect to server.""" # Create WebSocket log.info("Connecting to WebSocket") @@ -129,6 +122,7 @@ async def connect(self): log.info("Handshaking") msg = { "type": "handshake", + "version": ["1"], # since Galene 0.6.0 "id": self.client_id, } await self.send(msg) @@ -147,20 +141,34 @@ async def connect(self): response = {"type": "none"} while response["type"] != "joined": # The server will send 'user' messages that we ignore - response = await self.conn.recv() - response = json.loads(response) + raw_response = await self.conn.recv() + response = json.loads(raw_response) if response["kind"] != "join": raise RuntimeError("failed to join room") - if self.ice_servers is None: - self.ice_servers = response.get("rtcConfiguration").get("iceServers", []) - async def close(self): + # Get ICE servers + rtc_configuration = response.get("rtcConfiguration", {}) + assert isinstance(rtc_configuration, dict) + self.ice_servers = [] + for server in rtc_configuration.get("iceServers", []): + username = server.get("username", "") + credential = server.get("credential", "") + for url in server.get("urls", []): + url = url.replace("turn:", "") # remove prefix + uri = f"turn://{username}:{credential}@{url}" + self.ice_servers.append(uri) + + async def close(self) -> None: """Close connection.""" log.info("Closing WebSocket connection") self.webrtc.close_pipeline() + if self.conn is None: + log.warn("Connection is already closed") + return await self.conn.close() + self.conn = None - async def loop(self, event_loop): + async def loop(self, event_loop) -> None: """Client loop :param event_loop: asyncio event loop @@ -208,11 +216,13 @@ async def loop(self, event_loop): elif message["type"] == "close": continue # ignore close events elif message["type"] == "chat": - # User might request statistics + # User might request statistics using `!webrtc` chat command if message.get("value") == "!webrtc": m = self.webrtc.get_stats() if m: await self.send_chat(m) + elif message["type"] == "chathistory": + continue # ignore chat history events else: # Oh no! We receive something not implemented log.warn(f"Not implemented {message}") diff --git a/galene_stream/webrtc.py b/galene_stream/webrtc.py index 3bb0b18..550d069 100644 --- a/galene_stream/webrtc.py +++ b/galene_stream/webrtc.py @@ -10,6 +10,7 @@ import os import pprint import sys +from typing import List import gi @@ -31,7 +32,7 @@ class WebRTCClient: def __init__( self, input_uri: str, bitrate: int, sdp_offer_callback, ice_candidate_callback - ): + ) -> None: """Init WebRTCClient. :param input_uri: URI for GStreamer uridecodebin @@ -74,17 +75,20 @@ def __init__( "x264", ] missing = filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed) - missing = list(missing) - if len(missing): - log.error(f"Missing gstreamer plugins: {missing}") + missing_list = list(missing) + if len(missing_list): + log.error(f"Missing gstreamer plugins: {missing_list}") sys.exit(1) - def on_offer_created(self, promise, _, __): + def on_offer_created(self, promise, _, __) -> None: """``on-offer-created`` event handler. :param promise: promise running this event :type promise: Gst.Promise """ + assert self.event_loop is not None + assert self.webrtc is not None + # Get offer from the promise calling the event promise.wait() reply = promise.get_reply() @@ -103,7 +107,7 @@ def on_offer_created(self, promise, _, __): ) future.result() # wait - def on_negotiation_needed(self, element): + def on_negotiation_needed(self, element) -> None: """``on-negotiation-needed`` event handler. When receiving ``on-negotiation-needed`` event, create new offer. @@ -117,7 +121,7 @@ def on_negotiation_needed(self, element): # Create new offer element.emit("create-offer", None, promise) - def on_ice_candidate(self, _, mline_index, candidate: str): + def on_ice_candidate(self, _, mline_index, candidate: str) -> None: """``on-ice-candidate`` event handler. Send ICE candidate message to remote. @@ -127,18 +131,22 @@ def on_ice_candidate(self, _, mline_index, candidate: str): :param candidate: an ICE candidate :type candidate: str """ - candidate = {"candidate": candidate, "sdpMLineIndex": mline_index} + assert self.event_loop is not None + + c = {"candidate": candidate, "sdpMLineIndex": mline_index} future = asyncio.run_coroutine_threadsafe( - self.ice_candidate_callback(candidate), self.event_loop + self.ice_candidate_callback(c), self.event_loop ) future.result() # wait - def set_remote_sdp(self, sdp: str): + def set_remote_sdp(self, sdp: str) -> None: """Set remote session description. :param sdp: Session description :type sdp: str """ + assert self.webrtc is not None + log.info("Setting remote session description") _, sdp_msg = GstSdp.SDPMessage.new() GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdp_msg) @@ -149,7 +157,7 @@ def set_remote_sdp(self, sdp: str): self.webrtc.emit("set-remote-description", answer, promise) promise.interrupt() - def add_ice_candidate(self, mline_index: int, candidate: str): + def add_ice_candidate(self, mline_index: int, candidate: str) -> None: """Add new ICE candidate. :param mline_index: the index of the media description in the SDP @@ -157,6 +165,7 @@ def add_ice_candidate(self, mline_index: int, candidate: str): :param candidate: an ice candidate :type candidate: str """ + assert self.webrtc is not None self.webrtc.emit("add-ice-candidate", mline_index, candidate) def get_stats(self) -> str: @@ -165,6 +174,7 @@ def get_stats(self) -> str: :return: statistics as text report :rtype: str """ + assert self.pipe is not None fields = [ "ssrc", "is-sender", @@ -192,13 +202,15 @@ def get_stats(self) -> str: message.append({f: source_stats.get_value(f) for f in fields}) return pprint.pformat(message, sort_dicts=False) - def start_pipeline(self, event_loop, ice_servers): + def start_pipeline( + self, event_loop: asyncio.AbstractEventLoop, ice_servers: List[str] + ) -> None: """Start gstreamer pipeline and connect WebRTC events. :param event_loop: asyncio event loop :type event_loop: EventLoop :param ice_servers: list of ICE TURN servers - :type ice_servers: list of dicts + :type ice_servers: list of str """ log.info("Starting pipeline") self.event_loop = event_loop @@ -217,13 +229,8 @@ def start_pipeline(self, event_loop, ice_servers): # Add TURN servers try: - for server in ice_servers: - username = server.get("username", "") - credential = server.get("credential", "") - for url in server.get("urls", []): - url = url.replace("turn:", "") # remove prefix - uri = f"turn://{username}:{credential}@{url}" - self.webrtc.emit("add-turn-server", uri) + for uri in ice_servers: + self.webrtc.emit("add-turn-server", uri) except TypeError: log.warn( "add-turn-server signal is missing, maybe your gstreamer " @@ -233,7 +240,7 @@ def start_pipeline(self, event_loop, ice_servers): # Start self.pipe.set_state(Gst.State.PLAYING) - def close_pipeline(self): + def close_pipeline(self) -> None: """Stop gstreamer pipeline.""" log.info("Closing pipeline") diff --git a/tests/test_webrtc.py b/tests/test_webrtc.py index 69713ec..285f6dd 100644 --- a/tests/test_webrtc.py +++ b/tests/test_webrtc.py @@ -13,6 +13,6 @@ def test_init_webrtc(): """Test WebRTC initialization.""" event_loop = asyncio.get_event_loop() - client = WebRTCClient() - client.start_pipeline(event_loop, [], "rtmp://localhost:1935/live/test") + client = WebRTCClient("rtmp://localhost:1935/live/test", 1048576, None, None) + client.start_pipeline(event_loop, []) client.close_pipeline()