-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcosca.py
197 lines (185 loc) · 7.39 KB
/
cosca.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import logging
import json
import os
import sys
import tempfile
import argparse
import importlib
import docker
from docker.errors import DockerException
from common.logging_setup import setup_logger
from common.target_type import TargetType
class Cosca:
    """Command-line driver that runs a combo of scanners against targets.

    The instance parses the command line, configures a logger, checks that
    the Docker daemon answers, and then (via :meth:`main`) loads the
    requested output handlers and runs every scanner that the selected
    combo maps to each target's detected type.
    """

    def __init__(self):
        """Parse CLI arguments, set up logging and verify Docker is reachable.

        Exits the process with status 1 when the Docker daemon does not
        answer a ping.
        """
        self.args = self.parse_args()
        # --quiet wins over --verbose; argparse already forbids passing both.
        if self.args.quiet:
            self.log_level = logging.WARNING
        elif self.args.verbose:
            self.log_level = logging.DEBUG
        else:
            self.log_level = logging.INFO
        self.logger = setup_logger(__name__, "❇️ ", level=self.log_level)
        if not self.is_docker_daemon_running():
            self.logger.error(
                "Docker daemon is not running. Please start docker daemon and run cosca again."
            )
            sys.exit(1)

    def parse_args(self):
        """Build the argument parser, store it on ``self.parser`` and parse argv.

        Returns:
            The ``argparse.Namespace`` produced from ``sys.argv``.

        Raises:
            SystemExit: raised by argparse on invalid usage (for example a
                missing ``--target``) or after printing ``--help``.
        """
        self.parser = argparse.ArgumentParser(
            description="Scan targets based on their types."
        )
        self.parser.add_argument(
            "-c",
            "--combo",
            help="Name of the combo of scanners to execute. Combos are defined in combos.json",
            default="default",
        )
        # Required: without at least one target there is nothing to scan, and
        # trigger_scans() would otherwise fail while iterating over None.
        self.parser.add_argument(
            "-t",
            "--target",
            nargs="+",
            required=True,
            help="Space separated targets to scan. Could be directory, docker image, website, openapi, graphql, soap or Github repo",
        )
        self.parser.add_argument(
            "-o",
            "--output",
            nargs="+",
            help=f"Specify outputs. Separate more than one option with spaces. Options: {' '.join(self.get_filenames('output_handlers'))}",
            default=["pdf", "zip"],
        )

        # Only the two verbosity switches exclude each other.
        log_group = self.parser.add_mutually_exclusive_group()
        log_group.add_argument(
            "-q",
            "--quiet",
            action="store_true",
            help="Run in quiet mode (only shows json minimal output)",
            default=False,
        )
        # NOTE(review): store_true combined with default=True makes -v a
        # no-op (verbose is always on unless -q is given). Left unchanged to
        # preserve the current default verbosity; confirm the intent.
        log_group.add_argument(
            "-v",
            "--verbose",
            action="store_true",
            help="Run in verbose mode (debugging output)",
            default=True,
        )

        # These options are unrelated to verbosity, so they are registered on
        # the parser itself; placing them in the exclusive group made
        # combinations such as "-q -n mynet" a usage error.
        # NOTE(review): as with -v, default=True makes -f a no-op.
        self.parser.add_argument(
            "-f",
            "--force_pull",
            action="store_true",
            help="Force docker to pull the scanner images from the registry before running the scanners. This ensures that the latest version of the scanners are being are being used. It may also avoid to use tampered images that may reside in the local docker daemon.",
            default=True,
        )
        self.parser.add_argument(
            "-n",
            "--network",
            help="Docker network to use with the scanner container. Useful to scan local targets.",
            default="",
        )
        return self.parser.parse_args()

    def get_combo_mappings(self, combo):
        """Return the target-type to scanners mapping of the named combo.

        Reads ``combos.json`` from the current working directory.

        Args:
            combo: Name of the combo to look up.

        Returns:
            A dict mapping each ``"type"`` value of the combo's mappings to
            the corresponding ``"scanners"`` value.

        Raises:
            SystemExit: with status 1 when the file is missing, is not valid
                JSON, or contains no combo with the requested name.
        """
        file_path = "combos.json"
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                data = json.load(file)
        except json.JSONDecodeError as e:
            # Exit right away: continuing would report a misleading
            # "combo not found" on top of the real problem.
            self.logger.error("JSONDecodeError: %s", e)
            sys.exit(1)
        except FileNotFoundError as e:
            self.logger.error("FileNotFoundError: %s", e)
            sys.exit(1)

        for candidate in data["combos"]:
            if candidate["name"] == combo:
                return {
                    entry["type"]: entry["scanners"]
                    for entry in candidate["mappings"]
                }

        self.logger.error(
            "Combo %s not found in %s. Please verify combo name.", combo, file_path
        )
        sys.exit(1)

    def trigger_scans(self, target, combo, working_dir, outputs, network):
        """Run every scanner of the combo against every target.

        Args:
            target: Iterable of targets to scan.
            combo: Name of the combo whose mappings select the scanners.
            working_dir: Directory under which one sub-directory per scanner
                is created.
            outputs: Output handler instances, forwarded to each scanner.
            network: Docker network name, forwarded to each scanner.

        Returns:
            A list with one dict per (target, scanner) run, holding the
            scanner's ``scan`` result, the target, its id, the scanner name
            and the scanner's auxiliary arguments.

        Raises:
            SystemExit: with status 1 when the combo has no scanners for a
                target's type, a scanner module cannot be imported, or
                invoking a scanner raises ``AttributeError``.
        """
        mappings = self.get_combo_mappings(combo)
        reports = []
        self.logger.info("Combo: %s", combo)
        total_targets = len(target)
        for target_number, current in enumerate(target, start=1):
            self.logger.info(
                "Scanning target #%d out of %d", target_number, total_targets
            )
            target_type = TargetType.get_target_type(current)
            self.logger.info("Target: %s", current)
            self.logger.info("Target type: %s", target_type.value)
            self.logger.info("Working directory: %s", working_dir)

            if target_type.value not in mappings:
                # Clear message instead of a bare KeyError traceback.
                self.logger.error(
                    "Combo %s defines no scanners for target type %s.",
                    combo,
                    target_type.value,
                )
                sys.exit(1)
            scanners = mappings[target_type.value]

            for scanner_number, scanner in enumerate(scanners, start=1):
                self.logger.info(
                    "Running scanner #%d out of %d",
                    scanner_number,
                    len(scanners),
                )
                try:
                    self.logger.info("Scanner: %s", scanner)
                    scanner_sub_dir = os.path.join(working_dir, scanner)
                    # exist_ok: the same scanner may run for several targets
                    # in one invocation and they share this directory.
                    os.makedirs(scanner_sub_dir, exist_ok=True)
                    try:
                        module = importlib.import_module(f"scanners.{scanner}")
                    except ModuleNotFoundError:
                        self.logger.error(
                            "Scanner module scanners.%s not found. Please implement a class inherited from Scanner in scanners/%s.py",
                            scanner,
                            scanner,
                        )
                        sys.exit(1)
                    scanner_cls = module.CustomScanner
                    instance = scanner_cls(
                        log_level=self.log_level, target_type=target_type
                    )
                    output_details = instance.scan(
                        current, scanner_sub_dir, outputs, network
                    )
                    aux_args = instance.get_aux_args()
                    reports.append(
                        {
                            "output": output_details,
                            "target": current,
                            "target_id": instance.get_target_id(current),
                            "scanner": scanner,
                            "aux_args": aux_args,
                        }
                    )
                except (AttributeError, ModuleNotFoundError) as e:
                    self.logger.error("Error while invoking scanner: %s", e)
                    sys.exit(1)
        return reports

    def get_filenames(self, folder_name):
        """Return the extension-less names of the regular files in a folder.

        Args:
            folder_name: Directory to list; sub-directories are ignored.

        Returns:
            A list of file names with their last extension removed, in the
            order reported by ``os.listdir``.
        """
        return [
            os.path.splitext(entry)[0]
            for entry in os.listdir(folder_name)
            if os.path.isfile(os.path.join(folder_name, entry))
        ]

    def main(self):
        """Load the requested output handlers, run the scans and report.

        Each name given to ``--output`` is imported as
        ``output_handlers.<name>`` and its ``CustomOutputHandler`` class is
        instantiated. The scans run inside a temporary working directory that
        is removed afterwards. In quiet mode the collected reports are
        printed as JSON; they are always logged at debug level.

        Raises:
            SystemExit: with status 1 when an output handler module cannot
                be imported.
        """
        outputs = []
        for output in self.args.output:
            try:
                module = importlib.import_module(f"output_handlers.{output}")
            except ModuleNotFoundError:
                self.logger.error(
                    "Output module output_handlers.%s not found. Please implement a class inherited from OutputHandler in output_handlers/%s.py",
                    output,
                    output,
                )
                sys.exit(1)
            handler_cls = getattr(module, "CustomOutputHandler")
            outputs.append(handler_cls(output, self.parser, self.log_level))

        with tempfile.TemporaryDirectory(prefix="cosca_") as tmp_dir:
            json_summary = self.trigger_scans(
                self.args.target,
                self.args.combo,
                tmp_dir,
                outputs,
                network=self.args.network,
            )
        if self.args.quiet:
            print(json.dumps(json_summary))
        self.logger.debug(json_summary)

    def is_docker_daemon_running(self):
        """Return True when the Docker daemon answers a ping, else False.

        A ``DockerException`` raised while creating the client or pinging
        is treated as "not running".
        """
        try:
            client = docker.from_env()
            client.ping()
            return True
        except DockerException:
            return False
# Script entry point: build the application (parses argv and checks Docker)
# and run the scans.
if __name__ == "__main__":
    Cosca().main()