diff --git a/camayoc/config.py b/camayoc/config.py index 1d2ef724..63cb1665 100644 --- a/camayoc/config.py +++ b/camayoc/config.py @@ -22,6 +22,7 @@ Validator("camayoc.db_cleanup", default=True), Validator("camayoc.snapshot_test_reference_path", default=None), Validator("camayoc.snapshot_test_actual_path", default=None), + Validator("camayoc.snapshot_test_reference_synthetic", default=False), Validator("quipucords_server.hostname", default=""), Validator("quipucords_server.https", default=False), Validator("quipucords_server.port", default=8000), diff --git a/camayoc/tests/qpc/cli/test_credentials.py b/camayoc/tests/qpc/cli/test_credentials.py index f9ed9ea0..c87032eb 100644 --- a/camayoc/tests/qpc/cli/test_credentials.py +++ b/camayoc/tests/qpc/cli/test_credentials.py @@ -710,3 +710,69 @@ def test_clear_all(isolated_filesystem, qpc_server_config, cleaning_data_provide ) assert "No credentials exist yet." in output assert exitstatus == 0 + + +@pytest.mark.upgrade_only +def test_edit_existing_credential_username(qpc_server_config, source_type): + """Check that credential that existed before upgrade can be edited. + + We want to check all credential types, but the only field that is common + for all of them is the name - and this is one field that CLI can not + change. So we try to change username instead, but RHACS and OpenShift + token do not have it. + + :id: 93dd6711-ef58-4370-9e4a-d427785a9bf9 + :description: Edit existing credential + :steps: + 1) Select a credential of specified type + 2) Edit credential, changing the username + 3) Verify that credential was changed + 4) Edit credential again, restoring old username + :expectedresults: Credential is modified and changes are saved. + """ + new_username = utils.uuid4() + + output, _ = pexpect.run( + "{} -v cred list --type {}".format(client_cmd, source_type), + encoding="utf8", + withexitstatus=True, + ) + try: + all_credentials = json.loads(output) + except ValueError: + pytest.skip("There are no credentials of this type") + + credential = random.choice(all_credentials) + credential_name = credential.get("name") + if not credential.get("username"): + pytest.skip("Credential does not store username") + + # Edit credential + output, exitstatus = pexpect.run( + "{} -v cred edit --name={} --username={}".format(client_cmd, credential_name, new_username), + encoding="utf-8", + withexitstatus=True, + ) + assert f'Credential "{credential_name}" was updated' in output + assert exitstatus == 0 + + # Grab the new data, prepare both for comparison, compare + output, exitstatus = pexpect.run( + "{} -v cred show --name={}".format(client_cmd, credential_name), + encoding="utf-8", + withexitstatus=True, + ) + updated_credential = json.loads(output) + expected = credential.copy() + expected["username"] = new_username + expected.pop("updated_at", None) + updated_credential.pop("updated_at", None) + + assert expected == updated_credential + + # Restore old username + pexpect.run( + "{} -v cred edit --name={} --username={}".format( + client_cmd, credential_name, credential.get("username") + ) + ) diff --git a/camayoc/tests/qpc/cli/test_endtoend.py b/camayoc/tests/qpc/cli/test_endtoend.py new file mode 100644 index 00000000..1b5183ee --- /dev/null +++ b/camayoc/tests/qpc/cli/test_endtoend.py @@ -0,0 +1,105 @@ +import random +import re +import tarfile + +import pytest + +from camayoc.config import settings +from camayoc.constants import CONNECTION_PASSWORD_INPUT +from camayoc.qpc_models import Scan +from camayoc.tests.qpc.utils import assert_ansible_logs +from camayoc.tests.qpc.utils import assert_sha256sums +from camayoc.tests.qpc.utils import end_to_end_sources_names +from camayoc.utils import uuid4 + +from .utils import cred_add_and_check +from .utils import report_download +from .utils import scan_add_and_check +from .utils import scan_job +from .utils import scan_start +from .utils import source_add_and_check +from .utils import wait_for_scan + + +@pytest.mark.slow +@pytest.mark.nightly_only +@pytest.mark.parametrize("source_name", end_to_end_sources_names()) +def test_end_to_end(tmp_path, qpc_server_config, data_provider, source_name): + """End-to-end test using command line interface. + + :id: 5649c69b-1e14-4279-b571-5aec12ea0716 + :description: This is end-to-end user journey through command line interface. + :steps: + 1) Create new credential. + 2) Create new source. + 3) Trigger scan for newly created source. + 4) Wait for scan to complete. + 5) Download scan report. + :expectedresults: Credential and Source are created. Scan is completed. + Report is downloaded. + """ + scan_name = uuid4() + + # Get a random credential associated with a source in configuration + known_sources_map = { + source_definition.name: source_definition for source_definition in settings.sources + } + source_definition = known_sources_map.get(source_name) + credential_name = random.choice(source_definition.credentials) + credential_model = data_provider.credentials.new_one({"name": credential_name}, data_only=True) + data_provider.mark_for_cleanup(credential_model) + + # Create a credential + cred_add_args = { + "name": credential_model.name, + "username": credential_model.username, + "type": credential_model.cred_type, + } + secret_inputs = [] + if cred_password := credential_model.password: + secret_inputs.append((CONNECTION_PASSWORD_INPUT, cred_password)) + cred_add_args["password"] = None + if cred_ssh_key := credential_model.ssh_key: + secret_inputs.append(("Private SSH Key:", cred_ssh_key)) + cred_add_args["sshkey"] = None + cred_add_and_check(cred_add_args, inputs=secret_inputs) + + # Create a source + source_model = data_provider.sources.new_one( + {"name": source_definition.name}, new_dependencies=True, data_only=True + ) + source_add_args = { + "name": source_model.name, + "cred": [credential_model.name], + "hosts": source_model.hosts, + "type": source_model.source_type, + } + if source_port := getattr(source_model, "port", None): + source_add_args["port"] = source_port + if source_options := getattr(source_model, "options", {}): + source_add_args.update({key.replace("_", "-"): val for key, val in source_options.items()}) + data_provider.mark_for_cleanup(source_model) + source_add_and_check(source_add_args) + + # Create and run a scan + data_provider.mark_for_cleanup(Scan(name=scan_name)) + scan_add_and_check({"name": scan_name, "sources": source_model.name}) + + result = scan_start({"name": scan_name}) + match = re.match(r'Scan "(\d+)" started.', result) + assert match is not None + scan_job_id = match.group(1) + wait_for_scan(scan_job_id) + result = scan_job({"id": scan_job_id}) + assert result["status"] == "completed" + assert result["report_id"] + + # Download and verify a report + is_network_scan = source_definition.type == "network" + downloaded_report = tmp_path / "report.tar.gz" + + report_download({"scan-job": scan_job_id, "output-file": downloaded_report.as_posix()}) + + tarfile.open(downloaded_report).extractall(tmp_path, filter="tar") + assert_sha256sums(tmp_path) + assert_ansible_logs(tmp_path, is_network_scan) diff --git a/camayoc/tests/qpc/cli/test_scanjobs.py b/camayoc/tests/qpc/cli/test_scanjobs.py index d7ff12f2..8f2081a2 100644 --- a/camayoc/tests/qpc/cli/test_scanjobs.py +++ b/camayoc/tests/qpc/cli/test_scanjobs.py @@ -11,19 +11,25 @@ import json import random import re +import tarfile import pytest from littletable import Table +from camayoc.config import settings from camayoc.constants import QPC_OPTIONAL_PRODUCTS from camayoc.qpc_models import Scan +from camayoc.tests.qpc.utils import assert_ansible_logs +from camayoc.tests.qpc.utils import assert_sha256sums from camayoc.utils import uuid4 from .utils import report_detail +from .utils import report_download from .utils import scan_add_and_check from .utils import scan_cancel from .utils import scan_job from .utils import scan_start +from .utils import scans_with_source_type from .utils import wait_for_scan @@ -295,3 +301,46 @@ def test_scanjob_cancel(qpc_server_config, data_provider): wait_for_scan(scan_job_id, status="running", timeout=60) scan_cancel({"id": scan_job_id}) wait_for_scan(scan_job_id, status="canceled", timeout=60) + + +@pytest.mark.upgrade_only +@pytest.mark.runs_scan +@pytest.mark.skipif( + bool(settings.camayoc.snapshot_test_reference_synthetic), + reason="Snapshot reference data is synthetic, scans on rerun would fail", +) +def test_rerun_scanjob(tmp_path, qpc_server_config, source_type): + """After upgrade, run existing scan again. + + :id: 283c89be-b950-481d-ad81-76b74663823e + :description: Find a scan that was run before the upgrade and run it again. + :steps: + 1) Select a random scan that used a source of given type. + 2) Run that scan again + 3) Wait for scan to complete. + 4) Download scan report. + :expectedresults: Scan is completed, report is downloaded. + """ + matching_scans = scans_with_source_type(source_type) + if not matching_scans: + pytest.skip("There are no scans with sources of this type") + + scan = random.choice(matching_scans) + + result = scan_start({"name": scan.get("name")}) + match = re.match(r'Scan "(\d+)" started.', result) + assert match is not None + scan_job_id = match.group(1) + wait_for_scan(scan_job_id) + result = scan_job({"id": scan_job_id}) + assert result["status"] == "completed" + assert result["report_id"] + + is_network_scan = source_type == "network" + downloaded_report = tmp_path / "report.tar.gz" + + report_download({"scan-job": scan_job_id, "output-file": downloaded_report.as_posix()}) + + tarfile.open(downloaded_report).extractall(tmp_path, filter="tar") + assert_sha256sums(tmp_path) + assert_ansible_logs(tmp_path, is_network_scan) diff --git a/camayoc/tests/qpc/cli/test_sources.py b/camayoc/tests/qpc/cli/test_sources.py index 2e468449..e0bc6fab 100644 --- a/camayoc/tests/qpc/cli/test_sources.py +++ b/camayoc/tests/qpc/cli/test_sources.py @@ -1708,3 +1708,59 @@ def test_clear_all(cleaning_data_provider, isolated_filesystem, qpc_server_confi assert qpc_source_list.expect(pexpect.EOF) == 0 qpc_source_list.close() assert qpc_source_list.exitstatus == 0 + + +@pytest.mark.upgrade_only +def test_edit_existing_source_hosts(qpc_server_config, source_type): + """Check that source that existed before upgrade can be edited. + + :id: 2ccfb156-5a27-403b-a808-c9dc4404a158 + :description: Edit existing source + :steps: + 1) Select a source of specified type + 2) Edit source, changing the hosts field + 3) Verify that source was changed + 4) Edit source again, restoring old hosts + :expectedresults: Source is modified and changes are saved. + """ + new_hosts = "127.0.0.{}".format(random.randint(1, 254)) + + output, _ = pexpect.run( + "{} -v source list --type {}".format(client_cmd, source_type), + encoding="utf8", + withexitstatus=True, + ) + try: + all_sources = json.loads(output) + except ValueError: + pytest.skip("There are no sources of this type") + + source = random.choice(all_sources) + source_name = source.get("name") + + # Edit source + output, exitstatus = pexpect.run( + "{} -v source edit --name={} --hosts={}".format(client_cmd, source_name, new_hosts), + encoding="utf-8", + withexitstatus=True, + ) + assert f'Source "{source_name}" was updated' in output + assert exitstatus == 0 + + # Grab the new data, prepare both for comparison, compare + output, exitstatus = pexpect.run( + "{} -v source show --name={}".format(client_cmd, source_name), + encoding="utf-8", + withexitstatus=True, + ) + updated_source = json.loads(output) + expected = source.copy() + expected["hosts"] = [new_hosts] + + assert expected == updated_source + + # Restore old username + old_hosts = source.get("hosts") + pexpect.run( + "{} -v cred edit --name={} --hosts={}".format(client_cmd, source_name, ",".join(old_hosts)) + ) diff --git a/camayoc/tests/qpc/cli/utils.py b/camayoc/tests/qpc/cli/utils.py index b38eb849..6262045d 100644 --- a/camayoc/tests/qpc/cli/utils.py +++ b/camayoc/tests/qpc/cli/utils.py @@ -365,6 +365,25 @@ def retrieve_report(scan_job_id): return details, deployments +def scans_with_source_type(source_type): + """Find scans created for sources of given type. + + Conceptually, this belongs to DataProvider. However, DataProvider is + concerned with efficiently creating a data defined in settings - here, we + are concerned with data that exists in Quipucords database. DataProvider + could, and arguably should, be extended to cover this use case, but as of + this comment, there's a single test that needs this. Test in question is + run after upgrade and entities defined in settings may already exist, but + DataProvider is not aware of them. + """ + matching_scans = [] + all_scans = json.loads(cli_command("{} -v scan list".format(client_cmd))) + for scan in all_scans: + if any(source_type == source.get("source_type") for source in scan.get("sources", [])): + matching_scans.append(scan) + return matching_scans + + def setup_qpc(): """Configure and login qpc with Camayoc's configuration info. diff --git a/camayoc/tests/qpc/ui/test_endtoend.py b/camayoc/tests/qpc/ui/test_endtoend.py index 7b47657b..c20f738e 100644 --- a/camayoc/tests/qpc/ui/test_endtoend.py +++ b/camayoc/tests/qpc/ui/test_endtoend.py @@ -16,6 +16,7 @@ from camayoc.qpc_models import Scan from camayoc.tests.qpc.utils import assert_ansible_logs from camayoc.tests.qpc.utils import assert_sha256sums +from camayoc.tests.qpc.utils import end_to_end_sources_names from camayoc.types.ui import AddCredentialDTO from camayoc.types.ui import AddSourceDTO from camayoc.ui import Client @@ -52,17 +53,9 @@ def create_endtoend_dtos(source_name, data_provider): return credential_dto, source_dto, trigger_scan_dto -def source_names(): - for source_definition in settings.sources: - if source_definition.type in ("openshift",): - continue - fixture_id = f"{source_definition.name}-{source_definition.type}" - yield pytest.param(source_definition.name, id=fixture_id) - - @pytest.mark.slow @pytest.mark.nightly_only -@pytest.mark.parametrize("source_name", source_names()) +@pytest.mark.parametrize("source_name", end_to_end_sources_names()) def test_end_to_end(tmp_path, cleaning_data_provider, ui_client: Client, source_name): """End-to-end test using web user interface. @@ -104,7 +97,7 @@ def test_end_to_end(tmp_path, cleaning_data_provider, ui_client: Client, source_ @pytest.mark.skip("Skipped due to intermittent failure - DISCOVERY-426") -@pytest.mark.parametrize("source_name", source_names()) +@pytest.mark.parametrize("source_name", end_to_end_sources_names()) def test_trigger_scan(cleaning_data_provider, ui_client: Client, source_name): """Mostly end-to-end test using web user interface (without downloading scan results). diff --git a/camayoc/tests/qpc/utils.py b/camayoc/tests/qpc/utils.py index 1feca513..2689af9b 100644 --- a/camayoc/tests/qpc/utils.py +++ b/camayoc/tests/qpc/utils.py @@ -6,6 +6,8 @@ from pathlib import Path from typing import Callable +import pytest + from camayoc import api from camayoc.config import settings from camayoc.constants import QPC_SCAN_STATES @@ -161,6 +163,18 @@ def scan_names(predicate: Callable[[ScanOptions], bool]) -> list[str]: return matching_scans +def end_to_end_sources_names(): + """Generate source names as pytest params. + + This is used by CLI and UI end_to_end tests. + """ + for source_definition in settings.sources: + if source_definition.type in ("openshift",): + continue + fixture_id = f"{source_definition.name}-{source_definition.type}" + yield pytest.param(source_definition.name, id=fixture_id) + + def wait_until_state(scanjob, timeout=settings.camayoc.scan_timeout, state="completed"): """Wait until the scanjob has failed or reached desired state. diff --git a/camayoc/types/settings.py b/camayoc/types/settings.py index 959372c5..5fff9515 100644 --- a/camayoc/types/settings.py +++ b/camayoc/types/settings.py @@ -16,6 +16,7 @@ class CamayocOptions(BaseModel): db_cleanup: Optional[bool] = True snapshot_test_reference_path: Optional[Path] = None snapshot_test_actual_path: Optional[Path] = None + snapshot_test_reference_synthetic: Optional[bool] = False class QuipucordsServerOptions(BaseModel):