Skip to content

Commit

Permalink
Use transfer pool to make code thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
giacomo-alzetta-aiven committed Mar 18, 2024
1 parent 39dc2fe commit 0ed5c6a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 93 deletions.
180 changes: 88 additions & 92 deletions myhoard/restore_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
)
from contextlib import suppress
from pymysql import OperationalError
from rohmu import errors as rohmu_errors, get_transfer
from rohmu.object_storage.base import BaseTransfer
from rohmu import errors as rohmu_errors
from rohmu.transfer_pool import TransferPool
from typing import Any, Dict, Iterable, List, Optional, Tuple, TypedDict

import contextlib
Expand Down Expand Up @@ -185,7 +185,7 @@ def __init__(
# can be successfully restored.
self.binlog_streams = binlog_streams
self.current_file = None
self.file_storage: Optional[BaseTransfer] = None
self.file_storage_pool = TransferPool()
self.file_storage_config = file_storage_config
self.free_memory_percentage = free_memory_percentage
self.is_running = True
Expand Down Expand Up @@ -325,10 +325,6 @@ def run(self) -> None:

while self.is_running:
try:
if not self.file_storage:
self.log.info("Creating file storage accessor")
self.file_storage = get_transfer(self.file_storage_config)

if self.phase == self.Phase.getting_backup_info:
self.get_backup_info()
if self.phase == self.Phase.initiating_binlog_downloads:
Expand Down Expand Up @@ -839,9 +835,9 @@ def _build_full_name(self, name: str) -> str:
return f"{self.site}/{self.stream_id}/{name}"

def _load_file_data(self, name, missing_ok=False):
assert self.file_storage
try:
info_str, _ = self.file_storage.get_contents_to_string(self._build_full_name(name))
with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage:
info_str, _ = file_storage.get_contents_to_string(self._build_full_name(name))
return json.loads(info_str)
except rohmu_errors.FileNotFoundFromStorageError as ex:
if not missing_ok:
Expand All @@ -859,29 +855,28 @@ def _load_file_data(self, name, missing_ok=False):
def _basebackup_data_provider(self, target_stream) -> None:
name = self._build_full_name("basebackup.xbstream")
compressed_size = self.state["basebackup_info"].get("compressed_size")
file_storage = get_transfer(self.file_storage_config)

last_time = [time.monotonic()]
last_value = [0]
self.basebackup_bytes_downloaded = 0

def download_progress(progress, max_progress):
if progress and max_progress and compressed_size:
# progress may be the actual number of bytes or it may be percentages
self.basebackup_bytes_downloaded = int(compressed_size * progress / max_progress)
# Track both absolute number and explicitly calculated rate. The rate can be useful as
# a separate measurement because downloads are not ongoing all the time and calculating
# rate based on raw byte counter requires knowing when the operation started and ended
self.stats.gauge_int("myhoard.restore.basebackup_bytes_downloaded", self.basebackup_bytes_downloaded)
last_value[0], last_time[0] = track_rate(
current=self.basebackup_bytes_downloaded,
last_recorded=last_value[0],
last_recorded_time=last_time[0],
metric_name="myhoard.restore.basebackup_download_rate",
stats=self.stats,
)
with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage:
last_time = [time.monotonic()]
last_value = [0]
self.basebackup_bytes_downloaded = 0

def download_progress(progress, max_progress):
if progress and max_progress and compressed_size:
# progress may be the actual number of bytes or it may be percentages
self.basebackup_bytes_downloaded = int(compressed_size * progress / max_progress)
# Track both absolute number and explicitly calculated rate. The rate can be useful as
# a separate measurement because downloads are not ongoing all the time and calculating
# rate based on raw byte counter requires knowing when the operation started and ended
self.stats.gauge_int("myhoard.restore.basebackup_bytes_downloaded", self.basebackup_bytes_downloaded)
last_value[0], last_time[0] = track_rate(
current=self.basebackup_bytes_downloaded,
last_recorded=last_value[0],
last_recorded_time=last_time[0],
metric_name="myhoard.restore.basebackup_download_rate",
stats=self.stats,
)

file_storage.get_contents_to_fileobj(name, target_stream, progress_callback=download_progress)
file_storage.get_contents_to_fileobj(name, target_stream, progress_callback=download_progress)

def _get_iteration_sleep(self) -> float:
if self.phase in self.POLL_PHASES:
Expand Down Expand Up @@ -909,53 +904,54 @@ def _list_binlogs_in_bucket(
start_time = time.monotonic()
target_time_reached_by_server = set()

assert self.file_storage
self.log.debug("Listing binlogs in bucket %s", bucket)
try:
list_iter = self.file_storage.list_iter(self._build_binlog_full_name(f"binlogs/{bucket}"))
for info in self._get_sorted_file_infos(list_iter):
binlog = parse_fs_metadata(info["metadata"])
# We may be handling binlogs from multiple streams. To make the other logic work, calculate
# monotonically increasing index across all streams. (Individual streams have their indexes
# always start from 1.)
binlog["adjusted_remote_index"] = self.state["binlog_stream_offset"] + binlog["remote_index"]
binlog["remote_key"] = info["name"]
binlog["remote_size"] = info["size"]
highest_index = max(highest_index, binlog["remote_index"])
if last_processed_index is not None and binlog["adjusted_remote_index"] <= last_processed_index:
continue
# We're handing binlogs in order. If we've reached target time for any earlier binlog then this
# binlog must be out of range as well. This check is needed because we might have binlogs without
# GTIDs that cannot be excluded based on start/end checks
if binlog["server_id"] in target_time_reached_by_server:
continue
if self.target_time and binlog["gtid_ranges"]:
if binlog["gtid_ranges"][0]["start_ts"] >= self.target_time:
# We exclude entries whose time matches recovery target time so any file whose start_ts
# is equal or higher than target time is certain not to contain data we're going to apply
self.log.info(
"Start time %s of binlog %s from server %s is after our target time %s, skipping",
binlog["gtid_ranges"][0]["start_ts"],
binlog["remote_index"],
binlog["server_id"],
self.target_time,
)
target_time_reached_by_server.add(binlog["server_id"])
with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage:
list_iter = file_storage.list_iter(self._build_binlog_full_name(f"binlogs/{bucket}"))
for info in self._get_sorted_file_infos(list_iter):
binlog = parse_fs_metadata(info["metadata"])
# We may be handling binlogs from multiple streams. To make the other logic work, calculate
# monotonically increasing index across all streams. (Individual streams have their indexes
# always start from 1.)
binlog["adjusted_remote_index"] = self.state["binlog_stream_offset"] + binlog["remote_index"]
binlog["remote_key"] = info["name"]
binlog["remote_size"] = info["size"]
highest_index = max(highest_index, binlog["remote_index"])
if last_processed_index is not None and binlog["adjusted_remote_index"] <= last_processed_index:
continue
if binlog["gtid_ranges"][0]["end_ts"] >= self.target_time:
# Log and mark target time reached but include binlog and continue processing results. We may
# get binlogs from multiple servers in some race conditions and we don't yet know if this binlog
# was from a server that was actually valid at that point in time and some other server may have
# binlogs that are still relevant.
self.log.info(
"End time %s of binlog %s from server %s is at or after our target time %s, target time reached",
binlog["gtid_ranges"][0]["end_ts"],
binlog["remote_index"],
binlog["server_id"],
self.target_time,
)
target_time_reached_by_server.add(binlog["server_id"])
new_binlogs.append(binlog)
# We're handing binlogs in order. If we've reached target time for any earlier binlog then this
# binlog must be out of range as well. This check is needed because we might have binlogs without
# GTIDs that cannot be excluded based on start/end checks
if binlog["server_id"] in target_time_reached_by_server:
continue
if self.target_time and binlog["gtid_ranges"]:
if binlog["gtid_ranges"][0]["start_ts"] >= self.target_time:
# We exclude entries whose time matches recovery target time so any file whose start_ts
# is equal or higher than target time is certain not to contain data we're going to apply
self.log.info(
"Start time %s of binlog %s from server %s is after our target time %s, skipping",
binlog["gtid_ranges"][0]["start_ts"],
binlog["remote_index"],
binlog["server_id"],
self.target_time,
)
target_time_reached_by_server.add(binlog["server_id"])
continue
if binlog["gtid_ranges"][0]["end_ts"] >= self.target_time:
# Log and mark target time reached but include binlog and continue processing results. We may
# get binlogs from multiple servers in some race conditions and we don't yet know if this binlog
# was from a server that was actually valid at that point in time and some other server may have
# binlogs that are still relevant.
self.log.info(
"End time %s of binlog %s from server %s is at or after our target time %s,"
" target time reached",
binlog["gtid_ranges"][0]["end_ts"],
binlog["remote_index"],
binlog["server_id"],
self.target_time,
)
target_time_reached_by_server.add(binlog["server_id"])
new_binlogs.append(binlog)
except rohmu_errors.FileNotFoundFromStorageError:
pass
except Exception as ex: # pylint: disable=broad-except
Expand Down Expand Up @@ -1012,25 +1008,25 @@ def _fetch_more_binlogs_infos_for_current_stream(self) -> None:

# Also refresh promotions list so that we know which of the remote
# binlogs are actually valid
assert self.file_storage
promotions: Dict[int, Any] = {}
try:
for info in self.file_storage.list_iter(self._build_binlog_full_name("promotions")):
# There could theoretically be multiple promotions with the same
# index value if new master got promoted but then failed before
# managing to upload any binlogs. To cope with that only keep one
# promotion info per server id (the one with most recent timestamp)
info = parse_fs_metadata(info["metadata"])
existing = promotions.get(info["start_index"])
if existing and info["promoted_at"] < existing["promoted_at"]:
continue
promotions[info["start_index"]] = info
self.log.info(
"server_id %s valid starting from %s (at %s)",
info["server_id"],
info["start_index"],
info["promoted_at"],
)
with self.file_storage_pool.with_transfer(self.file_storage_config) as file_storage:
for info in file_storage.list_iter(self._build_binlog_full_name("promotions")):
# There could theoretically be multiple promotions with the same
# index value if new master got promoted but then failed before
# managing to upload any binlogs. To cope with that only keep one
# promotion info per server id (the one with most recent timestamp)
info = parse_fs_metadata(info["metadata"])
existing = promotions.get(info["start_index"])
if existing and info["promoted_at"] < existing["promoted_at"]:
continue
promotions[info["start_index"]] = info
self.log.info(
"server_id %s valid starting from %s (at %s)",
info["server_id"],
info["start_index"],
info["promoted_at"],
)
except Exception as ex: # pylint: disable=broad-except
# There should always be one promotion file so file not found is real error too
self.log.error("Failed to list promotions: %r", ex)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies = [
"PySocks",
# rohmu is incompatible with latest version snappy 0.7.1
"python-snappy == 0.6.1",
"rohmu == 1.0.7",
"rohmu >= 1.1.2",
"sentry-sdk==1.14.0",
]

Expand Down

0 comments on commit 0ed5c6a

Please sign in to comment.