Skip to content

Commit

Permalink
GCP effective firewall listing (#361)
Browse files Browse the repository at this point in the history
* Add firewall listing

* Code tidy + documentation

* Add unit test

* Whitespace fix

* Mypy fix

* Code review changes

* Code review comments

Co-authored-by: Theo <gtheo@google.com>
  • Loading branch information
Jonathan Greig and giovannt0 authored Aug 5, 2021
1 parent ef026ed commit f10086d
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 0 deletions.
108 changes: 108 additions & 0 deletions libcloudforensics/providers/gcp/internal/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
# https://cloud.google.com/compute/docs/general-purpose-machines#e2-standard
E2_STANDARD_CPU_CORES = [2, 4, 8, 16, 32]

# Numerical policy_level value for non-hierarchical FW rules
NON_HIERARCHICAL_FW_POLICY_LEVEL = 999

class GoogleCloudCompute(common.GoogleCloudComputeClient):
"""Class representing all Google Cloud Compute objects in a project.
Expand Down Expand Up @@ -1224,6 +1226,112 @@ def DetachServiceAccount(self) -> None:
raise errors.ServiceAccountRemovalError('Service account detatchment '
'failure: {0:s}'.format(str(exception)), __name__)

def _NormaliseFirewallL4Config(self, l4config: List[Any]) -> List[Any]:
"""Normalise l4config dict key names that differ between policies and
firewalls.
Args:
l4config List[Any]: the l4config to be normalised
Returns:
List[Any]: the normalised l4config"""
normalised_l4config = []
for config in l4config:
normalised = {}
if 'ipProtocol' in config:
normalised['ip_protocol'] = config['ipProtocol']
elif 'IPProtocol' in config:
normalised['ip_protocol'] = config['IPProtocol']
if 'ports' in config:
normalised['ports'] = config['ports']
normalised_l4config.append(normalised)

return normalised_l4config

def _NormaliseFirewallRules(self, nic_rules: Dict[str, Any]) -> List[Any]:
"""Normalise firewall policies and firewall rules into a common format.
Args:
nic_rules: the effective firewall rules for an individual NIC.
Returns:
List[Dict[str, Any]]: The normalised firewall rules for a NIC with
individual rules in the following format:
{
'type': 'policy' or 'firewall',
'policy_level': int,
'priority': int,
'direction': 'INGRESS' or 'EGRESS',
'l4config': [
{
'ip_protocol': str,
'ports': List[str]
}]
'ips': List[str],
'action': 'allow' or 'deny' or 'goto_next'
}
"""

normalised_rules = []
firewall_policies = nic_rules['firewallPolicys']
firewalls = nic_rules['firewalls']

for policy_level, policy in enumerate(firewall_policies):
for rule in policy['rules']:
is_ingress = rule['direction'] == 'INGRESS'
normalised_rule = {
'type': 'policy',
'policy_level': policy_level,
'priority': rule['priority'],
'direction': rule['direction'],
'l4config': self._NormaliseFirewallL4Config(
rule['match']['layer4Configs']),
'ips': (rule['match']['srcIpRanges'] if is_ingress else
rule['match']['destIpRanges']),
'action': rule['action']}
normalised_rules.append(normalised_rule)

for rule in firewalls:
is_ingress = rule['direction'] == 'INGRESS'
is_allow = 'allowed' in rule
normalised_rule = {
'type': 'firewall',
'policy_level': NON_HIERARCHICAL_FW_POLICY_LEVEL,
'priority': rule['priority'],
'direction': rule['direction'],
'l4config': self._NormaliseFirewallL4Config(
rule['allowed'] if is_allow else rule['denied']),
'ips': (rule['sourceRanges'] if is_ingress else
rule['destinationRanges']),
'action': 'allow' if is_allow else 'deny'}
normalised_rules.append(normalised_rule)

return normalised_rules

def GetEffectiveFirewallRules(self) -> Dict[str, List[Any]]:
"""Get the effective firewall rules for an instance.
Returns:
Dict[str, List[Any]]: The effective firewall rules per NIC.
"""
gce_instance_client = self.GceApi().instances()
instance_info = self.GetOperation()
fw_rules = {}

for nic in instance_info.get('networkInterfaces', []):
nic_name = nic['name']
nic_fw_rules = []
request = {'project': self.project_id, 'instance': self.name,
'zone': self.zone, 'networkInterface': nic_name}
responses = common.ExecuteRequest(
gce_instance_client, 'getEffectiveFirewalls', request)
for response in responses:
nic_fw_rules.extend(self._NormaliseFirewallRules(response))
fw_rules[nic_name] = nic_fw_rules

return fw_rules


class GoogleComputeDisk(compute_base_resource.GoogleComputeBaseResource):
"""Class representing a Compute Engine disk."""

Expand Down
95 changes: 95 additions & 0 deletions tests/providers/gcp/gcp_mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,98 @@
# See: https://cloud.google.com/compute/docs/reference/rest/v1/disks
REGEX_DISK_NAME = re.compile('^(?=.{1,63}$)[a-z]([-a-z0-9]*[a-z0-9])?$')
STARTUP_SCRIPT = 'scripts/startup.sh'

# pylint: disable=line-too-long
MOCK_NETWORK_INTERFACES = [
{
'network': 'https://www.googleapis.com/compute/v1/projects/fake-project/global/networks/default',
'subnetwork': 'https://www.googleapis.com/compute/v1/projects/fake-project/regions/fake-region/subnetworks/default',
'networkIP': '10.1.1.1',
'name': 'nic0',
'accessConfigs': [
{
'type': 'ONE_TO_ONE_NAT',
'name': 'External NAT',
'natIP': '0.0.0.0',
'networkTier': 'PREMIUM',
'kind': 'compute#accessConfig'
}
],
'fingerprint': 'bm9mcGZwZnA=',
'kind': 'compute#networkInterface'
}
]
MOCK_EFFECTIVE_FIREWALLS = {
"firewallPolicys": [
{
"name": "111111111111",
"rules": [
{
"action": "allow",
"description": "",
"direction": "INGRESS",
"kind": "compute#firewallPolicyRule",
"match": {
"layer4Configs": [
{
"ipProtocol": "tcp"
}
],
"srcIpRanges": [
"8.8.8.8/24"
]
},
"priority": 1
}
]
},
{
"name": "222222222222",
"rules": [
{
"action": "goto_next",
"description": "",
"direction": "INGRESS",
"kind": "compute#firewallPolicyRule",
"match": {
"layer4Configs": [
{
"ipProtocol": "tcp"
}
],
"srcIpRanges": [
"8.8.4.4/24"
]
},
"priority": 1
}
]
}
],
"firewalls": [
{
"allowed": [
{
"IPProtocol": "tcp"
}
],
"creationTimestamp": "2021-01-01T00:00:00.000+00:00",
"description": "allow all",
"direction": "INGRESS",
"disabled": False,
"id": "1111111111111111111",
"kind": "compute#firewall",
"logConfig": {
"enable": False
},
"name": "default-111111111111111111111111",
"network": "https://www.googleapis.com/compute/v1/projects/fake-project/global/networks/default",
"priority": 1000,
"selfLink": "https://www.googleapis.com/compute/v1/projects/fake-project/global/firewalls/default-111111111111111111111111",
"sourceRanges": [
"0.0.0.0/0"
]
}
]
}
# pylint: enable=line-too-long
40 changes: 40 additions & 0 deletions tests/providers/gcp/internal/test_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,46 @@ def testDelete(
]
mock_disk_delete.assert_has_calls(calls)

@typing.no_type_check
@mock.patch('libcloudforensics.providers.gcp.internal.compute.GoogleComputeInstance.GetOperation')
@mock.patch('libcloudforensics.providers.gcp.internal.common.GoogleCloudComputeClient.GceApi')
def testGetEffectiveFirewallRules(self, mock_gce_api, mock_get_operation):
"""Tests that firewall rules are properly formatted"""
mock_get_operation.return_value = {'networkInterfaces': gcp_mocks.MOCK_NETWORK_INTERFACES}
mock_gce_api.return_value.instances.return_value.getEffectiveFirewalls.return_value.execute.return_value = gcp_mocks.MOCK_EFFECTIVE_FIREWALLS
fw_rules = gcp_mocks.FAKE_INSTANCE.GetEffectiveFirewallRules()
self.assertDictEqual(
fw_rules,
{'nic0': [
{
'type': 'policy',
'policy_level': 0,
'priority': 1,
'direction': 'INGRESS',
'l4config': [{'ip_protocol': 'tcp'}],
'ips': ['8.8.8.8/24'],
'action': 'allow'
},
{
'type': 'policy',
'policy_level': 1,
'priority': 1,
'direction': 'INGRESS',
'l4config': [{'ip_protocol': 'tcp'}],
'ips': ['8.8.4.4/24'],
'action': 'goto_next'
},
{
'type': 'firewall',
'policy_level': 999,
'priority': 1000,
'direction': 'INGRESS',
'l4config': [{'ip_protocol': 'tcp'}],
'ips': ['0.0.0.0/0'],
'action':
'allow'
}]})


class GoogleComputeDiskTest(unittest.TestCase):
"""Test Google Cloud Compute Disk class."""
Expand Down

0 comments on commit f10086d

Please sign in to comment.