From 8288457ef0cbbec0ff483d972a0dacf453595c30 Mon Sep 17 00:00:00 2001 From: Egor Voynov Date: Tue, 20 Aug 2024 12:03:21 +0200 Subject: [PATCH] Store MySQL server version in basebackup_info (basebackup.json) and check backup version on restore_backup --- myhoard.json | 3 ++- myhoard/backup_stream.py | 4 ++++ myhoard/basebackup_operation.py | 4 ++-- myhoard/controller.py | 11 +++++++++++ myhoard/myhoard.py | 6 ++++++ pyproject.toml | 1 + test/conftest.py | 1 + test/test_backup_stream.py | 1 + test/test_basebackup_operation.py | 4 ++-- test/test_controller.py | 31 +++++++++++++++++++++++++++++-- 10 files changed, 59 insertions(+), 7 deletions(-) diff --git a/myhoard.json b/myhoard.json index 1f3ee2b..bdc72a7 100644 --- a/myhoard.json +++ b/myhoard.json @@ -70,5 +70,6 @@ "copy_threads": 1, "compress_threads": 1, "encrypt_threads": 1 - } + }, + "restrict_backup_version_higher": null } diff --git a/myhoard/backup_stream.py b/myhoard/backup_stream.py index d3c26a8..55796f5 100644 --- a/myhoard/backup_stream.py +++ b/myhoard/backup_stream.py @@ -11,6 +11,7 @@ DEFAULT_XTRABACKUP_SETTINGS, ERR_TIMEOUT, first_contains_gtids_not_in_second, + get_mysql_version, GtidExecuted, make_fs_metadata, mysql_cursor, @@ -978,10 +979,12 @@ def _take_basebackup(self) -> None: # FLUSH BINARY LOGS might take a long time if the server is under heavy load, # use longer than normal timeout here with multiple retries and increasing timeout. connect_params = dict(self.mysql_client_params) + mysql_version = None for retry, multiplier in [(True, 1), (True, 2), (False, 3)]: try: connect_params["timeout"] = DEFAULT_MYSQL_TIMEOUT * 5 * multiplier with mysql_cursor(**connect_params) as cursor: + mysql_version = get_mysql_version(cursor) cursor.execute("FLUSH BINARY LOGS") cursor.execute("SELECT @@GLOBAL.gtid_executed AS gtid_executed") gtid_executed = parse_gtid_range_string(cast(dict, cursor.fetchone())["gtid_executed"]) @@ -1031,6 +1034,7 @@ def _take_basebackup(self) -> None: "start_size": self.basebackup_operation.data_directory_size_start, "start_ts": start_time, "uploaded_from": self.server_id, + "mysql_version": mysql_version, } self.file_storage.store_file_from_memory( self._build_full_name("basebackup.json"), diff --git a/myhoard/basebackup_operation.py b/myhoard/basebackup_operation.py index efb3183..e2095bf 100644 --- a/myhoard/basebackup_operation.py +++ b/myhoard/basebackup_operation.py @@ -1,8 +1,8 @@ # Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/ from contextlib import suppress -from distutils.version import LooseVersion # pylint:disable=deprecated-module from myhoard.errors import BlockMismatchError, XtraBackupError from myhoard.util import get_mysql_version, mysql_cursor +from packaging.version import Version from rohmu.util import increase_pipe_capacity, set_stream_nonblocking from typing import Optional @@ -156,7 +156,7 @@ def _optimize_tables(self) -> None: params["timeout"] = CURSOR_TIMEOUT_DURING_OPTIMIZE with mysql_cursor(**params) as cursor: version = get_mysql_version(cursor) - if LooseVersion(version) < LooseVersion("8.0.29"): + if Version(version) < Version("8.0.29"): return # allow OPTIMIZE TABLE to run on tables without primary keys diff --git a/myhoard/controller.py b/myhoard/controller.py index 7e1215d..ba647e6 100644 --- a/myhoard/controller.py +++ b/myhoard/controller.py @@ -21,6 +21,7 @@ ) from http.client import RemoteDisconnected from httplib2 import ServerNotFoundError +from packaging.version import Version from rohmu import get_transfer from rohmu.compressor import DecompressSink from rohmu.encryptor import DecryptSink @@ -48,6 +49,7 @@ class BaseBackup(TypedDict): end_ts: float + mysql_version: Optional[str] class Backup(TypedDict): @@ -158,6 +160,7 @@ def __init__( temp_dir, restore_free_memory_percentage=None, xtrabackup_settings: Dict[str, int], + restrict_backup_version_higher: Optional[Version] = None, ): super().__init__() self.log = logging.getLogger(self.__class__.__name__) @@ -232,6 +235,7 @@ def __init__( self.xtrabackup_settings = xtrabackup_settings self._get_upload_backup_site() self._update_mode_tag() + self.restrict_backup_version_higher = restrict_backup_version_higher def is_log_backed_up(self, *, log_index: int): return all( @@ -306,6 +310,13 @@ def restore_backup( continue if not backup["basebackup_info"]: raise ValueError(f"Backup {backup!r} cannot be restored") + basebackup_mysql_version = backup["basebackup_info"].get("mysql_version") + if basebackup_mysql_version and self.restrict_backup_version_higher: + if Version(basebackup_mysql_version) > self.restrict_backup_version_higher: + raise ValueError( + f"Backup was taken with MySQL version {basebackup_mysql_version} which is higher than " + f"allowed {self.restrict_backup_version_higher}: {backup!r}" + ) if backup.get("broken_at"): raise ValueError(f"Cannot restore a broken backup: {backup!r}") diff --git a/myhoard/myhoard.py b/myhoard/myhoard.py index f7f626b..edd15c1 100644 --- a/myhoard/myhoard.py +++ b/myhoard/myhoard.py @@ -4,6 +4,7 @@ from myhoard.statsd import StatsClient from myhoard.util import DEFAULT_XTRABACKUP_SETTINGS, detect_running_process_id, wait_for_port from myhoard.web_server import WebServer +from packaging.version import Version import argparse import asyncio @@ -192,6 +193,10 @@ async def _start(self): tags=statsd_config["tags"], ) mysql = self.config["mysql"] + raw_restrict_backup_version_higher = self.config.get("restrict_backup_version_higher") + restrict_backup_version_higher = ( + Version(raw_restrict_backup_version_higher) if raw_restrict_backup_version_higher else None + ) self.controller = Controller( backup_settings=self.config["backup_settings"], backup_sites=self.config["backup_sites"], @@ -211,6 +216,7 @@ async def _start(self): stats=statsd, temp_dir=self.config["temporary_directory"], xtrabackup_settings=self.config.get("xtrabackup", DEFAULT_XTRABACKUP_SETTINGS), + restrict_backup_version_higher=restrict_backup_version_higher, ) self.controller.start() self.web_server = WebServer( diff --git a/pyproject.toml b/pyproject.toml index 3623b17..6ba65be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "python-snappy == 0.6.1", "rohmu >= 1.1.2", "sentry-sdk==1.14.0", + "packaging", ] [project.optional-dependencies] diff --git a/test/conftest.py b/test/conftest.py index 4f4509b..bb721be 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -382,6 +382,7 @@ def fixture_myhoard_config(default_backup_site, mysql_master, session_tmpdir): "systemd_service": None, "temporary_directory": temp_dir, "xtrabackup": DEFAULT_XTRABACKUP_SETTINGS, + "restrict_backup_version_higher": None, } diff --git a/test/test_backup_stream.py b/test/test_backup_stream.py index bea24dc..b48f7ad 100644 --- a/test/test_backup_stream.py +++ b/test/test_backup_stream.py @@ -116,6 +116,7 @@ def _run_backup_stream_test(session_tmpdir, mysql_master: MySQLConfig, backup_st assert bs.is_binlog_safe_to_delete(new_binlogs[0]) assert bs.is_log_backed_up(log_index=new_binlogs[0]["local_index"]) + assert bs.state["basebackup_info"].get("mysql_version") is not None # remote_gtid_executed will be updated once the stream notices the new binlog that was uploaded above wait_for_condition(lambda: bs_observer.state["remote_gtid_executed"] != gtid_executed) diff --git a/test/test_basebackup_operation.py b/test/test_basebackup_operation.py index 94c9522..51c4258 100644 --- a/test/test_basebackup_operation.py +++ b/test/test_basebackup_operation.py @@ -1,7 +1,7 @@ # Copyright (c) 2019 Aiven, Helsinki, Finland. https://aiven.io/ from . import build_statsd_client, MySQLConfig, restart_mysql -from distutils.version import LooseVersion # pylint:disable=deprecated-module from myhoard.basebackup_operation import BasebackupOperation +from packaging.version import Version from typing import IO from unittest import SkipTest from unittest.mock import mock_open, patch @@ -139,7 +139,7 @@ def stream_handler(_stream): def test_backup_with_non_optimized_tables(mysql_master: MySQLConfig) -> None: with myhoard_util.mysql_cursor(**mysql_master.connect_options) as cursor: version = myhoard_util.get_mysql_version(cursor) - if LooseVersion(version) < LooseVersion("8.0.29"): + if Version(version) < Version("8.0.29"): raise SkipTest("DB version doesn't need OPTIMIZE TABLE") def create_test_db(*, db_name: str, table_name: str, add_pk: bool) -> None: diff --git a/test/test_controller.py b/test/test_controller.py index 9cfc759..a0228eb 100644 --- a/test/test_controller.py +++ b/test/test_controller.py @@ -14,6 +14,7 @@ parse_gtid_range_string, partition_sort_and_combine_gtid_ranges, ) +from packaging.version import Version from rohmu import get_transfer from typing import Any, cast, Dict, List, Optional, Set, TypedDict from unittest.mock import MagicMock, patch @@ -375,7 +376,7 @@ def test_backup_state_from_removed_site_is_removed(default_backup_site, mysql_em fake_file_names = create_fake_state_files(controller) controller.state["backups"] = [ Backup( - basebackup_info=BaseBackup(end_ts=0.0), + basebackup_info=BaseBackup(end_ts=0.0, mysql_version=None), closed_at=None, completed_at=None, recovery_site=False, @@ -1679,6 +1680,31 @@ def restoration_is_failed(): new_controller.stop() +def test_forbid_to_restore_backup( + master_controller, +) -> None: + controller, _ = master_controller + controller.restrict_backup_version_higher = Version("8.0.1") + controller.state["backups"] = [ + Backup( + basebackup_info=BaseBackup(end_ts=0.0, mysql_version="8.0.30"), + closed_at=None, + completed_at=None, + broken_at=None, + preserve_until=None, + recovery_site=False, + stream_id="1234", + resumable=False, + site="default", + ) + ] + with pytest.raises(ValueError, match="Backup was taken with MySQL version 8.0.30 which is higher than allowed 8.0.1:"): + controller.restore_backup(site="default", stream_id="1234") + controller.restrict_backup_version_higher = Version("8.0.30") + controller.restore_backup(site="default", stream_id="1234") + controller.state = controller.Mode.restore + + @patch.object(RestoreCoordinator, "MAX_BASEBACKUP_ERRORS", 2) def test_restore_failed_basebackup_and_retry_with_prior( default_backup_site, @@ -1812,7 +1838,7 @@ def remove_backup(backup_stream) -> None: def _append_backup(stream_id: str, ts: float, read_only: bool = True) -> None: controller.state["backups"].append( { - "basebackup_info": {"end_ts": ts}, + "basebackup_info": {"end_ts": ts, "mysql_version": None}, "closed_at": ts, "completed_at": ts, "broken_at": None, @@ -1869,6 +1895,7 @@ def _add_fake_backup(stream_id: str) -> None: { "basebackup_info": { "end_ts": now - 20 * 60, + "mysql_version": None, }, "closed_at": now - 3 * 60, "completed_at": now - 5 * 60,