From 3f564ca06dc3ed1eab1e8d0568564bacf4cb2ee4 Mon Sep 17 00:00:00 2001
From: Herr Erde
Date: Sun, 29 Sep 2024 05:15:10 +0200
Subject: [PATCH] add boards upgrades

add fetch_upgrades, a board upgrades scraper
refactor fetch_outfits
---
 script/fetch_outfits.py  |  52 ++++++-------
 script/fetch_upgrades.py | 157 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 184 insertions(+), 25 deletions(-)
 create mode 100644 script/fetch_upgrades.py

diff --git a/script/fetch_outfits.py b/script/fetch_outfits.py
index ce31534..2506b4a 100644
--- a/script/fetch_outfits.py
+++ b/script/fetch_outfits.py
@@ -8,17 +8,17 @@
 output_file_path = "temp/upload/characters_outfit.json"
 
 
-def load_input_data(file_path):
+def load_data(file_path):
     with open(file_path, "r") as file:
         return json.load(file)
 
 
-def save_output_data(output, file_path):
+def save_data(output, file_path):
     with open(file_path, "w", encoding="utf-8") as file:
         json.dump(output, file, indent=2, ensure_ascii=False)
 
 
-def fetch_page_content(session, url):
+def fetch_content(session, url):
     try:
         response = session.get(url, allow_redirects=True)
         response.raise_for_status()
@@ -30,8 +30,8 @@ def fetch_page_content(session, url):
     return None
 
 
-def extract_outfit_names(appearance_section):
-    outfit_names = [
+def extract_names(appearance_section):
+    names = [
         (
             li.find("span", class_="toctext").get_text()
             if li.find("span", class_="toctext")
@@ -40,43 +40,45 @@
         for li in appearance_section.find_all_next("li")
     ]
 
+    """
     # Clean outfit names and remove duplicates
-    cleaned_outfit_names = []
+    cleaned_names = []
     seen_names = set()
 
-    for name in outfit_names:
+    for name in names:
         parts = name.split("Outfit")
-        outfit_name = parts[0] + "Outfit" if len(parts) > 1 else name
+        outfit_name = parts[0] + "Outfit" if len(parts) > 2 else parts[0]
         outfit_name = outfit_name.strip()
 
         if outfit_name and outfit_name not in seen_names:
-            cleaned_outfit_names.append(outfit_name)
+            cleaned_names.append(outfit_name)
             seen_names.add(outfit_name)
+    """
 
-    return cleaned_outfit_names
+    return names
 
 
-def extract_outfits_from_tabber(tabber_div, cleaned_outfit_names):
+def extract_data(tabber_div, cleaned_names):
     tab_content_divs = tabber_div.find_all("div", class_="wds-tab__content")
-    outfits = []
+    data = []
 
     for i, tab_content_div in enumerate(tab_content_divs):
-        outfit_name = cleaned_outfit_names[i] if i < len(cleaned_outfit_names) else ""
+        name = cleaned_names[i] if i < len(cleaned_names) else ""
         img_tag = tab_content_div.find("img")
 
         if img_tag:
             img_url = img_tag["src"].split(".png")[0] + ".png"
-            outfits.append({"name": outfit_name, "url": img_url})
+            data.append({"name": name, "url": img_url})
 
-    return outfits
+    return data
 
 
-def fetch_outfits(session, entry):
+def fetch_data(session, entry):
     if not entry["available"]:
         print(f"Skipping '{entry['name']}'")
         return None
 
     url = f"https://subwaysurf.fandom.com/wiki/{entry['name'].replace(' ', '_')}"
-    print(f"'{entry['name']}': {url}")
+    print(f"{entry['name']}: {url}")
 
-    soup = fetch_page_content(session, url)
+    soup = fetch_content(session, url)
     if soup is None:
         return {"name": entry["name"], "outfits": None}
 
@@ -90,7 +92,7 @@
         print("Error: Appearance section not found.")
         return {"name": entry["name"], "outfits": None}
 
-    cleaned_outfit_names = extract_outfit_names(appearance_section)
+    names = extract_names(appearance_section)
 
     infobox_table = soup.find("table", class_="infobox")
     if infobox_table is None:
@@ -102,7 +104,7 @@ def fetch_outfits(session, entry):
         print("Error: tabber div not found in infobox table.")
         return {"name": entry["name"], "outfits": None}
 
-    outfits = extract_outfits_from_tabber(tabber_div, cleaned_outfit_names)
+    outfits = extract_data(tabber_div, names)
 
     return {"name": entry["name"], "outfits": outfits}
 
@@ -111,9 +113,9 @@ def process_entries(data, limit):
     try:
         with requests.Session() as session:
             for entry in data[:limit]:
-                outfits_data = fetch_outfits(session, entry)
-                if outfits_data is not None:
-                    output.append(outfits_data)
+                result = fetch_data(session, entry)
+                if result is not None:
+                    output.append(result)
     except Exception as e:
         print("Error:", e)
     except KeyboardInterrupt:
@@ -123,14 +125,14 @@ def process_entries(data, limit):
 
 
 def main(limit):
-    data = load_input_data(input_file_path)
+    data = load_data(input_file_path)
 
     # Handle cases where limit is None or 0
     if limit is None or limit == 0:
         limit = len(data)
 
     output = process_entries(data, limit)
-    save_output_data(output, output_file_path)
+    save_data(output, output_file_path)
 
 
 if __name__ == "__main__":
diff --git a/script/fetch_upgrades.py b/script/fetch_upgrades.py
new file mode 100644
index 0000000..28470a5
--- /dev/null
+++ b/script/fetch_upgrades.py
@@ -0,0 +1,157 @@
+import json
+import sys
+
+import requests
+from bs4 import BeautifulSoup
+
+input_file_path = "temp/upload/boards_links.json"
+output_file_path = "temp/upload/boards_upgrades.json"
+
+
+def load_data(file_path):
+    with open(file_path, "r") as file:
+        return json.load(file)
+
+
+def save_data(output, file_path):
+    with open(file_path, "w", encoding="utf-8") as file:
+        json.dump(output, file, indent=2, ensure_ascii=False)
+
+
+def fetch_content(session, url):
+    try:
+        response = session.get(url, allow_redirects=True)
+        response.raise_for_status()
+        return BeautifulSoup(response.content, "html.parser")
+    except requests.exceptions.HTTPError as http_err:
+        print(f"HTTP error occurred: {http_err}")
+    except Exception as e:
+        print(f"Error occurred: {e}")
+    return None
+
+
+def extract_data(tabber_div, names):
+    # Find all tab content divs within the tabber_div
+    tab_content_divs = tabber_div.find_all("div", class_="wds-tab__content")
+    data = []
+
+    for i, tab_content_div in enumerate(tab_content_divs):
+        name = names[i] if i < len(names) else ""
+        a_tag = tab_content_div.find("a")
+        img_url = None
+
+        # Extract the href URL from the tag, if it exists
+        if (
+            a_tag
+            and "File:" not in a_tag.get("title", "")
+            and "/wiki/" not in a_tag.get("href", "")
+        ):
+            img_url = a_tag.get("href")
+        else:
+            img_url = None
+
+        if img_url:
+            img_url = img_url.split(".png")[0] + ".png"
+
+        data.append({"name": name, "url": img_url})
+
+    return data
+
+
+def fetch_data(session, entry):
+    if not entry["available"]:
+        print(f"Skipping '{entry['name']}'")
+        return None
+
+    url = f"https://subwaysurf.fandom.com/wiki/{entry['name'].replace(' ', '_')}"
+    print(f"{entry['name']}: {url}")
+
+    soup = fetch_content(session, url)
+    if soup is None:
+        print(f"Error: Unable to fetch content for {entry['name']}")
+        return {"name": entry["name"], "upgrades": None}
+
+    infobox_table = soup.find("table", class_="infobox")
+    if infobox_table is None:
+        print("Error: Infobox table not found.")
+        return {"name": entry["name"], "upgrades": None}
+
+    tabber_div = infobox_table.find("div", class_="tabber wds-tabber")
+    if tabber_div is None:
+        print("Error: Tabber div not found in infobox table.")
+        return {"name": entry["name"], "upgrades": None}
+
+    tbody_elements = infobox_table.select("tbody")
+
+    tr_elements = tbody_elements[0].select("tr")
+
+    if len(tr_elements) > 9:
+        names = []
+
+        target_tr = tr_elements[9]
+        second_td = target_tr.find_all("td")[1]
+
+        a_tags = target_tr.select("a")
+        for a in a_tags:
+            title = a.get("title")
+            content = a.get_text(strip=True)
+            if title and title not in ["Key", "Event Coin", "Shells"]:
+                names.append(content)
+            else:
+                continue
+
+        process = not a_tags or any(
+            a.get_text(strip=True) not in ["Key", "Event Coin", "Shells"]
+            for a in a_tags
+        )
+
+        if process:
+            td_content = second_td.get_text(separator="\n", strip=True).split("\n")
+
+            for line in td_content:
+                line = line.strip()
+                if line and not line.isdigit():
+                    names.append(line)
+
+    else:
+        names = []
+
+    names.insert(0, "Original")
+    if len(names) >= 3:
+        names.append("Fully upgraded")
+
+    upgrades = extract_data(tabber_div, names)
+
+    return {"name": entry["name"], "upgrades": upgrades}
+
+
+def process_entries(data, limit):
+    output = []
+    try:
+        with requests.Session() as session:
+            for entry in data[:limit]:
+                result = fetch_data(session, entry)
+                if result is not None:
+                    output.append(result)
+    except Exception as e:
+        print("Error:", e)
+    except KeyboardInterrupt:
+        print("\nKeyboard interrupt received. Finishing current processing.")
+
+    return output
+
+
+def main(limit):
+    data = load_data(input_file_path)
+
+    # Handle cases where limit is None or 0
+    if limit is None or limit == 0:
+        limit = len(data)
+
+    output = process_entries(data, limit)
+    save_data(output, output_file_path)
+
+
+if __name__ == "__main__":
+    limit = int(sys.argv[1]) if len(sys.argv) > 1 else None
+    main(limit)
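
Usage note (outside the patch itself): a minimal sketch of how the new board upgrades scraper
might be driven once the patch is applied, assuming the script is run from the repository root
and that temp/upload/boards_links.json already exists with entries carrying "name" and
"available" keys; the limit of 5 is an arbitrary example, and omitting the argument processes
every entry.

    # Hypothetical driver; fetch_upgrades.py reads sys.argv[1] as an optional entry limit.
    import subprocess

    subprocess.run(["python", "script/fetch_upgrades.py", "5"], check=True)
    # Results are written to temp/upload/boards_upgrades.json as a list of
    # {"name": ..., "upgrades": [{"name": ..., "url": ...}, ...]} records.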