diff --git a/checkov/terraform/base_runner.py b/checkov/terraform/base_runner.py index 93579fea54e..541540b525c 100644 --- a/checkov/terraform/base_runner.py +++ b/checkov/terraform/base_runner.py @@ -21,6 +21,7 @@ from checkov.common.util.secrets import omit_secret_value_from_graph_checks from checkov.common.variables.context import EvaluationContext from checkov.runner_filter import RunnerFilter +from checkov.terraform.checks.terraform.registry import terraform_registry from checkov.terraform.modules.module_objects import TFDefinitionKey from checkov.terraform.checks.data.registry import data_registry from checkov.terraform.checks.module.registry import module_registry @@ -87,6 +88,7 @@ def __init__( "data": data_registry, "provider": provider_registry, "module": module_registry, + "terraform": terraform_registry, } @abstractmethod diff --git a/checkov/terraform/checks/terraform/__init__.py b/checkov/terraform/checks/terraform/__init__.py new file mode 100644 index 00000000000..7f15315c53f --- /dev/null +++ b/checkov/terraform/checks/terraform/__init__.py @@ -0,0 +1 @@ +from checkov.terraform.checks.terraform.terraform import * # noqa diff --git a/checkov/terraform/checks/terraform/base_check.py b/checkov/terraform/checks/terraform/base_check.py new file mode 100644 index 00000000000..76e76c67711 --- /dev/null +++ b/checkov/terraform/checks/terraform/base_check.py @@ -0,0 +1,35 @@ +from abc import abstractmethod +from collections.abc import Iterable +from typing import List, Dict, Any, Optional + +from checkov.common.checks.base_check import BaseCheck +from checkov.common.models.enums import CheckCategories, CheckResult +from checkov.terraform.checks.terraform.registry import terraform_registry + + +class BaseTerraformBlockCheck(BaseCheck): + def __init__( + self, + name: str, + id: str, + categories: "Iterable[CheckCategories]", + supported_blocks: "Iterable[str]", + guideline: Optional[str] = None + ) -> None: + super().__init__( + name=name, + id=id, + categories=categories, + supported_entities=supported_blocks, + block_type="terraform", + guideline=guideline, + ) + self.supported_blocks = supported_blocks + terraform_registry.register(self) + + def scan_entity_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: + return self.scan_terraform_block_conf(conf) + + @abstractmethod + def scan_terraform_block_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: + raise NotImplementedError() diff --git a/checkov/terraform/checks/terraform/base_registry.py b/checkov/terraform/checks/terraform/base_registry.py new file mode 100644 index 00000000000..1bd9b122019 --- /dev/null +++ b/checkov/terraform/checks/terraform/base_registry.py @@ -0,0 +1,37 @@ +from typing import Dict, Any, Tuple + +from checkov.common.checks.base_check_registry import BaseCheckRegistry +from checkov.common.util.consts import START_LINE, END_LINE + + +class Registry(BaseCheckRegistry): + def extract_entity_details(self, entity: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any]]: + terraform_configuration = dict(entity.items()) + + if START_LINE not in terraform_configuration or END_LINE not in terraform_configuration: + start_lines = [] + end_lines = [] + + def find_line_numbers(d): + for k, v in d.items(): + if k == START_LINE: + start_lines.append(v) + elif k == END_LINE: + end_lines.append(v) + elif isinstance(v, dict): + find_line_numbers(v) + elif isinstance(v, list): + for item in v: + if isinstance(item, dict): + find_line_numbers(item) + + find_line_numbers(terraform_configuration) + + if start_lines and end_lines: + terraform_configuration[START_LINE] = min(start_lines) + terraform_configuration[END_LINE] = max(end_lines) + else: + terraform_configuration[START_LINE] = 1 + terraform_configuration[END_LINE] = 1 + + return "terraform", "terraform", terraform_configuration diff --git a/checkov/terraform/checks/terraform/registry.py b/checkov/terraform/checks/terraform/registry.py new file mode 100644 index 00000000000..7b7db4256be --- /dev/null +++ b/checkov/terraform/checks/terraform/registry.py @@ -0,0 +1,4 @@ +from checkov.common.bridgecrew.check_type import CheckType +from checkov.terraform.checks.terraform.base_registry import Registry + +terraform_registry = Registry(CheckType.TERRAFORM) diff --git a/checkov/terraform/checks/terraform/terraform/StateLock.py b/checkov/terraform/checks/terraform/terraform/StateLock.py new file mode 100644 index 00000000000..9755495840a --- /dev/null +++ b/checkov/terraform/checks/terraform/terraform/StateLock.py @@ -0,0 +1,31 @@ +from typing import Dict, List, Any + +from checkov.common.models.enums import CheckResult, CheckCategories +from checkov.terraform.checks.terraform.base_check import BaseTerraformBlockCheck + + +class StateLock(BaseTerraformBlockCheck): + def __init__(self) -> None: + name = "Ensure state files are locked" + id = "CKV_TF_3" + supported_blocks = ("terraform",) + categories = (CheckCategories.SUPPLY_CHAIN,) + super().__init__(name=name, id=id, categories=categories, supported_blocks=supported_blocks) + + def scan_terraform_block_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: + # see: https://developer.hashicorp.com/terraform/language/terraform + if "backend" not in conf: + return CheckResult.UNKNOWN + + backend = conf["backend"][0] if isinstance(conf["backend"], list) else conf["backend"] + + if "s3" not in backend: + return CheckResult.UNKNOWN + + s3_config = backend["s3"] + if ("use_lockfile" not in s3_config or not s3_config["use_lockfile"]) and "dynamodb_table" not in s3_config: + return CheckResult.FAILED + return CheckResult.PASSED + + +check = StateLock() diff --git a/checkov/terraform/checks/terraform/terraform/__init__.py b/checkov/terraform/checks/terraform/terraform/__init__.py new file mode 100644 index 00000000000..613c8302d46 --- /dev/null +++ b/checkov/terraform/checks/terraform/terraform/__init__.py @@ -0,0 +1,3 @@ +from .StateLock import StateLock + +__all__ = ['StateLock'] diff --git a/checkov/terraform/context_parsers/parsers/terraform_context_parser.py b/checkov/terraform/context_parsers/parsers/terraform_context_parser.py new file mode 100644 index 00000000000..159834cc841 --- /dev/null +++ b/checkov/terraform/context_parsers/parsers/terraform_context_parser.py @@ -0,0 +1,28 @@ +from typing import Dict, Any, List + +from hcl2 import END_LINE, START_LINE + +from checkov.terraform.context_parsers.base_parser import BaseContextParser + + +class TerraformBlockContextParser(BaseContextParser): + def __init__(self) -> None: + definition_type = "terraform" + super().__init__(definition_type=definition_type) + + def get_entity_context_path(self, entity_block: Dict[str, Dict[str, Any]]) -> List[str]: + return ["terraform"] + + def enrich_definition_block(self, definition_blocks: List[Dict[str, Any]]) -> Dict[str, Any]: + for entity_block in definition_blocks: + entity_config = entity_block + self.context["terraform"] = { + "start_line": entity_config[START_LINE], + "end_line": entity_config[END_LINE], + "code_lines": self.file_lines[entity_config[START_LINE] - 1: entity_config[END_LINE]], + } + + return self.context + + +parser = TerraformBlockContextParser() diff --git a/checkov/terraform/runner.py b/checkov/terraform/runner.py index 7135be36545..c76cd4823b3 100644 --- a/checkov/terraform/runner.py +++ b/checkov/terraform/runner.py @@ -40,7 +40,7 @@ _TerraformContext: TypeAlias = "dict[TFDefinitionKey, dict[str, Any]]" _TerraformDefinitions: TypeAlias = "dict[TFDefinitionKey, dict[str, Any]]" -CHECK_BLOCK_TYPES = frozenset(["resource", "data", "provider", "module"]) +CHECK_BLOCK_TYPES = frozenset(["resource", "data", "provider", "module", "terraform"]) class Runner(BaseTerraformRunner[_TerraformDefinitions, _TerraformContext, TFDefinitionKey]): diff --git a/tests/terraform/checks/terraform/terraform/__init__.py b/tests/terraform/checks/terraform/terraform/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/terraform/checks/terraform/terraform/resources/lock/fail1.tf b/tests/terraform/checks/terraform/terraform/resources/lock/fail1.tf new file mode 100644 index 00000000000..a22195e4d0a --- /dev/null +++ b/tests/terraform/checks/terraform/terraform/resources/lock/fail1.tf @@ -0,0 +1,7 @@ +terraform { + backend "s3" { + bucket = "example-bucket" + key = "path/to/state" + region = "us-east-1" + } +} \ No newline at end of file diff --git a/tests/terraform/checks/terraform/terraform/resources/lock/pass.tf b/tests/terraform/checks/terraform/terraform/resources/lock/pass.tf new file mode 100644 index 00000000000..afef00d6268 --- /dev/null +++ b/tests/terraform/checks/terraform/terraform/resources/lock/pass.tf @@ -0,0 +1,9 @@ +terraform { + backend "s3" { + bucket = "example-bucket" + key = "path/to/state" + region = "us-east-1" + use_lockfile = true + dynamodb_table = "terraform-locks" + } +} \ No newline at end of file diff --git a/tests/terraform/checks/terraform/terraform/resources/lock/pass_dynamodb_table.tf b/tests/terraform/checks/terraform/terraform/resources/lock/pass_dynamodb_table.tf new file mode 100644 index 00000000000..90cb9759de0 --- /dev/null +++ b/tests/terraform/checks/terraform/terraform/resources/lock/pass_dynamodb_table.tf @@ -0,0 +1,8 @@ +terraform { + backend "s3" { + bucket = "example-bucket" + key = "path/to/state" + region = "us-east-1" + dynamodb_table = "terraform-locks" + } +} \ No newline at end of file diff --git a/tests/terraform/checks/terraform/terraform/resources/lock/unknown.tf b/tests/terraform/checks/terraform/terraform/resources/lock/unknown.tf new file mode 100644 index 00000000000..7f14c41b031 --- /dev/null +++ b/tests/terraform/checks/terraform/terraform/resources/lock/unknown.tf @@ -0,0 +1,5 @@ +terraform { + backend "local" { + path = "relative/path/to/terraform.tfstate" + } +} diff --git a/tests/terraform/checks/terraform/terraform/test_StateLock.py b/tests/terraform/checks/terraform/terraform/test_StateLock.py new file mode 100644 index 00000000000..2433f083228 --- /dev/null +++ b/tests/terraform/checks/terraform/terraform/test_StateLock.py @@ -0,0 +1,41 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.checks.terraform.terraform.StateLock import check +from checkov.common.models.enums import CheckResult +from checkov.terraform.runner import Runner + + +class TestStateLock(unittest.TestCase): + def test(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = current_dir + "/resources/lock" + report = runner.run( + root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]) + ) + summary = report.get_summary() + + passing_resources = { + "terraform", + } + failing_resources = { + "terraform", + } + + passed_check_resources = set([c.resource for c in report.passed_checks]) + failed_check_resources = set([c.resource for c in report.failed_checks]) + + self.assertEqual(summary["passed"], 2) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == '__main__': + unittest.main()