diff --git a/.gitignore b/.gitignore index b6e4761..9b54e06 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ + +# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,python +# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,python + +### Python ### # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -20,7 +25,6 @@ parts/ sdist/ var/ wheels/ -pip-wheel-metadata/ share/python-wheels/ *.egg-info/ .installed.cfg @@ -50,6 +54,7 @@ coverage.xml *.py,cover .hypothesis/ .pytest_cache/ +cover/ # Translations *.mo @@ -72,6 +77,7 @@ instance/ docs/_build/ # PyBuilder +.pybuilder/ target/ # Jupyter Notebook @@ -82,7 +88,9 @@ profile_default/ ipython_config.py # pyenv -.python-version +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. @@ -127,3 +135,28 @@ dmypy.json # Pyre type checker .pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +### VisualStudioCode ### +.vscode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +*.code-workspace + +# Local History for Visual Studio Code +.history/ + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide + +# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,python \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6583c28 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 linewalks + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index 06302ca..308ec73 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,119 @@ -# CLUE-MQ -CLUE pub-sub을 위한 repo +# DASIMA + +#### Description + +Dasima is a library that helps you send and receive messages in Flask project. It is a simple wrapper around Kombu and support with the publisher/subscriber pattern of your Flask project. + + + + +## Getting Started + +#### Built With + +* Python >= 3.6 + + + +#### Prerequisites + +- Message Queue + + > ex) Redis, RabbitMQ, ActiveMQ, ZeroMQ, Kafka... + + + +#### Installation + +##### Installing + +```shell +$ pip install dasima +``` + + + +##### setting parameters + +```python +DASIMA_CONNECTION_HOST = "localhost" # your Message Queue host ex) redis://0.0.0.0, amqp://id:password@0.0.0.0:port +DASIMA_ACCEPT_TYPE = "json" # sending data type ex) json, pickle ... +DASIMA_EXCHANGE_SETTING = [("test_exchange", "one"),] +# DASIMA_EXCHANGE_SETTING is list of tuples [(exchange name, type)] +# There are two types: 'all' and 'one'. +``` + + + +## Usage + +#### Subscriber Simple example + +```python +from dasima import Dasima +from flask import Flask + + +app = Flask(__name__) + +dasimamq = Dasima() +dasimamq.init_app(app) # Alternatively, auto init_app can be used after putting the flask app into Dasima like Dasima(app). + + +# Be able to subscribe target functions using the function 'subscribe' +# The queue named by subscribed function name will be made, and binding it with routing key +# dasimamq.{exchange}.subscribe(routing_key) - "Route key to bind" +@dasimamq.test_exchange.subscribe(routing_key="test_routing_key") +# @dasimamq.test_exchange.subscribe - if routing key not defined, routing key is defined as function name +def test_function(x, y): + print(x + y) + return x + y + + +if __name__ == "__main__": + # Call the function 'run_subscribers' to create queues in which consumers process the messages. + dasimamq.run_subscribers() + app.run(port=5050) +``` + + + +#### Publisher Simple example + +```python +from flask import Flask +from dasima import Dasima + + +app = Flask(__name__) + + +dasimamq = Dasima() +dasimamq.init_app(app) # Alternatively, auto init_app is possible by putting the flask app directly into Dasima(app). + + +@app.route("/") +def send_message(): + # dasimamq.{exchange}.subscribe("Route key to bind") + dasimamq.test_exchange.send_message({"x": 1, "y": 2}, "test_routing_key") + return {"data": "send message successful"} + + +if __name__ == "__main__": + app.run(port=5000) +``` + + + +## Links + +- [Kombu github](https://github.com/celery/kombu) +- [Redis](https://redis.io/) +- [RabbitMQ](https://www.rabbitmq.com/) + + + +## Contact + +**JISU JEONG** - js.jeong@linewalks.com + diff --git a/dasima/__init__.py b/dasima/__init__.py new file mode 100644 index 0000000..77730b6 --- /dev/null +++ b/dasima/__init__.py @@ -0,0 +1,67 @@ +import threading + +from kombu import Connection + +from dasima.exchange import ExchangeWrapper +from dasima.worker import Worker + + +class Dasima: + def __init__(self, app=None): + self.app = app + if app: + self.init_app(app) + + def init_app(self, app): + self.app = app + self.app_ctx = self.app.app_context() + self.exchange_list = self.app.config.get( + "DASIMA_EXCHANGE_SETTING", + [("dasima_test", "one")] + ) + self.connection = Connection(self.app.config.get("DASIMA_CONNECTION_HOST", "localhost")) + self.connection.ensure_connection(max_retries=3) + self.worker = Worker( + connection=self.connection, + accept_type=self.app.config.get("DASIMA_ACCEPT_TYPE", "json"), + app_ctx=self.app_ctx + ) + self.create_exchange() + self.is_running = False + + def create_exchange(self): + for exchange_name, exchange_type in self.exchange_list: + setattr( + self, + exchange_name, + ExchangeWrapper( + exchange_name, + exchange_type, + self.worker + ) + ) + + def setup_queue(self): + for exchange_name, exchange_type in self.exchange_list: + exchange = getattr( + self, + exchange_name + ) + self.worker.add_consumer_config_list(exchange) + + def run_subscribers(self): + if self.is_running: + raise RuntimeError("run_subscribers is aleady running!") + else: + self.is_running = True + self.setup_queue() + t = threading.Thread(target=self.worker.run) + t.daemon = True + t.start() + # worker가 준비가 될때 까지 잠시 기다려줌 + while True: + if self.worker.is_ready: + break + + def stop_subscribers(self): + self.worker.stop() diff --git a/dasima/exchange.py b/dasima/exchange.py new file mode 100644 index 0000000..a441b1d --- /dev/null +++ b/dasima/exchange.py @@ -0,0 +1,53 @@ +import uuid + +from kombu import Exchange, Queue, binding +from flask.ctx import AppContext +from dasima.worker import Worker + + +class ExchangeWrapper: + def __init__( + self, + exchange_name: str, + exchange_type: str, + worker: Worker + ): + self.exchange_type = exchange_type + self.exchange = Exchange( + name=exchange_name, + type="topic", + durable=True + ) + self.__binding_dict = {} + self.worker = worker + + def get_binding_dict(self): + return self.__binding_dict + + def send_message(self, data, routing_key): + self.worker.publish( + data, + self.exchange, + routing_key + ) + + def subscribe(self, routing_key=None): + if callable(routing_key): + self.add_binding_dict(routing_key, None) + return routing_key + + def decorator(func): + self.add_binding_dict(func, routing_key) + return func + + return decorator + + def add_binding_dict(self, func, routing_key): + prefix = str(uuid.uuid4()) if self.exchange_type == "all" else "" + key = prefix + self.exchange.name + routing_key = func.__name__ if routing_key is None else routing_key + + if self.__binding_dict.get(key): + self.__binding_dict[key].append((routing_key, func)) + else: + self.__binding_dict[key] = [(routing_key, func)] diff --git a/dasima/tests/conftest.py b/dasima/tests/conftest.py new file mode 100644 index 0000000..962417e --- /dev/null +++ b/dasima/tests/conftest.py @@ -0,0 +1,17 @@ +import pytest + + +@pytest.fixture(scope="session") +def exchange_setting_list(): + return [ + ("exchange_type_one", "one"), + ("exchange_type_all", "all") + ] + + +@pytest.fixture(scope="session") +def flask_app(exchange_setting_list): + from flask import Flask + app = Flask(__name__) + app.config["DASIMA_EXCHANGE_SETTING"] = exchange_setting_list + return app diff --git a/dasima/tests/test_mq.py b/dasima/tests/test_mq.py new file mode 100644 index 0000000..db75a03 --- /dev/null +++ b/dasima/tests/test_mq.py @@ -0,0 +1,125 @@ +import pytest +import time + +from collections import Counter +from dasima import Dasima + + +class TestSetup: + def test_init_app(self, flask_app): + dasmia1 = Dasima(flask_app) + dasmia2 = Dasima() + dasmia2.init_app(flask_app) + assert dasmia1.app == dasmia2.app + + def test_create_exchange(self, exchange_setting_list, flask_app): + dasima = Dasima(flask_app) + for exchange_name, exchange_type in exchange_setting_list: + exchange = getattr(dasima, exchange_name) + assert exchange.exchange_type == exchange_type + assert exchange.exchange.name == exchange_name + + +class TestSubscribe: + @pytest.fixture(scope="function") + def dasima(self, flask_app): + return Dasima(flask_app) + + def test_subscribe_exist_routing_key(self, dasima): + @dasima.exchange_type_one.subscribe("test") + def test_func(): + return + + binding_dict = dasima.exchange_type_one.get_binding_dict() + routing_key, _ = binding_dict["exchange_type_one"][0] + assert routing_key == "test" + + def test_subscribe_not_exist_routing_key(self, dasima): + @dasima.exchange_type_one.subscribe + def test_func(): + return + + binding_dict = dasima.exchange_type_one.get_binding_dict() + routing_key, _ = binding_dict["exchange_type_one"][0] + assert routing_key == "test_func" + + def test_run_subscribers_error(self, dasima): + dasima.run_subscribers() + with pytest.raises(RuntimeError): + dasima.run_subscribers() + + +class TestMessageSendReceive: + @pytest.fixture(scope="function") + def sub1(self, flask_app): + return Dasima(flask_app) + + @pytest.fixture(scope="function") + def sub2(self, flask_app): + return Dasima(flask_app) + + @pytest.fixture(scope="function") + def pub(self, flask_app): + return Dasima(flask_app) + + @pytest.mark.parametrize("number", [2, 10, 100]) + def test_exchange_type_one_recive(self, sub1, sub2, pub, number): + count_list = [] + + @sub1.exchange_type_one.subscribe("one") + def test_subscribe_function_1(): + count_list.append(1) + return + + @sub2.exchange_type_one.subscribe("one") + def test_subscribe_function_2(): + count_list.append(2) + return + + sub1.run_subscribers() + sub2.run_subscribers() + + for _ in range(number): + pub.exchange_type_one.send_message({}, "one") + + # Wait for received message to be processed + time.sleep(number * 0.01) + + counter = Counter(count_list) + + assert counter[1] == number // 2 + assert counter[2] == number // 2 + + sub1.stop_subscribers() + sub2.stop_subscribers() + + @pytest.mark.parametrize("number", [2, 10, 100]) + def test_exchange_type_all_recive(self, sub1, sub2, pub, number): + count_list = [] + + @sub1.exchange_type_all.subscribe("all") + def test_subscribe_function_1(): + count_list.append(1) + return + + @sub2.exchange_type_all.subscribe("all") + def test_subscribe_function_2(): + count_list.append(2) + return + + sub1.run_subscribers() + sub2.run_subscribers() + + for _ in range(number): + pub.exchange_type_all.send_message({}, "all") + + # Wait for received message to be processed + time.sleep(number * 0.01) + + counter = Counter(count_list) + + assert counter[1] == number + assert counter[2] == number + + sub1.stop_subscribers() + sub2.stop_subscribers() diff --git a/dasima/worker.py b/dasima/worker.py new file mode 100644 index 0000000..6a5ceaa --- /dev/null +++ b/dasima/worker.py @@ -0,0 +1,119 @@ +import time + +from flask.ctx import AppContext +from kombu import Connection, Consumer, Queue, binding +from kombu.mixins import ConsumerProducerMixin + +# The basic class ConsumerMixin would need a :attr:`connection` attribute +# which must be a :class:`~kombu.Connection` instance, +# and define a :meth:`get_consumers` method that returns a list of :class:`kombu.Consumer` instances to use. +class Worker(ConsumerProducerMixin): + """ + Woker class는 함수를 kombu 라이브러리의 ConsumerProducerMixin을 상속 받아 + kombu의 publisher, consumer의 기능을 사용하기 쉽게 지원을 해줍니다. + publisher: producer 호출시 Worker connetion을 클론 받는 Producer를 리턴 해줌 + consumer: run 함수 실행 시 get_consumers 함수가 호출되며 consumer_config 값에 설정된 라우팅 큐에 해당 되는 메세지 큐을 구독하는 Comsumer 생성 + """ + def __init__( + self, + connection: Connection, + accept_type: str, + app_ctx: AppContext + ): + self.connection = connection + self.accept_type = accept_type + self.channel_list = [] + self.app_ctx = app_ctx + self.__consumer_config_list = [] + self.is_ready = False + + def close_channels(self): + for channel in self.channel_list: + # TODO maybe_close_channel + if channel: + channel.close() + + def on_consume_ready(self, connection, channel, consumers, **kwargs): + self.is_ready = True + + def add_consumer_config(self, queue, on_task): + self.__consumer_config_list.append((queue, on_task)) + + def make_combine_function(self, func_list): + def func(data, routing_key): + func_dict = dict(func_list) + if func_dict.get(routing_key): + return func_dict[routing_key](**data) + return func + + def add_consumer_config_list(self, exchange): + binding_dict = exchange.get_binding_dict() + auto_delete = True if exchange.exchange_type == "all" else False + for queue_name, bind_list in binding_dict.items(): + func = self.make_combine_function(bind_list) + bindings = [ + binding(exchange.exchange, routing_key=routing_key) + for routing_key, _ in bind_list + ] + + def on_task(body, message): + routing_key = message.delivery_info["routing_key"] + try: + if func is not None: + self.app_ctx.push() + func(body, routing_key) + finally: + message.ack() + self.app_ctx.pop() + + queue = Queue( + name=queue_name, + exchange=exchange.exchange, + bindings=bindings, + durable=True, + auto_delete=auto_delete + ) + + self.add_consumer_config(queue, on_task) + + # kombu의 각각에 Channel에 독립적인 threading을 적용 하기 전 + # 사전 작업 으로 각자의 Consumer마다 channel을 할당 + def get_consumers(self, _, default_channel): + # TODO get_consumers 호출 마다 새로운 Connection을 연결해 주기에 + # 기존에 연결 되어 있는 Connection을 닫아 줘야 되지만 + # 현재 Connection.close 시 socket.timeout: timed out 에러 발생으로 원인 조사중 + self.close_channels() + + channel_list = [default_channel] + channel_list.extend([ + default_channel.connection.channel() + for _ in range(len(self.__consumer_config_list) - 1) + ]) + self.channel_list = channel_list + + return [ + Consumer( + channel=channel, + queues=[queue], + accept=[self.accept_type], + callbacks=[on_task] + ) + for channel, (queue, on_task) in zip(self.channel_list, self.__consumer_config_list) + ] + + # connection으로 channel들을 불러온다. + def on_consume_end(self, connection, default_channel): + self.close_channels() + + def stop(self): + self.should_stop = True + self.connection.release() + time.sleep(0.5) + + def publish(self, data, exchange, routing_key): + self.producer.publish( + body=data, + exchange=exchange, + routing_key=routing_key, + serializer=self.accept_type + ) diff --git a/example.md b/example.md new file mode 100644 index 0000000..c8ff388 --- /dev/null +++ b/example.md @@ -0,0 +1,34 @@ +Producer: 메세지를 보내는 Application + +Publish: Producer가 메세지를 보냄 + +Queue: 메세지를 저장하는 버퍼 + +Consumer : 메세지를 받기 위해 대기 하는 프로그램 + + + +Exchange + +Producer가 전달한 메시지를 Queue에 전달하는 역할 + + + + + +Connectoin: 메세지 브로커에 대한 실제 TCP 연결 + + + +Channel + +- 내부의 가상 연결(AMQP 연결) + +- TCP연결을 과부하 시키지 않오 애플리케이션 내에서 원하는 만큼 연결 사용 + +- 스레드가 여러개인 경우 스레드 Channel 마다 다른 것을 사용 하는게 좋음 + + + + + diff --git a/example/main/__init__.py b/example/main/__init__.py new file mode 100644 index 0000000..89ea014 --- /dev/null +++ b/example/main/__init__.py @@ -0,0 +1,29 @@ +from flask import Flask +import os +import sys +sys.path.append(os.getcwd()) +from dasima import Dasima + +default_cdmdir = f"{os.getcwd()}/example/main/default.cfg" + +dasimamq = Dasima() + + +def create_app(config_filename=default_cdmdir): + app = Flask(__name__) + app.config.from_pyfile(config_filename) + + dasimamq.init_app(app=app) + + with app.app_context(): + + # subscribe funtions import + from main.functions import ( + setting_info, + add, + mul, + div, + sub + ) + + return app diff --git a/example/main/default.cfg b/example/main/default.cfg new file mode 100644 index 0000000..4cad2ac --- /dev/null +++ b/example/main/default.cfg @@ -0,0 +1,6 @@ +DASIMA_CONNECTION_HOST = "localhost" +DASIMA_ACCEPT_TYPE = "json" +DASIMA_EXCHANGE_SETTING = [ + ("clue", "one"), + ("login", "all") +] diff --git a/example/main/functions.py b/example/main/functions.py new file mode 100644 index 0000000..4e40d40 --- /dev/null +++ b/example/main/functions.py @@ -0,0 +1,34 @@ +from flask import current_app as app +from main import dasimamq + + +@dasimamq.clue.subscribe("info") +def setting_info(): + print("Dasima setting parameters") + print("DASIMA_CONNECTION_HOST: ", app.config["DASIMA_CONNECTION_HOST"]) + print("DASIMA_ACCEPT_TYPE: ", app.config["DASIMA_ACCEPT_TYPE"]) + print("DASIMA_EXCHANGE_SETTING: ", app.config["DASIMA_EXCHANGE_SETTING"]) + +# 설정된 key(add)로 바인딩 +@dasimamq.clue.subscribe("add") +def add(x, y): + print("ADD", x, y) + return x + y + +# 설정된 key 없을 시 함수 이름 mul로 바인딩 +@dasimamq.clue.subscribe() +def mul(x, y): + print("MUL", x, y) + return x * y + +# () 없을 때도 위와 같이 함수 이름 div로 바인딩 +@dasimamq.login.subscribe +def div(x, y): + print("DIV", x, y) + return x // y + +# 설정된 key(test)로 바인딩 +@dasimamq.login.subscribe(routing_key="test") +def sub(x, y): + print("SUB", x, y) + return x - y diff --git a/example/run_message_recevie.py b/example/run_message_recevie.py new file mode 100644 index 0000000..a23b70d --- /dev/null +++ b/example/run_message_recevie.py @@ -0,0 +1,7 @@ +import sys +from main import create_app, dasimamq + +if __name__ == "__main__": + app = create_app() + dasimamq.run_subscribers() + app.run(debug=False, host="0.0.0.0", port=sys.argv[1]) diff --git a/example/run_message_send.py b/example/run_message_send.py new file mode 100644 index 0000000..9626c97 --- /dev/null +++ b/example/run_message_send.py @@ -0,0 +1,44 @@ +import timeit +from flask import Flask + +import os +import sys + +sys.path.append(os.getcwd()) +from dasima import Dasima + +default_cdmdir = os.path.join(os.getcwd(), "example", "main", "default.cfg") + +dasimamq = Dasima() + + +def create_app(config_filename=default_cdmdir): + app = Flask(__name__) + app.config.from_pyfile(config_filename) + + dasimamq.init_app(app=app) + + def test_time(): + dasimamq.clue.send_message({}, "info") + dasimamq.clue.send_message({"x": 3, "y": 3}, "add") + dasimamq.clue.send_message({"x": 3, "y": 3}, "mul") + dasimamq.login.send_message({"x": 3, "y": 3}, "div") + dasimamq.login.send_message({"x": 3, "y": 3}, "test") + + @app.route("/") + def send_message(): + result = timeit.timeit(test_time, number=10) + print(result) + return {"data": "Send message successful"} + + with app.app_context(): + + # subscribe funtions import + pass + + return app + + +if __name__ == "__main__": + app = create_app() + app.run(debug=False, host="0.0.0.0", port=sys.argv[1]) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..10041cf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +Flask==1.1.2 +kombu==5.1.0 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..79a366d --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[aliases] +test=pytest + +[tool:pytest] +collect_ignore = ["setup.py"] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e6a4594 --- /dev/null +++ b/setup.py @@ -0,0 +1,23 @@ +from setuptools import setup, find_packages + + +setup( + name="dasima", + version="0.2.0", + description="Message Queue Tools for flask project", + author="Linewalks", + author_email="jindex2411@linewalks.com", + url="https://github.com/linewalks/CLUE-MQ", + license="Linewalks", + python_requires=">=3.6", + long_description=open("README.md").read(), + long_description_content_type="text/markdown", + install_requires=[ + "Flask", + "kombu" + ], + setup_requires=["pytest-runner"], + test_suite="tests", + tests_require=["pytest"], + packages=find_packages(include=["dasima"]) +)