refactor(importer): extract inline GCS parsing function #2644

Merged
193 changes: 122 additions & 71 deletions docker/importer/importer.py
@@ -313,11 +313,96 @@ def _vuln_ids_from_gcs_blob(self, client: storage.Client,
strict=self._strict_validation)
except Exception as e:
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
# TODO(andrewpollock): I think this needs to be reraised here...
# a jsonschema.exceptions.ValidationError only gets raised in strict
# validation mode.
return None
for vuln in vulns:
vuln_ids.append(vuln.id)
return vuln_ids

def _convert_blob_to_vuln(
self, storage_client: storage.Client, ndb_client: ndb.Client,
source_repo: osv.SourceRepository, blob: storage.Blob,
ignore_last_import_time: bool) -> Optional[Tuple[str, str]]:
"""Parse a GCS blob and return (hash, blob name) if it needs to be (re)imported.

Criteria for returning a tuple:
- any record in the blob is new (i.e. a new ID) or modified since last run,
and the hash for the blob has changed
- the importer is reimporting the entire source
- ignore_last_import_time is True
- the record passes OSV JSON Schema validation

Usually an OSV file has a single vulnerability in it, but it is permissible
to have more than one, so every record in the blob is checked.

This is runnable in parallel using concurrent.futures.ThreadPoolExecutor.

Args:
storage_client: a storage.Client() to use for retrieval of the blob
ndb_client: an ndb.Client() to use for Datastore access
source_repo: the osv.SourceRepository the blob relates to
blob: the storage.Blob object to operate on
ignore_last_import_time: when True, skip the last import time check and
treat the blob as changed

Raises:
jsonschema.exceptions.ValidationError: when self._strict_validation is
True and the input fails OSV JSON Schema validation

Returns:
a tuple of (hash, blob name) when the blob needs to be (re)imported, or
None when the blob has an unexpected name, is unchanged since the last
import, or contains no new or modified records
"""
if not _is_vulnerability_file(source_repo, blob.name):
return None

utc_last_update_date = source_repo.last_update_date.replace(
tzinfo=datetime.timezone.utc)

if not ignore_last_import_time and \
blob.updated is not None and \
not blob.updated > utc_last_update_date:
return None

# The record in GCS appears to be new/changed, examine further.
logging.info('Bucket entry triggered for %s/%s', source_repo.bucket,
blob.name)

# Download in a blob generation agnostic way to cope with the blob
# changing between when it was listed and now (otherwise retrieval fails
# when the generation no longer matches).
blob_bytes = storage.Blob(
blob.name, blob.bucket,
generation=None).download_as_bytes(storage_client)

blob_hash = osv.sha256_bytes(blob_bytes)

# When self._strict_validation is True,
# this *may* raise a jsonschema.exceptions.ValidationError
vulns = osv.parse_vulnerabilities_from_data(
blob_bytes,
os.path.splitext(blob.name)[1],
strict=self._strict_validation)

# TODO(andrewpollock): integrate with linter here.

# This is the atypical execution path (when reimporting is triggered)
if ignore_last_import_time:
return blob_hash, blob.name

# This is the typical execution path (when reimporting is not triggered)
with ndb_client.context():
for vuln in vulns:
bug = osv.Bug.get_by_id(vuln.id)
# The record is new, or has been modified since the last import
if bug is None or \
bug.import_last_modified != vuln.modified.ToDatetime():
return blob_hash, blob.name

return None

return None
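
For readers unfamiliar with the two details the helper above relies on, here is a minimal standalone sketch of the same pattern: a generation-agnostic GCS download (so the object can still be fetched if it was overwritten after being listed) followed by the per-record freshness check against `Bug.import_last_modified`. This is not the PR's code; the bucket name is a made-up placeholder, and only calls already used in the diff are assumed.

```python
# Sketch only: assumes the same osv package and clients as the importer.
import os

from google.cloud import ndb, storage

import osv

BUCKET_NAME = 'example-osv-bucket'  # hypothetical placeholder


def blob_needs_import(blob_name: str) -> bool:
  """Return True if any record in the blob is new or modified."""
  storage_client = storage.Client()
  ndb_client = ndb.Client()

  # generation=None requests the *current* generation, so the download
  # still succeeds if the object changed after it was listed.
  blob_bytes = storage.Blob(
      blob_name, storage_client.bucket(BUCKET_NAME),
      generation=None).download_as_bytes(storage_client)

  vulns = osv.parse_vulnerabilities_from_data(
      blob_bytes, os.path.splitext(blob_name)[1], strict=False)

  with ndb_client.context():
    for vuln in vulns:
      bug = osv.Bug.get_by_id(vuln.id)
      # New record, or modified since the importer last saw it.
      if bug is None or bug.import_last_modified != vuln.modified.ToDatetime():
        return True
  return False
```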

def _sync_from_previous_commit(self, source_repo, repo):
"""Sync the repository from the previous commit.

@@ -442,13 +527,16 @@ def _process_updates_bucket(self, source_repo: osv.SourceRepository):
source_repo.ignore_last_import_time = False
source_repo.put()

# First retrieve a list of files to parallel download
storage_client = storage.Client()
utc_last_update_date = source_repo.last_update_date.replace(
tzinfo=datetime.timezone.utc)

# Get all of the existing records in the GCS bucket
logging.info(
'Listing blobs in gs://%s',
os.path.join(source_repo.bucket,
('' if source_repo.directory_path is None else
source_repo.directory_path)))
# Convert to list to retrieve all information into memory
# This makes its use in the concurrent map later faster
# This makes its concurrent use later faster
listed_blobs = list(
storage_client.list_blobs(
source_repo.bucket,
@@ -457,86 +545,49 @@ def _process_updates_bucket(self, source_repo: osv.SourceRepository):

import_failure_logs = []

# TODO(andrewpollock): externalise like _vuln_ids_from_gcs_blob()
def convert_blob_to_vuln(blob: storage.Blob) -> Optional[Tuple[str, str]]:
"""Download and parse GCS blob into [blob_hash, blob.name]"""
if not _is_vulnerability_file(source_repo, blob.name):
return None
if not ignore_last_import_time and \
blob.updated is not None and \
not blob.updated > utc_last_update_date:
return None
# Get the hash and the parsed vulnerability from every GCS object that
# parses as an OSV record. Do this in parallel to speed things up.
with concurrent.futures.ThreadPoolExecutor(
max_workers=_BUCKET_THREAD_COUNT) as executor:

logging.info('Bucket entry triggered for %s/%s', source_repo.bucket,
blob.name)
# Use the _client_store thread local variable
# set in the thread pool initializer
# Download in a blob generation agnostic way to cope with the file
# changing between when it was listed and now.
blob_bytes = storage.Blob(
blob.name, blob.bucket,
generation=None).download_as_bytes(_client_store.storage_client)
if ignore_last_import_time:
blob_hash = osv.sha256_bytes(blob_bytes)
if self._strict_validation:
try:
_ = osv.parse_vulnerabilities_from_data(
blob_bytes,
os.path.splitext(blob.name)[1],
strict=self._strict_validation)
except Exception as e:
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
import_failure_logs.append('Failed to parse vulnerability "' +
blob.name + '"')
return None
return blob_hash, blob.name

with _client_store.ndb_client.context():
logging.info('Parallel-parsing %d blobs in %s', len(listed_blobs),
source_repo.name)
future_to_blob = {
executor.submit(self._convert_blob_to_vuln, storage.Client(),
ndb.Client(), source_repo, blob,
ignore_last_import_time):
blob for blob in listed_blobs
}

converted_vulns = []
logging.info('Processing %d parallel-parsed blobs in %s',
len(future_to_blob), source_repo.name)

for future in concurrent.futures.as_completed(future_to_blob):
blob = future_to_blob[future]
try:
vulns = osv.parse_vulnerabilities_from_data(
blob_bytes,
os.path.splitext(blob.name)[1],
strict=self._strict_validation)
for vuln in vulns:
bug = osv.Bug.get_by_id(vuln.id)
# Check if the bug has been modified since last import
if bug is None or \
bug.import_last_modified != vuln.modified.ToDatetime():
blob_hash = osv.sha256_bytes(blob_bytes)
return blob_hash, blob.name

return None
if future.result():
converted_vulns.append(([vuln for vuln in future.result() if vuln]))
except Exception as e:
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
# Don't include error stack trace as that might leak sensitive info
# List.append() is atomic and threadsafe.
import_failure_logs.append('Failed to parse vulnerability "' +
blob.name + '"')
return None

# Setup storage client
def thread_init():
_client_store.storage_client = storage.Client()
_client_store.ndb_client = ndb.Client()
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
import_failure_logs.append(
'Failed to parse vulnerability (when considering for import) "' +
blob.name + '"')

# TODO(andrewpollock): switch to using c.f.submit() like in
# _process_deletions_bucket()
with concurrent.futures.ThreadPoolExecutor(
_BUCKET_THREAD_COUNT, initializer=thread_init) as executor:
converted_vulns = executor.map(convert_blob_to_vuln, listed_blobs)
for cv in converted_vulns:
if cv:
logging.info('Requesting analysis of bucket entry: %s/%s',
source_repo.bucket, cv[1])
self._request_analysis_external(source_repo, cv[0], cv[1])

replace_importer_log(storage_client, source_repo.name,
self._public_log_bucket, import_failure_logs)
replace_importer_log(storage_client, source_repo.name,
self._public_log_bucket, import_failure_logs)

source_repo.last_update_date = import_time_now
source_repo.put()
source_repo.last_update_date = import_time_now
source_repo.put()

logging.info('Finished processing bucket: %s', source_repo.name)
logging.info('Finished processing bucket: %s', source_repo.name)
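
The concurrency change above follows the standard `submit()`/`as_completed()` pattern with a future-to-input mapping so failures can be attributed to the blob that caused them. Below is a small self-contained sketch of that pattern; `parse_one()` and the file names are hypothetical stand-ins for `self._convert_blob_to_vuln()` and the listed blobs.

```python
import concurrent.futures
from typing import Optional

_THREAD_COUNT = 8  # stand-in for _BUCKET_THREAD_COUNT


def parse_one(name: str) -> Optional[str]:
  """Hypothetical per-item worker; may raise on malformed input."""
  if name.endswith('-invalid.json'):
    raise ValueError('unparseable')
  return name.upper()


def fan_out(names: list[str]) -> tuple[list[str], list[str]]:
  results, failures = [], []
  with concurrent.futures.ThreadPoolExecutor(
      max_workers=_THREAD_COUNT) as executor:
    # Map each future back to its input so errors can name the culprit,
    # mirroring future_to_blob in the diff above.
    future_to_name = {executor.submit(parse_one, n): n for n in names}
    for future in concurrent.futures.as_completed(future_to_name):
      name = future_to_name[future]
      try:
        result = future.result()
        if result:
          results.append(result)
      except Exception:  # broad catch, as in the importer's error handling
        failures.append(name)
  return results, failures


if __name__ == '__main__':
  print(fan_out(['a/b/GHSA-1.json', 'a/b/test-invalid.json']))
```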

def _process_deletions_bucket(self,
source_repo: osv.SourceRepository,
40 changes: 30 additions & 10 deletions docker/importer/importer_test.py
@@ -511,19 +511,31 @@ def test_bucket(self, unused_mock_time: mock.MagicMock,

with self.assertLogs(level='WARNING') as logs:
imp.run()
self.assertEqual(3, len(logs.output))

self.assertEqual(
3,
len(logs.output),
msg='Expected number of WARNING level (or higher) logs not found')
self.assertEqual(
"WARNING:root:Failed to validate loaded OSV entry: 'modified' is a required property", # pylint: disable=line-too-long
logs.output[0])
self.assertIn('WARNING:root:Invalid data:', logs.output[1])
logs.output[0],
msg='Expected schema validation failure log not found')
self.assertIn(
'WARNING:root:Invalid data:',
logs.output[1],
msg='Expected schema validation failure log not found')
self.assertIn(
"ERROR:root:Failed to parse vulnerability a/b/test-invalid.json: 'modified' is a required property", # pylint: disable=line-too-long
logs.output[2])
logs.output[2],
msg='Expected schema validation failure log not found')

# Check if vulnerability parse failure was logged correctly.
self.assertTrue(
any('Failed to parse vulnerability "a/b/test-invalid.json"' in x[0][0]
for x in upload_from_str.call_args_list))
any(('Failed to parse vulnerability (when considering for import)'
' "a/b/test-invalid.json"') in x[0][0]
for x in upload_from_str.call_args_list),
msg=('Expected schema validation failure not logged in public log '
'bucket'))

# Expected pubsub calls for validly imported records.
mock_publish.assert_has_calls([
@@ -558,7 +570,10 @@ def test_bucket(self, unused_mock_time: mock.MagicMock,
path='a/b/DSA-3029-1.json',
original_sha256=mock.ANY,
deleted='false')
self.assertNotIn(dsa_call, mock_publish.mock_calls)
self.assertNotIn(
dsa_call,
mock_publish.mock_calls,
msg='Old record was processed unexpectedly')

# Test invalid entry is not published, as it failed validation.
invalid_call = mock.call(
@@ -569,7 +584,10 @@ def test_bucket(self, unused_mock_time: mock.MagicMock,
path='a/b/test-invalid.json',
original_sha256=mock.ANY,
deleted=mock.ANY)
self.assertNotIn(invalid_call, mock_publish.mock_calls)
self.assertNotIn(
invalid_call,
mock_publish.mock_calls,
msg='Invalid record was processed unexpectedly')
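
The `assertNotIn(..., mock_publish.mock_calls)` checks above rely on `mock.call()` equality. A tiny self-contained example of the same negative-assertion pattern follows; the topic and file paths are invented and not from the PR.

```python
import unittest
from unittest import mock


class PublishNegativeCheckExample(unittest.TestCase):
  """Illustrates asserting that a specific publish call was NOT made."""

  def test_old_record_not_published(self):
    publisher = mock.MagicMock()
    # Simulate the importer publishing only the new record.
    publisher.publish(
        'projects/example/topics/tasks', data=b'', type='update',
        source='example', path='a/b/NEW-2024-0001.json')

    unexpected = mock.call(
        'projects/example/topics/tasks', data=b'', type='update',
        source='example', path='a/b/OLD-2020-0001.json')
    self.assertNotIn(
        unexpected,
        publisher.publish.mock_calls,
        msg='Old record was processed unexpectedly')


if __name__ == '__main__':
  unittest.main()
```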

@mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
@mock.patch('time.time', return_value=12345.0)
@@ -692,7 +710,8 @@ def test_import_override(self, unused_mock_time: mock.MagicMock,

# Check if vulnerability parse failure was logged correctly.
self.assertTrue(
any('Failed to parse vulnerability "a/b/test-invalid.json"' in x[0][0]
any(('Failed to parse vulnerability (when considering for import) '
'"a/b/test-invalid.json"') in x[0][0]
for x in upload_from_str.call_args_list))

# Confirm a pubsub message was emitted for record reimported.
@@ -718,7 +737,8 @@ def test_import_override(self, unused_mock_time: mock.MagicMock,

# Check if vulnerability parse failure was logged correctly.
self.assertTrue(
any('Failed to parse vulnerability "a/b/test-invalid.json"' in x[0][0]
any(('Failed to parse vulnerability (when considering for import) '
'"a/b/test-invalid.json"') in x[0][0]
for x in upload_from_str.call_args_list))

# Confirm second run didn't reprocess any existing records.