Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Refactor database backup and restore to use class inheritance
Browse files Browse the repository at this point in the history
  • Loading branch information
Heavybullets8 committed May 26, 2024
1 parent 5005537 commit 6d3ca25
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 147 deletions.
151 changes: 47 additions & 104 deletions functions/backup_restore/database/backup.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import subprocess
import logging
import gzip
from pathlib import Path
from typing import Dict
from charts.api_fetch import APIChartFetcher
from utils.type_check import type_check
from utils.singletons import KubernetesClientManager
from .utils import DatabaseUtils
from .base import CNPGBase

class BackupCNPGDatabase:
class BackupCNPGDatabase(CNPGBase):
"""
Class responsible for backing up a CNPG (Cloud Native PostgreSQL) database.
"""
Expand All @@ -22,19 +19,49 @@ def __init__(self, backup_dir: Path, app_name: str):
backup_dir (Path): Directory where the backup will be stored.
app_name (str): Name of the application.
"""
self.logger = logging.getLogger('BackupLogger')
self.v1_client = KubernetesClientManager.fetch()
self.chart_info = APIChartFetcher(app_name)
super().__init__(app_name)
self.backup_dir = backup_dir
self.app_name = app_name
self.namespace = f"ix-{app_name}"
self.db_utils = DatabaseUtils(self.namespace)
self.primary_pod = None
self.database_name = None
self.dump_command = None
self.output_file = self.backup_dir / f"{self.app_name}.sql.gz"
self.temp_file = self.backup_dir / f"{self.app_name}.sql"
self.error = None

def _modify_dump_for_immich(self) -> None:
"""
Modify the dump data for immich database.
"""
self.logger.debug("Modifying dump data for immich database.")
try:
with open(self.temp_file, 'r') as f:
data = f.read().splitlines()
modified_data = "\n".join(
'-- ' + line if 'DROP ROLE IF EXISTS postgres;' in line or 'CREATE ROLE postgres;' in line
else "SELECT pg_catalog.set_config('search_path', 'public, pg_catalog', true);" if "SELECT pg_catalog.set_config('search_path', '', false);" in line
else line
for line in data
)
with open(self.temp_file, 'w') as f:
f.write(modified_data)

self.logger.debug(f"Dump data for immich modified successfully.")
except IOError as e:
message = f"Failed to modify dump data for immich: {e}"
self.logger.error(message, exc_info=True)
raise

def _compress_backup(self) -> None:
"""
Compress the temporary backup file to gzip format and remove the temporary file.
"""
self.logger.debug(f"Compressing the backup file {self.temp_file} to {self.output_file}")
try:
with open(self.temp_file, 'rb') as f_in:
with gzip.open(self.output_file, 'wb') as f_out:
f_out.writelines(f_in)
self.temp_file.unlink()
self.logger.debug(f"Backup file compressed successfully.")
except IOError as e:
message = f"Failed to compress backup file: {e}"
self.logger.error(message, exc_info=True)
raise

def backup(self, timeout=300, interval=5) -> Dict[str, str]:
"""
Expand All @@ -59,38 +86,25 @@ def backup(self, timeout=300, interval=5) -> Dict[str, str]:

if app_status == "STOPPED":
self.logger.debug(f"App {self.app_name} is stopped, starting it for backup.")
if not self.db_utils.start_app(self.app_name):
if not self.start_app(self.app_name):
message = f"Failed to start app {self.app_name}."
self.logger.error(message)
result["message"] = message
return result
was_stopped = True

self.primary_pod = self._get_primary_pod(timeout, interval)
self.primary_pod = self.fetch_primary_pod(timeout, interval)
if not self.primary_pod:
message = "Primary pod not found."
self.logger.error(message)
result["message"] = message

if was_stopped:
self.logger.debug(f"Stopping app {self.app_name} after backup failure.")
self.db_utils.stop_app(self.app_name)
self.stop_app(self.app_name)

return result

if self.chart_info.chart_name != "immich":
self.database_name = self._get_database_name()
if not self.database_name:
message = "Database name retrieval failed."
self.logger.error(message)
result["message"] = message

if was_stopped:
self.logger.debug(f"Stopping app {self.app_name} after backup failure.")
self.db_utils.stop_app(self.app_name)

return result

try:
self.dump_command = self._get_dump_command()
self.logger.debug(f"Executing dump command: {self.dump_command}")
Expand All @@ -100,7 +114,7 @@ def backup(self, timeout=300, interval=5) -> Dict[str, str]:
if not result["success"]:
if was_stopped:
self.logger.debug(f"Stopping app {self.app_name} after backup failure.")
self.db_utils.stop_app(self.app_name)
self.stop_app(self.app_name)
return result

if self.chart_info.chart_name == "immich":
Expand All @@ -118,42 +132,10 @@ def backup(self, timeout=300, interval=5) -> Dict[str, str]:

if was_stopped:
self.logger.debug(f"Stopping app {self.app_name} after successful backup.")
self.db_utils.stop_app(self.app_name)
self.stop_app(self.app_name)

return result

def _get_primary_pod(self, timeout, interval) -> str:
"""
Fetch the primary pod for the database.
Parameters:
timeout (int): Maximum time to wait for the primary pod to be found.
interval (int): Interval between retries to find the primary pod.
Returns:
str: The name of the primary pod if found, None otherwise.
"""
self.logger.debug(f"Fetching primary pod for app: {self.app_name} with timeout: {timeout} and interval: {interval}")
primary_pod = self.db_utils.fetch_primary_pod(timeout, interval)
if primary_pod:
self.logger.debug(f"Primary pod found: {primary_pod}")
return primary_pod

def _get_database_name(self) -> str:
"""
Fetch the database name.
Returns:
str: The name of the database if found, None otherwise.
"""
self.logger.debug(f"Fetching database name for app: {self.app_name}")
database_name = self.db_utils.fetch_database_name()
if not database_name:
self.logger.error("Failed to get database name.")
return None
self.logger.debug(f"Database name found: {database_name}")
return database_name

def _get_dump_command(self) -> list:
"""
Get the appropriate dump command based on the chart name.
Expand Down Expand Up @@ -226,42 +208,3 @@ def _execute_backup_command(self) -> Dict[str, str]:
result["message"] = message

return result

def _compress_backup(self) -> None:
"""
Compress the temporary backup file to gzip format and remove the temporary file.
"""
self.logger.debug(f"Compressing the backup file {self.temp_file} to {self.output_file}")
try:
with open(self.temp_file, 'rb') as f_in:
with gzip.open(self.output_file, 'wb') as f_out:
f_out.writelines(f_in)
self.temp_file.unlink() # Remove the temporary file
self.logger.debug(f"Backup file compressed successfully.")
except IOError as e:
message = f"Failed to compress backup file: {e}"
self.logger.error(message, exc_info=True)
raise

def _modify_dump_for_immich(self) -> None:
"""
Modify the dump data for immich database.
"""
self.logger.debug("Modifying dump data for immich database.")
try:
with open(self.temp_file, 'r') as f:
data = f.read().splitlines()
modified_data = "\n".join(
'-- ' + line if 'DROP ROLE IF EXISTS postgres;' in line or 'CREATE ROLE postgres;' in line
else "SELECT pg_catalog.set_config('search_path', 'public, pg_catalog', true);" if "SELECT pg_catalog.set_config('search_path', '', false);" in line
else line
for line in data
)
with open(self.temp_file, 'w') as f:
f.write(modified_data)

self.logger.debug(f"Dump data for immich modified successfully.")
except IOError as e:
message = f"Failed to modify dump data for immich: {e}"
self.logger.error(message, exc_info=True)
raise
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,41 @@
from utils.shell import run_command
from utils.singletons import KubernetesClientManager
from utils.type_check import type_check
from charts.api_fetch import APIChartFetcher

class DatabaseUtils:
class CNPGBase:
"""
Utility class for database operations.
Base class for CNPG (Cloud Native PostgreSQL) database operations (backup and restore).
"""

@type_check
def __init__(self, namespace: str):
def __init__(self, app_name: str):
"""
Initialize the DatabaseUtils class.
Initialize the CNPGBase class.
Parameters:
namespace (str): The namespace to operate in.
app_name (str): The name of the application.
backup_dir (Path): Directory where the backup will be stored.
backup_file (Path): The path to the backup file.
"""
self.logger = logging.getLogger('BackupLogger')
self.v1_client = KubernetesClientManager.fetch()
self.namespace = namespace
self.app_name = app_name
self.namespace = f"ix-{app_name}"
self.chart_info = APIChartFetcher(app_name)

def fetch_primary_pod(self, timeout=300, interval=5) -> str:
self.primary_pod = None
self.database_name = None
self.database_user = None
self.dump_command = None
self.error = None

# Fetch database name and user if needed
if self.chart_info.chart_name != "immich":
self.database_name = self.fetch_database_name()
self.database_user = self.fetch_database_user() or self.database_name

def fetch_primary_pod(self, timeout=600, interval=5) -> str:
"""
Wait for the primary pod to be in the 'Running' state and return its name.
Expand All @@ -37,7 +53,7 @@ def fetch_primary_pod(self, timeout=300, interval=5) -> str:
"""
deadline = time.time() + timeout
self.logger.debug(f"Waiting for primary pod in namespace '{self.namespace}' with timeout={timeout} and interval={interval}")

while time.time() < deadline:
try:
pods = self.v1_client.list_namespaced_pod(self.namespace, label_selector='role=primary')
Expand All @@ -48,7 +64,7 @@ def fetch_primary_pod(self, timeout=300, interval=5) -> str:
except ApiException as e:
self.logger.error(f"Failed to list pods: {e}")
time.sleep(interval)

self.logger.error("Timed out waiting for primary pod.")
return None

Expand Down Expand Up @@ -85,7 +101,7 @@ def _fetch_secret_data(self, suffix: str, key: str) -> str:
str: The decoded data from the secret if found, otherwise None.
"""
self.logger.debug(f"Fetching secret data with suffix '{suffix}' and key '{key}' in namespace '{self.namespace}'")

try:
secrets = self.v1_client.list_namespaced_secret(self.namespace)
for secret in secrets.items:
Expand All @@ -95,7 +111,7 @@ def _fetch_secret_data(self, suffix: str, key: str) -> str:
return decoded_data
except ApiException as e:
self.logger.error(f"Failed to fetch secrets: {e}", exc_info=True)

self.logger.warning(f"No secret found with suffix '{suffix}' and key '{key}'")
return None

Expand Down Expand Up @@ -144,4 +160,4 @@ def stop_app(self, app_name: str):
self.logger.debug(f"App {app_name} stopped successfully.")
else:
self.logger.error(f"Failed to stop app {app_name}: {result.get_error()}")
raise RuntimeError(f"Failed to stop app {app_name}: {result.get_error()}")
raise RuntimeError(f"Failed to stop app {app_name}: {result.get_error()}")
Loading

0 comments on commit 6d3ca25

Please sign in to comment.