Skip to content

Commit

Permalink
Indicator processing updates. Relates to #76. Relates to #62.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Jan 9, 2023
1 parent 0bf2b09 commit e0393b9
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions cs_misp_import/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,11 @@ def process_indicators(self, indicators_mins_before):
# MAIN INDICATORS PROCESSING
self.log.info("Starting import of CrowdStrike indicators into MISP.")
indicators_count = 0
for indicators_page in self.intel_api_client.get_indicators(start_get_events, self.delete_outdated):

for indicators_page in self.intel_api_client.get_indicators(start_get_events,
self.delete_outdated,
self.import_settings["type"]
):
self.push_indicators(indicators_page)
indicators_count += len(indicators_page)

Expand Down Expand Up @@ -332,6 +336,7 @@ def process_indicator_batch(self, batch_to_process):

def clean_laundry(self, batch_size, all_tot, f_failure, m_failure):
"""Save each of the events that have been flagged as dirty. Spawns multiple threads."""
saved = []
dirty = self.get_laundry()
self.log.info(
"This batch of %s produced %s indicators for %s events.",
Expand All @@ -342,20 +347,20 @@ def clean_laundry(self, batch_size, all_tot, f_failure, m_failure):
thousands(f_failure),
thousands(m_failure)
)
saved = []
thread_lock = Lock()
# Spawn multiple threads to save any events that are dirty
with concurrent.futures.ThreadPoolExecutor(self.misp.thread_count, thread_name_prefix="thread") as executor:
futures = {
executor.submit(self.event_thread, feed_data["object"], feed_data["count"], thread_lock)
for feed_data in dirty.values()
}
if dirty:
with concurrent.futures.ThreadPoolExecutor(self.misp.thread_count, thread_name_prefix="thread") as executor:
futures = {
executor.submit(self.event_thread, feed_data["object"], feed_data["count"], thread_lock)
for feed_data in dirty.values()
}

if not futures:
return
if not futures:
return

for fut in futures:
saved.append(fut.result())
for fut in futures:
saved.append(fut.result())

return saved

Expand Down Expand Up @@ -390,7 +395,6 @@ def push_indicators(self, indicators):
pushed_so_far += len(batch)
self.log.debug("%s have been pushed so far out of this batch of %s.", thousands(pushed_so_far), thousands(len(indicators)))


batch_duration = datetime.now().timestamp() - total_batch_start # Total time for the entire indicator run
self.log.info("Pushed %s indicators into MISP in %.2f seconds.", thousands(len(indicators)), batch_duration)
# Grab the latest timestamp from our list of processed indicators
Expand Down

0 comments on commit e0393b9

Please sign in to comment.