Skip to content

Commit

Permalink
add some micro scaling to the queue when it is growing
Browse files Browse the repository at this point in the history
  • Loading branch information
deeleeramone committed Dec 21, 2024
1 parent 34a0253 commit 696b1da
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ def __init__( # pylint: disable=too-many-positional-arguments
temp_file_path = temp_file.name
self.results_path = Path(temp_file_path).absolute()
self.results_file = temp_file_path
if ":" in results_file:
self.results_file = results_file
self.results_path = results_file # type: ignore
kwargs["uri"] = True
else:
if ":" in results_file:
self.results_file = results_file
self.results_path = results_file # type: ignore
kwargs["uri"] = True
self.results_path = Path(results_file).absolute()
self.results_file = results_file

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ async def enqueue(self, message):

retries = 0
while retries < self.max_retries:

if self.queue.qsize() / self.queue.maxsize > 0.3:
await sleep(0.000005)
if self.queue.qsize() / self.queue.maxsize > 0.5:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.55:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.6:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.65:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.7:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.99:
await sleep(0.00005)

if self.queue.full():
retries += 1
msg = f"Queue is full. Retrying {retries}/{self.max_retries}..."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@pytest.fixture(scope="module")
def database():
"""Return a MessageQueue instance."""
return Database(table_name="test_table")
return Database(table_name="test")


def test_setup_database(database):
Expand Down Expand Up @@ -61,7 +61,7 @@ def test_multiple_connections(database):
)
another_db = Database(
results_file=database.results_file,
table_name="other_table",
table_name="other_test",
)
assert new_db.fetch_all()[0] == MOCK_MESSAGES[0]
database.write_to_db(MOCK_MESSAGES[1])
Expand All @@ -87,16 +87,16 @@ def test_query_db(database):
assert len(database.query(query)) == 2
query = "json_extract (message, '$.type') == 'quote'"
assert len(database.query(query)) == 1
query = "SELECT message FROM test_table WHERE json_extract (message, '$.type') = 'trade'"
query = "SELECT message FROM test WHERE json_extract (message, '$.type') = 'trade'"
assert len(database.query(query)) == 2
query = "SELECT json_extract (message, '$.symbol') FROM test_table WHERE json_extract (message, '$.type') = 'trade'"
query = "SELECT json_extract (message, '$.symbol') FROM test WHERE json_extract (message, '$.type') = 'trade'"
assert database.query(query) == ["test1", "test3"]


def test_limit():
"""Test if the limit parameter is working and that the auto increment index doesn't reset when cleared."""
database = Database(
table_name="test_limit_table",
table_name="test_limit",
limit=2,
)
assert database
Expand All @@ -112,5 +112,5 @@ def test_limit():
database.clear_results()
assert database.fetch_all() == []
database.write_to_db(MOCK_MESSAGES[0])
query = "SELECT id FROM test_limit_table"
query = "SELECT id FROM test_limit"
assert database.query(query)[0] > 3
298 changes: 0 additions & 298 deletions openbb_platform/extensions/websockets/openbb_websockets/broadcast.py

This file was deleted.

Loading

0 comments on commit 696b1da

Please sign in to comment.