diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6583c28 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 linewalks + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/dasima/__init__.py b/dasima/__init__.py index d624bd9..77730b6 100644 --- a/dasima/__init__.py +++ b/dasima/__init__.py @@ -1,7 +1,6 @@ import threading from kombu import Connection -from typing import List, Tuple from dasima.exchange import ExchangeWrapper from dasima.worker import Worker @@ -19,13 +18,16 @@ def init_app(self, app): self.exchange_list = self.app.config.get( "DASIMA_EXCHANGE_SETTING", [("dasima_test", "one")] - ) + ) + self.connection = Connection(self.app.config.get("DASIMA_CONNECTION_HOST", "localhost")) + self.connection.ensure_connection(max_retries=3) self.worker = Worker( - connection=Connection(self.app.config.get("DASIMA_CONNECTION_HOST", "localhost")), + connection=self.connection, accept_type=self.app.config.get("DASIMA_ACCEPT_TYPE", "json"), app_ctx=self.app_ctx ) self.create_exchange() + self.is_running = False def create_exchange(self): for exchange_name, exchange_type in self.exchange_list: @@ -48,7 +50,18 @@ def setup_queue(self): self.worker.add_consumer_config_list(exchange) def run_subscribers(self): - self.setup_queue() - t = threading.Thread(target=self.worker.run) - t.daemon = True - t.start() + if self.is_running: + raise RuntimeError("run_subscribers is aleady running!") + else: + self.is_running = True + self.setup_queue() + t = threading.Thread(target=self.worker.run) + t.daemon = True + t.start() + # worker가 준비가 될때 까지 잠시 기다려줌 + while True: + if self.worker.is_ready: + break + + def stop_subscribers(self): + self.worker.stop() diff --git a/dasima/exchange.py b/dasima/exchange.py index a57a61c..a441b1d 100644 --- a/dasima/exchange.py +++ b/dasima/exchange.py @@ -35,9 +35,11 @@ def subscribe(self, routing_key=None): if callable(routing_key): self.add_binding_dict(routing_key, None) return routing_key + def decorator(func): self.add_binding_dict(func, routing_key) return func + return decorator def add_binding_dict(self, func, routing_key): diff --git a/dasima/tests/conftest.py b/dasima/tests/conftest.py index 028302e..962417e 100644 --- a/dasima/tests/conftest.py +++ b/dasima/tests/conftest.py @@ -1,4 +1,17 @@ import pytest +@pytest.fixture(scope="session") +def exchange_setting_list(): + return [ + ("exchange_type_one", "one"), + ("exchange_type_all", "all") + ] + +@pytest.fixture(scope="session") +def flask_app(exchange_setting_list): + from flask import Flask + app = Flask(__name__) + app.config["DASIMA_EXCHANGE_SETTING"] = exchange_setting_list + return app diff --git a/dasima/tests/test_dasima_mq.py b/dasima/tests/test_dasima_mq.py deleted file mode 100644 index 930966c..0000000 --- a/dasima/tests/test_dasima_mq.py +++ /dev/null @@ -1,60 +0,0 @@ -import gevent -import time -import pytest -import random - - -class TestMQ: - @pytest.fixture(scope="class") - def testmq(self): - from flask import Flask - from dasima import Dasima - - app = Flask(__name__) - app.config["DASIMA_EXCHANGE_SETTING"] = [("test_exchange", "topic")] - testmq = Dasima() - testmq.init_app(app) - - return testmq - - @pytest.fixture(scope="class") - def test_cnt(self): - value = { - "cnt": 0, - "load_cnt": 0 - } - yield value - - def test_subscribe(self, testmq, test_cnt): - @testmq.test_exchange.subscribe("test") - def test_func(x, y): - test_cnt["cnt"] += 1 - return x + y - - @testmq.test_exchange.subscribe("load") - def test_load_func(x, y): - test_cnt["load_cnt"] += 1 - return x + y - - testmq.run_subscribers() - - def test_message_send_and_recevie(self, testmq, test_cnt): - number = random.randint(1, 1000) - for i in range(number): - testmq.test_exchange.send_message({"x": 3, "y": 3}, "test") - - # Wait for received message to be processed - time.sleep(number * 0.01) - assert number == test_cnt["cnt"] - - def test_multi_heavy_load(self, testmq, test_cnt): - def send(): - for _ in range(100): - testmq.test_exchange.send_message({"x": 3, "y": 3}, "load") - - gevent.joinall([gevent.spawn(send) for _ in range(100)]) - - # Wait for received message to be processed - time.sleep(10) - - assert test_cnt["load_cnt"] == 100 * 100 diff --git a/dasima/tests/test_mq.py b/dasima/tests/test_mq.py new file mode 100644 index 0000000..db75a03 --- /dev/null +++ b/dasima/tests/test_mq.py @@ -0,0 +1,125 @@ +import pytest +import time + +from collections import Counter +from dasima import Dasima + + +class TestSetup: + def test_init_app(self, flask_app): + dasmia1 = Dasima(flask_app) + dasmia2 = Dasima() + dasmia2.init_app(flask_app) + assert dasmia1.app == dasmia2.app + + def test_create_exchange(self, exchange_setting_list, flask_app): + dasima = Dasima(flask_app) + for exchange_name, exchange_type in exchange_setting_list: + exchange = getattr(dasima, exchange_name) + assert exchange.exchange_type == exchange_type + assert exchange.exchange.name == exchange_name + + +class TestSubscribe: + @pytest.fixture(scope="function") + def dasima(self, flask_app): + return Dasima(flask_app) + + def test_subscribe_exist_routing_key(self, dasima): + @dasima.exchange_type_one.subscribe("test") + def test_func(): + return + + binding_dict = dasima.exchange_type_one.get_binding_dict() + routing_key, _ = binding_dict["exchange_type_one"][0] + assert routing_key == "test" + + def test_subscribe_not_exist_routing_key(self, dasima): + @dasima.exchange_type_one.subscribe + def test_func(): + return + + binding_dict = dasima.exchange_type_one.get_binding_dict() + routing_key, _ = binding_dict["exchange_type_one"][0] + assert routing_key == "test_func" + + def test_run_subscribers_error(self, dasima): + dasima.run_subscribers() + with pytest.raises(RuntimeError): + dasima.run_subscribers() + + +class TestMessageSendReceive: + @pytest.fixture(scope="function") + def sub1(self, flask_app): + return Dasima(flask_app) + + @pytest.fixture(scope="function") + def sub2(self, flask_app): + return Dasima(flask_app) + + @pytest.fixture(scope="function") + def pub(self, flask_app): + return Dasima(flask_app) + + @pytest.mark.parametrize("number", [2, 10, 100]) + def test_exchange_type_one_recive(self, sub1, sub2, pub, number): + count_list = [] + + @sub1.exchange_type_one.subscribe("one") + def test_subscribe_function_1(): + count_list.append(1) + return + + @sub2.exchange_type_one.subscribe("one") + def test_subscribe_function_2(): + count_list.append(2) + return + + sub1.run_subscribers() + sub2.run_subscribers() + + for _ in range(number): + pub.exchange_type_one.send_message({}, "one") + + # Wait for received message to be processed + time.sleep(number * 0.01) + + counter = Counter(count_list) + + assert counter[1] == number // 2 + assert counter[2] == number // 2 + + sub1.stop_subscribers() + sub2.stop_subscribers() + + @pytest.mark.parametrize("number", [2, 10, 100]) + def test_exchange_type_all_recive(self, sub1, sub2, pub, number): + count_list = [] + + @sub1.exchange_type_all.subscribe("all") + def test_subscribe_function_1(): + count_list.append(1) + return + + @sub2.exchange_type_all.subscribe("all") + def test_subscribe_function_2(): + count_list.append(2) + return + + sub1.run_subscribers() + sub2.run_subscribers() + + for _ in range(number): + pub.exchange_type_all.send_message({}, "all") + + # Wait for received message to be processed + time.sleep(number * 0.01) + + counter = Counter(count_list) + + assert counter[1] == number + assert counter[2] == number + + sub1.stop_subscribers() + sub2.stop_subscribers() diff --git a/dasima/worker.py b/dasima/worker.py index b9b207c..6a5ceaa 100644 --- a/dasima/worker.py +++ b/dasima/worker.py @@ -1,3 +1,5 @@ +import time + from flask.ctx import AppContext from kombu import Connection, Consumer, Queue, binding from kombu.mixins import ConsumerProducerMixin @@ -23,13 +25,17 @@ def __init__( self.channel_list = [] self.app_ctx = app_ctx self.__consumer_config_list = [] + self.is_ready = False def close_channels(self): for channel in self.channel_list: # TODO maybe_close_channel if channel: channel.close() - + + def on_consume_ready(self, connection, channel, consumers, **kwargs): + self.is_ready = True + def add_consumer_config(self, queue, on_task): self.__consumer_config_list.append((queue, on_task)) @@ -42,7 +48,7 @@ def func(data, routing_key): def add_consumer_config_list(self, exchange): binding_dict = exchange.get_binding_dict() - auto_delete =True if exchange.exchange_type == "all" else False + auto_delete = True if exchange.exchange_type == "all" else False for queue_name, bind_list in binding_dict.items(): func = self.make_combine_function(bind_list) bindings = [ @@ -99,6 +105,11 @@ def get_consumers(self, _, default_channel): def on_consume_end(self, connection, default_channel): self.close_channels() + def stop(self): + self.should_stop = True + self.connection.release() + time.sleep(0.5) + def publish(self, data, exchange, routing_key): self.producer.publish( body=data, diff --git a/example/main/__init__.py b/example/main/__init__.py index 2548ca4..89ea014 100644 --- a/example/main/__init__.py +++ b/example/main/__init__.py @@ -18,7 +18,7 @@ def create_app(config_filename=default_cdmdir): with app.app_context(): # subscribe funtions import - from main.test_functions import ( + from main.functions import ( setting_info, add, mul, diff --git a/example/main/test_functions.py b/example/main/functions.py similarity index 92% rename from example/main/test_functions.py rename to example/main/functions.py index f94ca2f..4e40d40 100644 --- a/example/main/test_functions.py +++ b/example/main/functions.py @@ -21,7 +21,7 @@ def mul(x, y): print("MUL", x, y) return x * y -# () 없을 때도 위와 같은 함수 이름 div로 바인딩 +# () 없을 때도 위와 같이 함수 이름 div로 바인딩 @dasimamq.login.subscribe def div(x, y): print("DIV", x, y) diff --git a/setup.py b/setup.py index 0f61c88..e6a4594 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="dasima", - version="0.1.4", + version="0.2.0", description="Message Queue Tools for flask project", author="Linewalks", author_email="jindex2411@linewalks.com", @@ -18,6 +18,6 @@ ], setup_requires=["pytest-runner"], test_suite="tests", - tests_require=["pytest", "gevent"], + tests_require=["pytest"], packages=find_packages(include=["dasima"]) )