Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure we clear stashes when uploading new blocklist filters #23039

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import datetime
from typing import List

import waffle
from django_statsd.clients import statsd
Expand All @@ -8,6 +7,7 @@
from olympia.constants.blocklist import (
MLBF_BASE_ID_CONFIG_KEY,
MLBF_TIME_CONFIG_KEY,
BlockListAction,
)
from olympia.zadmin.models import get_config

Expand Down Expand Up @@ -74,39 +74,36 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
)

base_filters: dict[BlockType, MLBF | None] = {key: None for key in BlockType}
base_filters_to_update: List[BlockType] = []
create_stash = False

upload_filters = False
upload_stash = False

# Determine which base filters need to be re-uploaded
# and whether a new stash needs to be created.
for block_type in BlockType:
# This prevents us from updating a stash or filter based on new soft blocks
# until we are ready to enable soft blocking.
if block_type == BlockType.SOFT_BLOCKED and not waffle.switch_is_active(
'enable-soft-blocking'
):
log.info(
'Skipping soft-blocks because enable-soft-blocking switch is inactive'
)
continue

base_filter = MLBF.load_from_storage(get_base_generation_time(block_type))
base_filters[block_type] = base_filter

# Add this block type to the list of filters to be re-uploaded.
# For now we upload both filters together when either exceeds
# the change threshold. Additionally we brute force clear all stashes
# when uploading filters. This is the easiest way to ensure that stashes
# are always newer than any existing filters, a requirement of the way
# FX is reading the blocklist stash and filter sets.
# We may attempt handling block types separately in the future as a
# performance optimization https://github.com/mozilla/addons/issues/15217.
if (
force_base
or base_filter is None
or mlbf.should_upload_filter(block_type, base_filter)
):
base_filters_to_update.append(block_type)
upload_filters = True
upload_stash = False
# Only update the stash if we should AND if we aren't already
# re-uploading the filter for this block type.
# re-uploading the filters.
elif mlbf.should_upload_stash(block_type, previous_filter or base_filter):
create_stash = True
upload_stash = True

skip_update = len(base_filters_to_update) == 0 and not create_stash
if skip_update:
if not upload_filters and not upload_stash:
log.info('No new/modified/deleted Blocks in database; skipping MLBF generation')
# Delete the locally generated MLBF directory and files as they are not needed.
mlbf.delete()
Expand All @@ -125,7 +122,19 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
len(mlbf.data.not_blocked_items),
)

if create_stash:
if upload_filters:
for block_type in BlockType:
mlbf.generate_and_write_filter(block_type)

# Upload both filters and clear the stash to keep
# all of the records in sync with the expectations of FX.
actions = [
BlockListAction.UPLOAD_BLOCKED_FILTER,
BlockListAction.UPLOAD_SOFT_BLOCKED_FILTER,
BlockListAction.CLEAR_STASH,
]

elif upload_stash:
# We generate unified stashes, which means they can contain data
# for both soft and hard blocks. We need the base filters of each
# block type to determine what goes in a stash.
Expand All @@ -134,15 +143,12 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
blocked_base_filter=base_filters[BlockType.BLOCKED],
soft_blocked_base_filter=base_filters[BlockType.SOFT_BLOCKED],
)
actions = [
BlockListAction.UPLOAD_STASH,
]

for block_type in base_filters_to_update:
mlbf.generate_and_write_filter(block_type)

upload_filter.delay(
mlbf.created_at,
filter_list=[key.name for key in base_filters_to_update],
create_stash=create_stash,
)
# Serialize the actions to strings because celery doesn't support enums.
upload_filter.delay(mlbf.created_at, actions=[action.name for action in actions])


def process_blocklistsubmissions():
Expand Down
16 changes: 7 additions & 9 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from django.utils.functional import cached_property

import waffle
from filtercascade import FilterCascade
from filtercascade.fileformats import HashAlgorithm

Expand Down Expand Up @@ -352,14 +351,13 @@ def generate_and_write_stash(
stash_json[STASH_KEYS[BlockType.BLOCKED]] = blocked_added
stash_json[UNBLOCKED_STASH_KEY] = blocked_removed

if waffle.switch_is_active('enable-soft-blocking'):
soft_blocked_added, soft_blocked_removed, _ = diffs[BlockType.SOFT_BLOCKED]
added_items.update(soft_blocked_added)
if not self.should_upload_filter(
BlockType.SOFT_BLOCKED, soft_blocked_base_filter
):
stash_json[STASH_KEYS[BlockType.SOFT_BLOCKED]] = soft_blocked_added
stash_json[UNBLOCKED_STASH_KEY].extend(soft_blocked_removed)
soft_blocked_added, soft_blocked_removed, _ = diffs[BlockType.SOFT_BLOCKED]
added_items.update(soft_blocked_added)
if not self.should_upload_filter(
BlockType.SOFT_BLOCKED, soft_blocked_base_filter
):
stash_json[STASH_KEYS[BlockType.SOFT_BLOCKED]] = soft_blocked_added
stash_json[UNBLOCKED_STASH_KEY].extend(soft_blocked_removed)

# Remove any items that were added to a block type.
stash_json[UNBLOCKED_STASH_KEY] = [
Expand Down
65 changes: 33 additions & 32 deletions src/olympia/blocklist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from django.db import transaction
from django.utils.encoding import force_str

import waffle
from django_statsd.clients import statsd

import olympia.core.logger
Expand All @@ -21,6 +20,7 @@
MLBF_BASE_ID_CONFIG_KEY,
MLBF_TIME_CONFIG_KEY,
REMOTE_SETTINGS_COLLECTION_MLBF,
BlockListAction,
)
from olympia.lib.remote_settings import RemoteSettings
from olympia.zadmin.models import get_config, set_config
Expand Down Expand Up @@ -105,8 +105,12 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, filter_list=None, create_stash=False):
filters_to_upload: List[BlockType] = []
def upload_filter(generation_time: str, actions: List[str] = None):
# Deserialize the actions from the string list
# We have to do this because celery does not support enum arguments
actions = [BlockListAction[action] for action in actions]

filters_to_upload = []
base_filter_ids = dict()
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
Expand All @@ -118,26 +122,21 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
old_records = server.records()
attachment_types_to_delete = []

for block_type in BlockType:
# Skip soft blocked filters if the switch is not active.
if block_type == BlockType.SOFT_BLOCKED and not waffle.switch_is_active(
'enable-soft-blocking'
):
continue
if BlockListAction.UPLOAD_BLOCKED_FILTER in actions:
filters_to_upload.append(BlockType.BLOCKED)

# Only upload filters that are in the filter_list arg.
# We cannot send enum values to tasks so we serialize
# them in the filter_list arg as the name of the enum.
if filter_list and block_type.name in filter_list:
filters_to_upload.append(block_type)
if BlockListAction.UPLOAD_SOFT_BLOCKED_FILTER in actions:
filters_to_upload.append(BlockType.SOFT_BLOCKED)

# Get the last updated timestamp for each filter type
# regardless of whether we are uploading it or not.
# This will help us identify stale records that should be cleaned up.
for block_type in BlockType:
base_filter_id = get_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
json_value=True,
)

# If there is an existing base filter id, we need to keep track of it
# so we can potentially delete stashes older than this timestamp.
if base_filter_id is not None:
base_filter_ids[block_type] = base_filter_id

Expand All @@ -157,15 +156,14 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
attachment_types_to_delete.append(attachment_type)

statsd.incr('blocklist.tasks.upload_filter.upload_mlbf.base')
# Update the base filter id for this block type to the generation time
# so we can delete stashes older than this new filter.
# If we are re-uploading a filter, we should overwrite the timestamp
# to ensure we delete records older than the new filter and not the old one.
base_filter_ids[block_type] = generation_time

# It is possible to upload a stash and a filter in the same task.
if create_stash:
if BlockListAction.UPLOAD_STASH in actions:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
stash_upload_data = {
'key_format': MLBF.KEY_FORMAT,
'stash_time': generation_time,
Expand All @@ -174,26 +172,29 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
server.publish_record(stash_upload_data)
statsd.incr('blocklist.tasks.upload_filter.upload_stash')

# Get the oldest base filter id so we can delete only stashes
# that are definitely not needed anymore.
# Get the oldest base filter id so we can identify stale records
# that should be removed from remote settings or file storage.
oldest_base_filter_id = min(base_filter_ids.values()) if base_filter_ids else None

for record in old_records:
# Delete attachment records that match the
# attachment types of filters we just uploaded.
# This ensures we only have one filter attachment
# per block_type.
if 'attachment' in record:
# Delete attachment records that match the
# attachment types of filters we just uploaded.
# This ensures we only have one filter attachment
# per block_type.
attachment_type = record['attachment_type']
if attachment_type in attachment_types_to_delete:
server.delete_record(record['id'])

# Delete stash records that are older than the oldest
# pre-existing filter attachment records. These records
# cannot apply to any existing filter since we uploaded.
elif 'stash' in record and oldest_base_filter_id is not None:
record_time = record['stash_time']
if record_time < oldest_base_filter_id:
elif 'stash' in record:
# Delete stash records if that is one of the actions to perform.
# Currently we have a brute force approach to clearing stashes
# because we always upload both filters together in order to prevent
# stale stashes from being used by FX instances. Eventually, we may
# want to support independently uploading filters and stashes, which would
# require a more fine-grained approach to clearing or even re-writing
# stashes based on which filters are being re-uploaded.
if BlockListAction.CLEAR_STASH in actions:
server.delete_record(record['id'])

# Commit the changes to remote settings for review + signing.
Expand Down
Loading
Loading