-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(terraform): Add a terraform block check (#6904)
* Initial commit * fix flake8 * fix dogfood * fix tests * handle tf json * barak feedback
- Loading branch information
Showing
15 changed files
with
212 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from checkov.terraform.checks.terraform.terraform import * # noqa |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from abc import abstractmethod | ||
from collections.abc import Iterable | ||
from typing import List, Dict, Any, Optional | ||
|
||
from checkov.common.checks.base_check import BaseCheck | ||
from checkov.common.models.enums import CheckCategories, CheckResult | ||
from checkov.terraform.checks.terraform.registry import terraform_registry | ||
|
||
|
||
class BaseTerraformBlockCheck(BaseCheck): | ||
def __init__( | ||
self, | ||
name: str, | ||
id: str, | ||
categories: "Iterable[CheckCategories]", | ||
supported_blocks: "Iterable[str]", | ||
guideline: Optional[str] = None | ||
) -> None: | ||
super().__init__( | ||
name=name, | ||
id=id, | ||
categories=categories, | ||
supported_entities=supported_blocks, | ||
block_type="terraform", | ||
guideline=guideline, | ||
) | ||
self.supported_blocks = supported_blocks | ||
terraform_registry.register(self) | ||
|
||
def scan_entity_conf(self, conf: Dict[str, List[Any]], entity_type: str) -> CheckResult: | ||
return self.scan_terraform_block_conf(conf) | ||
|
||
@abstractmethod | ||
def scan_terraform_block_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: | ||
raise NotImplementedError() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from typing import Dict, Any, Tuple | ||
|
||
from checkov.common.checks.base_check_registry import BaseCheckRegistry | ||
from checkov.common.util.consts import START_LINE, END_LINE | ||
|
||
|
||
class Registry(BaseCheckRegistry): | ||
def extract_entity_details(self, entity: Dict[str, Any]) -> Tuple[str, str, Dict[str, Any]]: | ||
terraform_configuration = dict(entity.items()) | ||
|
||
if START_LINE not in terraform_configuration or END_LINE not in terraform_configuration: | ||
start_lines = [] | ||
end_lines = [] | ||
|
||
def find_line_numbers(d): | ||
for k, v in d.items(): | ||
if k == START_LINE: | ||
start_lines.append(v) | ||
elif k == END_LINE: | ||
end_lines.append(v) | ||
elif isinstance(v, dict): | ||
find_line_numbers(v) | ||
elif isinstance(v, list): | ||
for item in v: | ||
if isinstance(item, dict): | ||
find_line_numbers(item) | ||
|
||
find_line_numbers(terraform_configuration) | ||
|
||
if start_lines and end_lines: | ||
terraform_configuration[START_LINE] = min(start_lines) | ||
terraform_configuration[END_LINE] = max(end_lines) | ||
else: | ||
terraform_configuration[START_LINE] = 1 | ||
terraform_configuration[END_LINE] = 1 | ||
|
||
return "terraform", "terraform", terraform_configuration |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from checkov.common.bridgecrew.check_type import CheckType | ||
from checkov.terraform.checks.terraform.base_registry import Registry | ||
|
||
terraform_registry = Registry(CheckType.TERRAFORM) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from typing import Dict, List, Any | ||
|
||
from checkov.common.models.enums import CheckResult, CheckCategories | ||
from checkov.terraform.checks.terraform.base_check import BaseTerraformBlockCheck | ||
|
||
|
||
class StateLock(BaseTerraformBlockCheck): | ||
def __init__(self) -> None: | ||
name = "Ensure state files are locked" | ||
id = "CKV_TF_3" | ||
supported_blocks = ("terraform",) | ||
categories = (CheckCategories.SUPPLY_CHAIN,) | ||
super().__init__(name=name, id=id, categories=categories, supported_blocks=supported_blocks) | ||
|
||
def scan_terraform_block_conf(self, conf: Dict[str, List[Any]]) -> CheckResult: | ||
# see: https://developer.hashicorp.com/terraform/language/terraform | ||
if "backend" not in conf: | ||
return CheckResult.UNKNOWN | ||
|
||
backend = conf["backend"][0] if isinstance(conf["backend"], list) else conf["backend"] | ||
|
||
if "s3" not in backend: | ||
return CheckResult.UNKNOWN | ||
|
||
s3_config = backend["s3"] | ||
if ("use_lockfile" not in s3_config or not s3_config["use_lockfile"]) and "dynamodb_table" not in s3_config: | ||
return CheckResult.FAILED | ||
return CheckResult.PASSED | ||
|
||
|
||
check = StateLock() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .StateLock import StateLock | ||
|
||
__all__ = ['StateLock'] |
28 changes: 28 additions & 0 deletions
28
checkov/terraform/context_parsers/parsers/terraform_context_parser.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from typing import Dict, Any, List | ||
|
||
from hcl2 import END_LINE, START_LINE | ||
|
||
from checkov.terraform.context_parsers.base_parser import BaseContextParser | ||
|
||
|
||
class TerraformBlockContextParser(BaseContextParser): | ||
def __init__(self) -> None: | ||
definition_type = "terraform" | ||
super().__init__(definition_type=definition_type) | ||
|
||
def get_entity_context_path(self, entity_block: Dict[str, Dict[str, Any]]) -> List[str]: | ||
return ["terraform"] | ||
|
||
def enrich_definition_block(self, definition_blocks: List[Dict[str, Any]]) -> Dict[str, Any]: | ||
for entity_block in definition_blocks: | ||
entity_config = entity_block | ||
self.context["terraform"] = { | ||
"start_line": entity_config[START_LINE], | ||
"end_line": entity_config[END_LINE], | ||
"code_lines": self.file_lines[entity_config[START_LINE] - 1: entity_config[END_LINE]], | ||
} | ||
|
||
return self.context | ||
|
||
|
||
parser = TerraformBlockContextParser() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
7 changes: 7 additions & 0 deletions
7
tests/terraform/checks/terraform/terraform/resources/lock/fail1.tf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
terraform { | ||
backend "s3" { | ||
bucket = "example-bucket" | ||
key = "path/to/state" | ||
region = "us-east-1" | ||
} | ||
} |
9 changes: 9 additions & 0 deletions
9
tests/terraform/checks/terraform/terraform/resources/lock/pass.tf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
terraform { | ||
backend "s3" { | ||
bucket = "example-bucket" | ||
key = "path/to/state" | ||
region = "us-east-1" | ||
use_lockfile = true | ||
dynamodb_table = "terraform-locks" | ||
} | ||
} |
8 changes: 8 additions & 0 deletions
8
tests/terraform/checks/terraform/terraform/resources/lock/pass_dynamodb_table.tf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
terraform { | ||
backend "s3" { | ||
bucket = "example-bucket" | ||
key = "path/to/state" | ||
region = "us-east-1" | ||
dynamodb_table = "terraform-locks" | ||
} | ||
} |
5 changes: 5 additions & 0 deletions
5
tests/terraform/checks/terraform/terraform/resources/lock/unknown.tf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
terraform { | ||
backend "local" { | ||
path = "relative/path/to/terraform.tfstate" | ||
} | ||
} |
41 changes: 41 additions & 0 deletions
41
tests/terraform/checks/terraform/terraform/test_StateLock.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import os | ||
import unittest | ||
|
||
from checkov.runner_filter import RunnerFilter | ||
from checkov.terraform.checks.terraform.terraform.StateLock import check | ||
from checkov.common.models.enums import CheckResult | ||
from checkov.terraform.runner import Runner | ||
|
||
|
||
class TestStateLock(unittest.TestCase): | ||
def test(self): | ||
runner = Runner() | ||
current_dir = os.path.dirname(os.path.realpath(__file__)) | ||
|
||
test_files_dir = current_dir + "/resources/lock" | ||
report = runner.run( | ||
root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]) | ||
) | ||
summary = report.get_summary() | ||
|
||
passing_resources = { | ||
"terraform", | ||
} | ||
failing_resources = { | ||
"terraform", | ||
} | ||
|
||
passed_check_resources = set([c.resource for c in report.passed_checks]) | ||
failed_check_resources = set([c.resource for c in report.failed_checks]) | ||
|
||
self.assertEqual(summary["passed"], 2) | ||
self.assertEqual(summary["failed"], 1) | ||
self.assertEqual(summary["skipped"], 0) | ||
self.assertEqual(summary["parsing_errors"], 0) | ||
|
||
self.assertEqual(passing_resources, passed_check_resources) | ||
self.assertEqual(failing_resources, failed_check_resources) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |