Skip to content

Commit

Permalink
test_athenad: create with_upload_handler decorator (commaai#31250)
Browse files Browse the repository at this point in the history
cleaner
  • Loading branch information
jnewb1 authored Feb 1, 2024
1 parent 7222eb3 commit 5c85925
Showing 1 changed file with 60 additions and 83 deletions.
143 changes: 60 additions & 83 deletions selfdrive/athena/tests/test_athenad.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
from functools import partial
from functools import partial, wraps
import json
import multiprocessing
import os
Expand Down Expand Up @@ -42,6 +42,20 @@ def seed_athena_server(host, port):
with_mock_athena = partial(with_http_server, handler=HTTPRequestHandler, setup=seed_athena_server)


def with_upload_handler(func):
@wraps(func)
def wrapper(*args, **kwargs):
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
return func(*args, **kwargs)
finally:
end_event.set()
thread.join()
return wrapper


class TestAthenadMethods(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -209,77 +223,59 @@ def test_uploadFileToUrl_does_not_exist(self, host):
self.assertEqual(not_exists_resp, {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']})

@with_mock_athena
@with_upload_handler
def test_upload_handler(self, host):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()

athenad.upload_queue.put_nowait(item)
try:
self._wait_for_upload()
time.sleep(0.1)
self._wait_for_upload()
time.sleep(0.1)

# TODO: verify that upload actually succeeded
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
# TODO: verify that upload actually succeeded
self.assertEqual(athenad.upload_queue.qsize(), 0)

@parameterized.expand([(500, True), (412, False)])
@with_mock_athena
@mock.patch('requests.put')
def test_upload_handler_retry(self, host, mock_put):
for status, retry in ((500, True), (412, False)):
mock_put.return_value.status_code = status
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
@with_upload_handler
def test_upload_handler_retry(self, status, retry, mock_put, host):
mock_put.return_value.status_code = status
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

athenad.upload_queue.put_nowait(item)
try:
self._wait_for_upload()
time.sleep(0.1)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)

self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
finally:
end_event.set()
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)

if retry:
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
if retry:
self.assertEqual(athenad.upload_queue.get().retry_count, 1)

@with_upload_handler
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)

end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()

try:
athenad.upload_queue.put_nowait(item_no_retry)
self._wait_for_upload()
time.sleep(0.1)

# Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0)
athenad.upload_queue.put_nowait(item_no_retry)
self._wait_for_upload()
time.sleep(0.1)

athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)
# Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0)

# Check that upload item was put back in the queue with incremented retry count
self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)

finally:
end_event.set()
# Check that upload item was put back in the queue with incremented retry count
self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.get().retry_count, 1)

@with_upload_handler
def test_cancelUpload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True)
Expand All @@ -288,18 +284,13 @@ def test_cancelUpload(self):

self.assertIn(item.id, athenad.cancelled_uploads)

end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
self._wait_for_upload()
time.sleep(0.1)
self._wait_for_upload()
time.sleep(0.1)

self.assertEqual(athenad.upload_queue.qsize(), 0)
self.assertEqual(len(athenad.cancelled_uploads), 0)
finally:
end_event.set()
self.assertEqual(athenad.upload_queue.qsize(), 0)
self.assertEqual(len(athenad.cancelled_uploads), 0)

@with_upload_handler
def test_cancelExpiry(self):
t_future = datetime.now() - timedelta(days=40)
ts = int(t_future.strftime("%s")) * 1000
Expand All @@ -308,42 +299,28 @@ def test_cancelExpiry(self):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)

athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)

end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)

self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
self.assertEqual(athenad.upload_queue.qsize(), 0)

def test_listUploadQueueEmpty(self):
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 0)

@with_http_server
@with_upload_handler
def test_listUploadQueueCurrent(self, host: str):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()

try:
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()

items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 1)
self.assertTrue(items[0]['current'])
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()

finally:
end_event.set()
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 1)
self.assertTrue(items[0]['current'])

def test_listUploadQueue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
Expand Down

0 comments on commit 5c85925

Please sign in to comment.