Skip to content

Commit

Permalink
refactor: remove useless class from test
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke authored and rhelmke committed Feb 17, 2025
1 parent f34190a commit 1fa86be
Showing 1 changed file with 65 additions and 63 deletions.
128 changes: 65 additions & 63 deletions src/test/acceptance/rest/test_rest_analyze_firmware.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,68 @@
test_container_uid = '418a54d78550e8584291c96e5d6168133621f352bfc1d43cf84e81187fef4962_787'


class TestRestFirmware:
def _rest_upload_firmware(self, test_client):
data = get_firmware_for_rest_upload_test()
rv = test_client.put('/rest/firmware', json=data, follow_redirects=True)
assert b'"status": 0' in rv.data, 'rest upload not successful'
assert test_container_uid.encode() in rv.data, 'uid not found in rest upload reply'

def _rest_get_analysis_result(self, test_client):
rv = test_client.get(f'/rest/firmware/{test_container_uid}', follow_redirects=True)
assert b'analysis_date' in rv.data, 'rest analysis download not successful'
assert b'software_components' in rv.data, 'rest analysis not successful'
assert b'"device_part": "test_part' in rv.data, 'device part not present'

def _rest_search(self, test_client):
query = urllib.parse.quote('{"device_class": "test_class"}')
rv = test_client.get(f'/rest/firmware?query={query}', follow_redirects=True)
assert test_container_uid.encode() in rv.data, 'test firmware not found in rest search'

def _rest_search_fw_only(self, test_client):
query = json.dumps({'sha256': test_container_uid.split('_')[0]})
rv = test_client.get(f'/rest/firmware?query={urllib.parse.quote(query)}', follow_redirects=True)
assert test_container_uid.encode() in rv.data, 'test firmware not found in rest search'

def _rest_update_analysis_bad_analysis(self, test_client):
query = urllib.parse.quote('["unknown_system"]')
rv = test_client.put(f'/rest/firmware/{test_container_uid}?update={query}', follow_redirects=True)
assert (
b'Unknown analysis system' in rv.data
), 'rest analysis update should break on request of non existing system'

def _rest_update_analysis_success(self, test_client):
update = urllib.parse.quote(json.dumps(['crypto_material']))
rv = test_client.put(f'/rest/firmware/{test_container_uid}?update={update}', follow_redirects=True)
assert b'error_message' not in rv.data, 'Error on update request'

def _rest_check_new_analysis_exists(self, test_client):
rv = test_client.get(f'/rest/firmware/{test_container_uid}', follow_redirects=True)
response_data = json.loads(rv.data.decode())
assert response_data['firmware']['analysis']['crypto_material']
assert (
response_data['firmware']['analysis']['crypto_material']['analysis_date']
> response_data['firmware']['analysis']['software_components']['analysis_date']
)

@pytest.mark.usefixtures('intercom_backend_binding')
# container including 3 files times 3 plugins
@pytest.mark.SchedulerTestConfig(items_to_analyze=4 * 3)
def test_run_from_upload_to_show_analysis_and_search(
self, test_client, analysis_finished_event, analysis_finished_counter
):
self._rest_upload_firmware(test_client)
assert analysis_finished_event.wait(timeout=30)
analysis_finished_counter.value -= 4 # only one plugin to update
analysis_finished_event.clear()
self._rest_get_analysis_result(test_client)
self._rest_search(test_client)
self._rest_search_fw_only(test_client)
self._rest_update_analysis_bad_analysis(test_client)
self._rest_update_analysis_success(test_client)

assert analysis_finished_event.wait(timeout=30)

self._rest_check_new_analysis_exists(test_client)
def _rest_upload_firmware(test_client):
data = get_firmware_for_rest_upload_test()
rv = test_client.put('/rest/firmware', json=data, follow_redirects=True)
assert b'"status": 0' in rv.data, 'rest upload not successful'
assert test_container_uid.encode() in rv.data, 'uid not found in rest upload reply'


def _rest_get_analysis_result(test_client):
rv = test_client.get(f'/rest/firmware/{test_container_uid}', follow_redirects=True)
assert b'analysis_date' in rv.data, 'rest analysis download not successful'
assert b'software_components' in rv.data, 'rest analysis not successful'
assert b'"device_part": "test_part' in rv.data, 'device part not present'


def _rest_search(test_client):
query = urllib.parse.quote('{"device_class": "test_class"}')
rv = test_client.get(f'/rest/firmware?query={query}', follow_redirects=True)
assert test_container_uid.encode() in rv.data, 'test firmware not found in rest search'


def _rest_search_fw_only(test_client):
query = json.dumps({'sha256': test_container_uid.split('_')[0]})
rv = test_client.get(f'/rest/firmware?query={urllib.parse.quote(query)}', follow_redirects=True)
assert test_container_uid.encode() in rv.data, 'test firmware not found in rest search'


def _rest_update_analysis_bad_analysis(test_client):
query = urllib.parse.quote('["unknown_system"]')
rv = test_client.put(f'/rest/firmware/{test_container_uid}?update={query}', follow_redirects=True)
assert b'Unknown analysis system' in rv.data, 'rest analysis update should break on request of non existing system'


def _rest_update_analysis_success(test_client):
update = urllib.parse.quote(json.dumps(['crypto_material']))
rv = test_client.put(f'/rest/firmware/{test_container_uid}?update={update}', follow_redirects=True)
assert b'error_message' not in rv.data, 'Error on update request'


def _rest_check_new_analysis_exists(test_client):
rv = test_client.get(f'/rest/firmware/{test_container_uid}', follow_redirects=True)
response_data = json.loads(rv.data.decode())
assert response_data['firmware']['analysis']['crypto_material']
assert (
response_data['firmware']['analysis']['crypto_material']['analysis_date']
> response_data['firmware']['analysis']['software_components']['analysis_date']
)


@pytest.mark.usefixtures('intercom_backend_binding')
# container including 3 files times 3 plugins
@pytest.mark.SchedulerTestConfig(items_to_analyze=4 * 3)
def test_run_from_upload_to_show_analysis_and_search(test_client, analysis_finished_event, analysis_finished_counter):
_rest_upload_firmware(test_client)
assert analysis_finished_event.wait(timeout=30)
analysis_finished_counter.value -= 4 # only one plugin to update
analysis_finished_event.clear()
_rest_get_analysis_result(test_client)
_rest_search(test_client)
_rest_search_fw_only(test_client)
_rest_update_analysis_bad_analysis(test_client)
_rest_update_analysis_success(test_client)

assert analysis_finished_event.wait(timeout=30)

_rest_check_new_analysis_exists(test_client)

0 comments on commit 1fa86be

Please sign in to comment.