Skip to content

Commit

Permalink
Merge pull request #35 from linewalks/issue.refactor_test_code
Browse files Browse the repository at this point in the history
Issue.refactor test code
  • Loading branch information
jindex2411 authored Jan 12, 2022
2 parents f3b09ad + 06b46f5 commit 2222156
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 73 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2021 linewalks

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
27 changes: 20 additions & 7 deletions dasima/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import threading

from kombu import Connection
from typing import List, Tuple

from dasima.exchange import ExchangeWrapper
from dasima.worker import Worker
Expand All @@ -19,13 +18,16 @@ def init_app(self, app):
self.exchange_list = self.app.config.get(
"DASIMA_EXCHANGE_SETTING",
[("dasima_test", "one")]
)
)
self.connection = Connection(self.app.config.get("DASIMA_CONNECTION_HOST", "localhost"))
self.connection.ensure_connection(max_retries=3)
self.worker = Worker(
connection=Connection(self.app.config.get("DASIMA_CONNECTION_HOST", "localhost")),
connection=self.connection,
accept_type=self.app.config.get("DASIMA_ACCEPT_TYPE", "json"),
app_ctx=self.app_ctx
)
self.create_exchange()
self.is_running = False

def create_exchange(self):
for exchange_name, exchange_type in self.exchange_list:
Expand All @@ -48,7 +50,18 @@ def setup_queue(self):
self.worker.add_consumer_config_list(exchange)

def run_subscribers(self):
self.setup_queue()
t = threading.Thread(target=self.worker.run)
t.daemon = True
t.start()
if self.is_running:
raise RuntimeError("run_subscribers is aleady running!")
else:
self.is_running = True
self.setup_queue()
t = threading.Thread(target=self.worker.run)
t.daemon = True
t.start()
# worker가 준비가 될때 까지 잠시 기다려줌
while True:
if self.worker.is_ready:
break

def stop_subscribers(self):
self.worker.stop()
2 changes: 2 additions & 0 deletions dasima/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def subscribe(self, routing_key=None):
if callable(routing_key):
self.add_binding_dict(routing_key, None)
return routing_key

def decorator(func):
self.add_binding_dict(func, routing_key)
return func

return decorator

def add_binding_dict(self, func, routing_key):
Expand Down
13 changes: 13 additions & 0 deletions dasima/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
import pytest


@pytest.fixture(scope="session")
def exchange_setting_list():
return [
("exchange_type_one", "one"),
("exchange_type_all", "all")
]


@pytest.fixture(scope="session")
def flask_app(exchange_setting_list):
from flask import Flask
app = Flask(__name__)
app.config["DASIMA_EXCHANGE_SETTING"] = exchange_setting_list
return app
60 changes: 0 additions & 60 deletions dasima/tests/test_dasima_mq.py

This file was deleted.

125 changes: 125 additions & 0 deletions dasima/tests/test_mq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import pytest
import time

from collections import Counter
from dasima import Dasima


class TestSetup:
def test_init_app(self, flask_app):
dasmia1 = Dasima(flask_app)
dasmia2 = Dasima()
dasmia2.init_app(flask_app)
assert dasmia1.app == dasmia2.app

def test_create_exchange(self, exchange_setting_list, flask_app):
dasima = Dasima(flask_app)
for exchange_name, exchange_type in exchange_setting_list:
exchange = getattr(dasima, exchange_name)
assert exchange.exchange_type == exchange_type
assert exchange.exchange.name == exchange_name


class TestSubscribe:
@pytest.fixture(scope="function")
def dasima(self, flask_app):
return Dasima(flask_app)

def test_subscribe_exist_routing_key(self, dasima):
@dasima.exchange_type_one.subscribe("test")
def test_func():
return

binding_dict = dasima.exchange_type_one.get_binding_dict()
routing_key, _ = binding_dict["exchange_type_one"][0]
assert routing_key == "test"

def test_subscribe_not_exist_routing_key(self, dasima):
@dasima.exchange_type_one.subscribe
def test_func():
return

binding_dict = dasima.exchange_type_one.get_binding_dict()
routing_key, _ = binding_dict["exchange_type_one"][0]
assert routing_key == "test_func"

def test_run_subscribers_error(self, dasima):
dasima.run_subscribers()
with pytest.raises(RuntimeError):
dasima.run_subscribers()


class TestMessageSendReceive:
@pytest.fixture(scope="function")
def sub1(self, flask_app):
return Dasima(flask_app)

@pytest.fixture(scope="function")
def sub2(self, flask_app):
return Dasima(flask_app)

@pytest.fixture(scope="function")
def pub(self, flask_app):
return Dasima(flask_app)

@pytest.mark.parametrize("number", [2, 10, 100])
def test_exchange_type_one_recive(self, sub1, sub2, pub, number):
count_list = []

@sub1.exchange_type_one.subscribe("one")
def test_subscribe_function_1():
count_list.append(1)
return

@sub2.exchange_type_one.subscribe("one")
def test_subscribe_function_2():
count_list.append(2)
return

sub1.run_subscribers()
sub2.run_subscribers()

for _ in range(number):
pub.exchange_type_one.send_message({}, "one")

# Wait for received message to be processed
time.sleep(number * 0.01)

counter = Counter(count_list)

assert counter[1] == number // 2
assert counter[2] == number // 2

sub1.stop_subscribers()
sub2.stop_subscribers()

@pytest.mark.parametrize("number", [2, 10, 100])
def test_exchange_type_all_recive(self, sub1, sub2, pub, number):
count_list = []

@sub1.exchange_type_all.subscribe("all")
def test_subscribe_function_1():
count_list.append(1)
return

@sub2.exchange_type_all.subscribe("all")
def test_subscribe_function_2():
count_list.append(2)
return

sub1.run_subscribers()
sub2.run_subscribers()

for _ in range(number):
pub.exchange_type_all.send_message({}, "all")

# Wait for received message to be processed
time.sleep(number * 0.01)

counter = Counter(count_list)

assert counter[1] == number
assert counter[2] == number

sub1.stop_subscribers()
sub2.stop_subscribers()
15 changes: 13 additions & 2 deletions dasima/worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

from flask.ctx import AppContext
from kombu import Connection, Consumer, Queue, binding
from kombu.mixins import ConsumerProducerMixin
Expand All @@ -23,13 +25,17 @@ def __init__(
self.channel_list = []
self.app_ctx = app_ctx
self.__consumer_config_list = []
self.is_ready = False

def close_channels(self):
for channel in self.channel_list:
# TODO maybe_close_channel
if channel:
channel.close()


def on_consume_ready(self, connection, channel, consumers, **kwargs):
self.is_ready = True

def add_consumer_config(self, queue, on_task):
self.__consumer_config_list.append((queue, on_task))

Expand All @@ -42,7 +48,7 @@ def func(data, routing_key):

def add_consumer_config_list(self, exchange):
binding_dict = exchange.get_binding_dict()
auto_delete =True if exchange.exchange_type == "all" else False
auto_delete = True if exchange.exchange_type == "all" else False
for queue_name, bind_list in binding_dict.items():
func = self.make_combine_function(bind_list)
bindings = [
Expand Down Expand Up @@ -99,6 +105,11 @@ def get_consumers(self, _, default_channel):
def on_consume_end(self, connection, default_channel):
self.close_channels()

def stop(self):
self.should_stop = True
self.connection.release()
time.sleep(0.5)

def publish(self, data, exchange, routing_key):
self.producer.publish(
body=data,
Expand Down
2 changes: 1 addition & 1 deletion example/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def create_app(config_filename=default_cdmdir):
with app.app_context():

# subscribe funtions import
from main.test_functions import (
from main.functions import (
setting_info,
add,
mul,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def mul(x, y):
print("MUL", x, y)
return x * y

# () 없을 때도 위와 같은 함수 이름 div로 바인딩
# () 없을 때도 위와 같이 함수 이름 div로 바인딩
@dasimamq.login.subscribe
def div(x, y):
print("DIV", x, y)
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name="dasima",
version="0.1.4",
version="0.2.0",
description="Message Queue Tools for flask project",
author="Linewalks",
author_email="jindex2411@linewalks.com",
Expand All @@ -18,6 +18,6 @@
],
setup_requires=["pytest-runner"],
test_suite="tests",
tests_require=["pytest", "gevent"],
tests_require=["pytest"],
packages=find_packages(include=["dasima"])
)

0 comments on commit 2222156

Please sign in to comment.