refactor(importer): revert google#2644
This reverts commit 447a30c.

There is a suspected, as-yet undiagnosed performance regression.
andrewpollock committed Oct 1, 2024
1 parent f839558 commit b678f06
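
For context, the code restored below processes bucket blobs with a thread pool whose initializer gives each worker thread its own client objects (via the _client_store thread-local), then maps convert_blob_to_vuln over the listed blobs. The following is a minimal, self-contained sketch of that pattern only, not the importer itself; the object() stand-ins and sample blob names are placeholders for the real storage.Client()/ndb.Client() and GCS listing.

import concurrent.futures
import threading

_client_store = threading.local()  # per-thread storage for client objects


def thread_init():
  # In importer.py these are storage.Client() and ndb.Client(); object() is a
  # placeholder so the sketch runs without Google Cloud credentials.
  _client_store.storage_client = object()
  _client_store.ndb_client = object()


def convert_blob_to_vuln(blob_name):
  # Runs on a worker thread and reuses that thread's clients.
  client = _client_store.storage_client
  return blob_name, id(client)


listed_blobs = ['a/b/one.json', 'a/b/two.json', 'a/b/three.json']
with concurrent.futures.ThreadPoolExecutor(
    4, initializer=thread_init) as executor:
  for name, client_id in executor.map(convert_blob_to_vuln, listed_blobs):
    print(name, client_id)
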
Showing 2 changed files with 81 additions and 151 deletions.
192 changes: 71 additions & 121 deletions docker/importer/importer.py
@@ -315,95 +315,11 @@ def _vuln_ids_from_gcs_blob(self, client: storage.Client,
strict=self._strict_validation)
except Exception as e:
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
# TODO(andrewpollock): I think this needs to be reraised here...
# a jsonschema.exceptions.ValidationError only gets raised in strict
# validation mode.
return None
for vuln in vulns:
vuln_ids.append(vuln.id)
return vuln_ids

def _convert_blob_to_vuln(
self, storage_client: storage.Client, ndb_client: ndb.Client,
source_repo: osv.SourceRepository, blob: storage.Blob,
ignore_last_import_time: bool) -> Optional[Tuple[str]]:
"""Parse a GCS blob into a tuple of hash and Vulnerability
Criteria for returning a tuple:
- any record in the blob is new (i.e. a new ID) or modified since last run,
and the hash for the blob has changed
- the importer is reimporting the entire source
- ignore_last_import_time is True
- the record passes OSV JSON Schema validation
Usually an OSV file has a single vulnerability in it, but it is permissible
to have more than one, hence it returns a list of tuples.
This is runnable in parallel using concurrent.futures.ThreadPoolExecutor
Args:
storage_client: a storage.Client() to use for retrieval of the blob
ndb_client: an ndb.Client() to use for Data Store access
source_repo: the osv.SourceRepository the blob relates to
blob: the storage.Blob object to operate on
Raises:
jsonschema.exceptions.ValidationError when self._strict_validation is True
input fails OSV JSON Schema validation
Returns:
a list of one or more tuples of (hash, vulnerability) (from the
Vulnerability proto) or None when the blob has an unexpected name
"""
if not _is_vulnerability_file(source_repo, blob.name):
return None

utc_last_update_date = source_repo.last_update_date.replace(
tzinfo=datetime.timezone.utc)

if (not ignore_last_import_time and blob.updated and
blob.updated <= utc_last_update_date):
return None

# The record in GCS appears to be new/changed, examine further.
logging.info('Bucket entry triggered for %s/%s', source_repo.bucket,
blob.name)

# Download in a blob generation agnostic way to cope with the blob
# changing between when it was listed and now (if the generation doesn't
# match, retrieval fails otherwise).
blob_bytes = storage.Blob(
blob.name, blob.bucket,
generation=None).download_as_bytes(storage_client)

blob_hash = osv.sha256_bytes(blob_bytes)

# When self._strict_validation is True,
# this *may* raise a jsonschema.exceptions.ValidationError
vulns = osv.parse_vulnerabilities_from_data(
blob_bytes,
os.path.splitext(blob.name)[1],
strict=self._strict_validation)

# TODO(andrewpollock): integrate with linter here.

# This is the atypical execution path (when reimporting is triggered)
if ignore_last_import_time:
return blob_hash, blob.name

# This is the typical execution path (when reimporting not triggered)
with ndb_client.context():
for vuln in vulns:
bug = osv.Bug.get_by_id(vuln.id)
# The bug already exists and has been modified since last import
if bug is None or \
bug.import_last_modified != vuln.modified.ToDatetime():
return blob_hash, blob.name

return None

return None

def _sync_from_previous_commit(self, source_repo, repo):
"""Sync the repository from the previous commit.
@@ -528,16 +444,13 @@ def _process_updates_bucket(self, source_repo: osv.SourceRepository):
source_repo.ignore_last_import_time = False
source_repo.put()

# First retrieve a list of files to parallel download
storage_client = storage.Client()
utc_last_update_date = source_repo.last_update_date.replace(
tzinfo=datetime.timezone.utc)

# Get all of the existing records in the GCS bucket
logging.info(
'Listing blobs in gs://%s',
os.path.join(source_repo.bucket,
('' if source_repo.directory_path is None else
source_repo.directory_path)))
# Convert to list to retrieve all information into memory
# This makes its concurrent use later faster
# This makes its use in the concurrent map later faster
listed_blobs = list(
storage_client.list_blobs(
source_repo.bucket,
@@ -546,49 +459,86 @@ def _process_updates_bucket(self, source_repo: osv.SourceRepository):

import_failure_logs = []

# Get the hash and the parsed vulnerability from every GCS object that
# parses as an OSV record. Do this in parallel for a degree of expedience.
with concurrent.futures.ThreadPoolExecutor(
max_workers=_BUCKET_THREAD_COUNT) as executor:

logging.info('Parallel-parsing %d blobs in %s', len(listed_blobs),
source_repo.name)
future_to_blob = {
executor.submit(self._convert_blob_to_vuln, storage.Client(),
ndb.Client(), source_repo, blob,
ignore_last_import_time):
blob for blob in listed_blobs
}

converted_vulns = []
logging.info('Processing %d parallel-parsed blobs in %s',
len(future_to_blob), source_repo.name)
# TODO(andrewpollock): externalise like _vuln_ids_from_gcs_blob()
def convert_blob_to_vuln(blob: storage.Blob) -> Optional[Tuple[str, str]]:
"""Download and parse GCS blob into [blob_hash, blob.name]"""
if not _is_vulnerability_file(source_repo, blob.name):
return None
if not ignore_last_import_time and \
blob.updated is not None and \
not blob.updated > utc_last_update_date:
return None

for future in concurrent.futures.as_completed(future_to_blob):
blob = future_to_blob[future]
logging.info('Bucket entry triggered for %s/%s', source_repo.bucket,
blob.name)
# Use the _client_store thread local variable
# set in the thread pool initializer
# Download in a blob generation agnostic way to cope with the file
# changing between when it was listed and now.
blob_bytes = storage.Blob(
blob.name, blob.bucket,
generation=None).download_as_bytes(_client_store.storage_client)
if ignore_last_import_time:
blob_hash = osv.sha256_bytes(blob_bytes)
if self._strict_validation:
try:
_ = osv.parse_vulnerabilities_from_data(
blob_bytes,
os.path.splitext(blob.name)[1],
strict=self._strict_validation)
except Exception as e:
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
import_failure_logs.append('Failed to parse vulnerability "' +
blob.name + '"')
return None
return blob_hash, blob.name

with _client_store.ndb_client.context():
try:
if future.result():
converted_vulns.append(([vuln for vuln in future.result() if vuln]))
vulns = osv.parse_vulnerabilities_from_data(
blob_bytes,
os.path.splitext(blob.name)[1],
strict=self._strict_validation)
for vuln in vulns:
bug = osv.Bug.get_by_id(vuln.id)
# Check if the bug has been modified since last import
if bug is None or \
bug.import_last_modified != vuln.modified.ToDatetime():
blob_hash = osv.sha256_bytes(blob_bytes)
return blob_hash, blob.name

return None
except Exception as e:
# Don't include error stack trace as that might leak sensitive info
logging.error('Failed to parse vulnerability %s: %s', blob.name, e)
import_failure_logs.append(
'Failed to parse vulnerability (when considering for import) "' +
blob.name + '"')
# Don't include error stack trace as that might leak sensitive info
# List.append() is atomic and threadsafe.
import_failure_logs.append('Failed to parse vulnerability "' +
blob.name + '"')
return None

# Setup storage client
def thread_init():
_client_store.storage_client = storage.Client()
_client_store.ndb_client = ndb.Client()

# TODO(andrewpollock): switch to using c.f.submit() like in
# _process_deletions_bucket()
with concurrent.futures.ThreadPoolExecutor(
_BUCKET_THREAD_COUNT, initializer=thread_init) as executor:
converted_vulns = executor.map(convert_blob_to_vuln, listed_blobs)
for cv in converted_vulns:
if cv:
logging.info('Requesting analysis of bucket entry: %s/%s',
source_repo.bucket, cv[1])
self._request_analysis_external(source_repo, cv[0], cv[1])

replace_importer_log(storage_client, source_repo.name,
self._public_log_bucket, import_failure_logs)
replace_importer_log(storage_client, source_repo.name,
self._public_log_bucket, import_failure_logs)

source_repo.last_update_date = import_time_now
source_repo.put()
source_repo.last_update_date = import_time_now
source_repo.put()

logging.info('Finished processing bucket: %s', source_repo.name)
logging.info('Finished processing bucket: %s', source_repo.name)

def _process_deletions_bucket(self,
source_repo: osv.SourceRepository,
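For contrast, here is a minimal sketch of the concurrent.futures submit()/as_completed() pattern that the reverted refactor used, and that the TODO above about _process_deletions_bucket() refers to: each blob becomes a future keyed in a dict, and results are consumed as they complete rather than in submission order. This is illustrative only; parse_blob() and the sample names are placeholders, not the importer's API.

import concurrent.futures


def parse_blob(blob_name):
  # Stand-in for per-blob parsing; returns (hash, name) or None, loosely
  # mirroring the shape of the importer's return values.
  if blob_name.endswith('.json'):
    return 'sha256-placeholder', blob_name
  return None


listed_blobs = ['a/b/one.json', 'a/b/two.yaml', 'a/b/three.json']
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
  future_to_blob = {executor.submit(parse_blob, b): b for b in listed_blobs}
  for future in concurrent.futures.as_completed(future_to_blob):
    blob_name = future_to_blob[future]
    result = future.result()  # re-raises any exception from the worker thread
    if result:
      print('would request analysis for', blob_name, result[0])
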
40 changes: 10 additions & 30 deletions docker/importer/importer_test.py
@@ -518,31 +518,19 @@ def test_bucket(self, unused_mock_time: mock.MagicMock,

with self.assertLogs(level='WARNING') as logs:
imp.run()

self.assertEqual(
3,
len(logs.output),
msg='Expected number of WARNING level (or higher) logs not found')
self.assertEqual(3, len(logs.output))
self.assertEqual(
"WARNING:root:Failed to validate loaded OSV entry: 'modified' is a required property", # pylint: disable=line-too-long
logs.output[0],
msg='Expected schema validation failure log not found')
self.assertIn(
'WARNING:root:Invalid data:',
logs.output[1],
msg='Expected schema validation failure log not found')
logs.output[0])
self.assertIn('WARNING:root:Invalid data:', logs.output[1])
self.assertIn(
"ERROR:root:Failed to parse vulnerability a/b/test-invalid.json: 'modified' is a required property", # pylint: disable=line-too-long
logs.output[2],
msg='Expected schema validation failure log not found')
logs.output[2])

# Check if vulnerability parse failure was logged correctly.
self.assertTrue(
any(('Failed to parse vulnerability (when considering for import)'
' "a/b/test-invalid.json"') in x[0][0]
for x in upload_from_str.call_args_list),
msg=('Expected schema validation failure not logged in public log '
'bucket'))
any('Failed to parse vulnerability "a/b/test-invalid.json"' in x[0][0]
for x in upload_from_str.call_args_list))

# Expected pubsub calls for validly imported records.
mock_publish.assert_has_calls([
@@ -577,10 +565,7 @@ def test_bucket(self, unused_mock_time: mock.MagicMock,
path='a/b/DSA-3029-1.json',
original_sha256=mock.ANY,
deleted='false')
self.assertNotIn(
dsa_call,
mock_publish.mock_calls,
msg='Old record was processed unexpectedly')
self.assertNotIn(dsa_call, mock_publish.mock_calls)

# Test invalid entry is not published, as it failed validation.
invalid_call = mock.call(
@@ -591,10 +576,7 @@ def test_bucket(self, unused_mock_time: mock.MagicMock,
path='a/b/test-invalid.json',
original_sha256=mock.ANY,
deleted=mock.ANY)
self.assertNotIn(
invalid_call,
mock_publish.mock_calls,
msg='Invalid record was processed unexpectedly')
self.assertNotIn(invalid_call, mock_publish.mock_calls)

@mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
@mock.patch('time.time', return_value=12345.0)
@@ -717,8 +699,7 @@ def test_import_override(self, unused_mock_time: mock.MagicMock,

# Check if vulnerability parse failure was logged correctly.
self.assertTrue(
any(('Failed to parse vulnerability (when considering for import) '
'"a/b/test-invalid.json"') in x[0][0]
any('Failed to parse vulnerability "a/b/test-invalid.json"' in x[0][0]
for x in upload_from_str.call_args_list))

# Confirm a pubsub message was emitted for record reimported.
Expand All @@ -744,8 +725,7 @@ def test_import_override(self, unused_mock_time: mock.MagicMock,

# Check if vulnerability parse failure was logged correctly.
self.assertTrue(
any(('Failed to parse vulnerability (when considering for import) '
'"a/b/test-invalid.json"') in x[0][0]
any('Failed to parse vulnerability "a/b/test-invalid.json"' in x[0][0]
for x in upload_from_str.call_args_list))

# Confirm second run didn't reprocess any existing records.
