From f9055343fe702749455346786b0b9cdcb6a1a35b Mon Sep 17 00:00:00 2001
From: kouloumos
Date: Thu, 13 Jun 2024 15:45:21 +0300
Subject: [PATCH] introduce `DataFetcher` for retrieving btctranscripts related
 data

This replaces and enhances the previously used utility functions.
Currently unused methods relate to to-be-published code for transcripts
curation and metadata processing.
---
 app/data_fetcher.py  | 82 ++++++++++++++++++++++++++++++++++++++++++++
 app/transcription.py |  6 ++--
 app/types.py         | 20 ++++++++++-
 app/utils.py         | 49 ++------------------------
 4 files changed, 107 insertions(+), 50 deletions(-)
 create mode 100644 app/data_fetcher.py

diff --git a/app/data_fetcher.py b/app/data_fetcher.py
new file mode 100644
index 0000000..21b0904
--- /dev/null
+++ b/app/data_fetcher.py
@@ -0,0 +1,82 @@
+import json
+import os
+import requests
+from typing import Dict, Literal, Optional, List
+
+from app import (
+    logging
+)
+from app.types import SourceType, TranscriptionCoverage
+
+logger = logging.get_logger()
+
+
+class DataFetcher:
+    """
+    The DataFetcher class is responsible for retrieving and caching JSON data from Bitcoin Transcripts,
+    which serve as the source of truth for various transcription-related information. It provides methods
+    to fetch data on transcription status, sources, existing media, speakers, and tags, ensuring efficient
+    data retrieval and reducing redundant network requests through caching.
+ """ + + def __init__(self, base_url: str, cache_dir: Optional[str] = "cache/"): + self.base_url = base_url + self.cache_dir = cache_dir + if self.cache_dir: + os.makedirs(self.cache_dir, exist_ok=True) + + def fetch_json(self, name: Literal['status', 'sources', 'directories'], cache: bool = False): + """Fetches JSON data from a configured URL or local cache""" + cached_file_path = os.path.join( + self.cache_dir, f"{name}.json") if self.cache_dir else None + + if cache and cached_file_path and os.path.exists(cached_file_path): + # Load data from the local file + logger.info(f"Fetched data from {cached_file_path}") + with open(cached_file_path, "r") as file: + return json.load(file) + + # Fetch data from the remote URL + url = f"{self.base_url}/{name}.json" + response = requests.get(url) + if response.status_code == 200: + data = response.json() + logger.info(f"Fetched data from {url} (cache={cache})") + if cache and cached_file_path: + # Store the fetched data locally + with open(cached_file_path, "w") as file: + json.dump(data, file) + return data + else: + raise Exception( + f"Failed to fetch data from {url}. 
Status code: {response.status_code}") + + def get_existing_media(self) -> Dict[str, bool]: + """Returns a dictionary of existing media""" + data = self.fetch_json("status") + return {value: True for value in data.get("existing", {}).get("media", [])} + + def get_transcription_queue(self) -> List[str]: + """Returns a list of items that need transcription""" + data = self.fetch_json("status") + return data.get("needs", {}).get("transcript", []) + + def get_sources(self, loc: str, transcription_coverage: TranscriptionCoverage, cache: bool = False) -> list[SourceType]: + """Returns filtered sources based on location and transcription coverage""" + data: list[SourceType] = self.fetch_json('sources', cache) + filtered_data = [ + source for source in data if source['loc'] == loc or loc == 'all'] + if transcription_coverage != 'none': + filtered_data = [source for source in filtered_data if source.get( + 'transcription_coverage') == transcription_coverage] + return filtered_data + + def get_speakers(self) -> List[str]: + """Returns a list of existing speakers""" + data = self.fetch_json("status") + return data.get("existing", {}).get("speakers", []) + + def get_tags(self) -> List[str]: + """Returns a list of existing tags""" + data = self.fetch_json("status") + return data.get("existing", {}).get("tags", []) diff --git a/app/transcription.py b/app/transcription.py index ef91c6e..800fb48 100644 --- a/app/transcription.py +++ b/app/transcription.py @@ -30,6 +30,7 @@ GitHubMode, ) from app.data_writer import DataWriter +from app.data_fetcher import DataFetcher class Transcription: @@ -74,6 +75,7 @@ def __init__( self.queuer = Queuer(test_mode=test_mode) if queue is True else None self.existing_media = None self.preprocessing_output = [] if batch_preprocessing_output else None + self.data_fetcher = DataFetcher(base_url="http://btctranscripts.com") self.logger.info(f"Temp directory: {self.tmp_dir}") @@ -231,7 +233,7 @@ def add_transcription_source( if 
os.path.isfile(source_file): local = True if not nocheck and not local and self.existing_media is None and not self.test_mode: - self.existing_media = utils.get_existing_media() + self.existing_media = self.data_fetcher.get_existing_media() # combine existing media from btctranscripts.com with excluded media given from source excluded_media = {value: True for value in excluded_media} if self.existing_media is not None: @@ -275,7 +277,7 @@ def add_transcription_source( f"Source added for transcription: {source.title}") else: transcription_sources['exist'].append(source.source_file) - self.logger.info(f"Source already exists: {source.title}") + self.logger.info(f"Source already exists ({self.data_fetcher.base_url}): {source.title}") else: raise Exception(f"Invalid source: {source_file}") if source.type in ['playlist', 'rss']: diff --git a/app/types.py b/app/types.py index e0729b1..6908b04 100644 --- a/app/types.py +++ b/app/types.py @@ -1,11 +1,29 @@ from typing import ( Literal, TypedDict, - Optional + Optional, + Union ) GitHubMode = Literal["remote", "local", "none"] +TranscriptionCoverage = Optional[Literal["full", "none"]] + + +class TranscriptType(TypedDict): + title: str + media: Optional[Union[str, list[str]]] + episode: int + + +class SourceType(TypedDict): + title: str + source: str + categories: Optional[str] + loc: str + cutoff_date: str + transcription_coverage: TranscriptionCoverage + transcripts: list[TranscriptType] class Word(TypedDict): diff --git a/app/utils.py b/app/utils.py index b541309..13e8090 100644 --- a/app/utils.py +++ b/app/utils.py @@ -3,8 +3,6 @@ import re from datetime import datetime, date -import requests - from app.logging import get_logger logger = get_logger() @@ -88,7 +86,8 @@ def configure_metadata_given_from_JSON(source, from_json=None): metadata["date"] = source.get("date", None) metadata["summary"] = source.get("summary", None) metadata["episode"] = source.get("episode", None) - metadata["additional_resources"] = 
source.get("additional_resources", None) + metadata["additional_resources"] = source.get( + "additional_resources", None) metadata["cutoff_date"] = source.get("cutoff_date", None) metadata["youtube_metadata"] = source.get("youtube", None) metadata["media"] = source.get("media", None) @@ -120,47 +119,3 @@ def configure_metadata_given_from_JSON(source, from_json=None): return metadata except KeyError as e: raise Exception(f"Parsing JSON: {e} is required") - - -def get_status(): - """Helper method to fetch and store status.json locally""" - STATUS_FILE_PATH = "status.json" # the file path for storing the status locally - try: - source = STATUS_FILE_PATH - if os.path.exists(STATUS_FILE_PATH): - # If the file exists locally, load the data from the file - with open(STATUS_FILE_PATH, "r") as file: - data = json.load(file) - else: - # If the file doesn't exist locally, fetch it from the remote URL - url = "http://btctranscripts.com/status.json" - source = url - response = requests.get(url) - if response.status_code == 200: - data = response.json() - # Store the fetched data locally - with open(STATUS_FILE_PATH, "w") as file: - json.dump(data, file) - else: - raise Exception(f"Status code: {response.status_code}") - - return data, source - except Exception as e: - logger.error(f"Error fetching status data: {e}") - return None - - -def get_existing_media(): - """Helper method to create a dictionary with all the existing media from btctranscripts.com - It can be used to quickly check if a source is already transcribed""" - try: - data, source = get_status() # Fetch status data - if data: - logger.info( - f"Fetched {len(data['existing']['media'])} existing media sources from {source}") - return {value: True for value in data["existing"]["media"]} - else: - return {} - except Exception as e: - logger.error(f"Error fetching media data: {e}") - return {}