bookscraper.py
import requests
import json
import os
import subprocess
import logging

from bs4 import BeautifulSoup, NavigableString

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def fetch_all_posts(user_id=""):
    """Fetch every post for a kemono.su Patreon creator, paging through the API in batches of 50."""
    base_url = f"https://kemono.su/api/v1/patreon/user/{user_id}"
    all_posts = []
    offset = 0
    while True:
        url = f"{base_url}?o={offset}"
        try:
            response = requests.get(url)
            response.raise_for_status()
            posts = response.json()
            if not posts:
                # An empty page means we have paged past the last post.
                break
            all_posts.extend(posts)
            offset += 50
        except requests.RequestException as e:
            logging.error(f"Error fetching posts: {e}")
            break
    # Sort posts by 'id' in ascending order
    all_posts.sort(key=lambda post: int(post['id']))
    return all_posts
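
# Illustrative usage sketch (not executed at import time); the creator id below
# is a placeholder, and 'id'/'title' are the post fields this script relies on:
#
#   posts = fetch_all_posts("12345")
#   for post in posts[:3]:
#       print(post['id'], post['title'])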


def clean_html(html_content):
    """Convert a post's HTML body to plain text, keeping blank lines between blocks."""
    soup = BeautifulSoup(html_content, "html.parser")
    # Iterate over each paragraph and other block elements
    for element in soup.find_all(['p', 'div', 'br', 'ul', 'ol', 'li', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
        if element.name == 'br':
            # Line breaks become paragraph breaks in the plain-text output.
            element.replace_with('\n\n')
        else:
            # Append a paragraph break inside each block element.
            element.append('\n\n')
    # Get text from the soup object
    text = ''.join([str(elem) if isinstance(elem, NavigableString) else elem.get_text() for elem in soup.contents])
    return text.strip()
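
# Rough example of the conversion above (a sketch; exact whitespace depends on
# the input markup):
#
#   clean_html("<p>Chapter 1</p><p>It was a dark night.</p>")
#   # -> "Chapter 1\n\nIt was a dark night."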


def read_next_post(posts_filename="filtered_posts.json", books_directory="books", offset=None):
    """Return (filename, cleaned text) for the next post not yet written to disk, or None."""
    if not os.path.exists(books_directory):
        os.makedirs(books_directory)
    with open(posts_filename, 'r', encoding='utf-8') as file:
        posts = json.load(file)
    result = None
    for post in posts:
        post_id = post['id']
        post_title = post['title'].replace('/', '_')
        if offset is not None and int(post_id) < offset:
            # Skip posts below the requested starting id.
            continue
        filename = os.path.join(books_directory, f"{post_id}_{post_title}.txt")
        if not os.path.exists(filename):
            result = filename, clean_html(post['content'])
            break
    return result


def confirm_post_read(post_filename, post_content):
    """Mark a post as handled by writing its cleaned text to disk."""
    with open(post_filename, 'w', encoding='utf-8') as output_file:
        output_file.write(post_content)
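
# Illustrative read-then-confirm workflow (a sketch of how the two functions
# above fit together):
#
#   next_post = read_next_post()
#   if next_post is not None:
#       filename, content = next_post
#       confirm_post_read(filename, content)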


def filter_and_save_posts(posts, filter_text, filename="filtered_posts.json"):
    """Keep only posts whose title contains filter_text and save them as JSON."""
    filtered_posts = [post for post in posts if filter_text in post.get("title", "")]
    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(filtered_posts, file, ensure_ascii=False, indent=4)
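
# Example (sketch; the title substring is a placeholder):
#
#   filter_and_save_posts(all_posts, "Chapter")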


def run_royal_road_scraper(royal_road_id: str, name: str):
    """Run the Node scraper in ./royalroad-api and write its output under ../fetchdata."""
    command = ["node", ".", royal_road_id, f"../fetchdata/{name}.json"]
    subprocess.run(command, check=True, cwd="./royalroad-api")
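
# Example (sketch): the Node project in ./royalroad-api is assumed to take a
# fiction id and an output path as its arguments, e.g.:
#
#   run_royal_road_scraper("12345", "my_fiction")  # id is a placeholder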


if __name__ == "__main__":
    # user = "24146169"
    filter_text = ""
    # name = "filtered_posts"
    user = "67742"
    name = "filtered_posts_elydes"
    mode = "kemono"
    # Fetch, sort, and save posts
    if mode == "royalroad":
        run_royal_road_scraper(user, name)
    elif mode == "kemono":
        all_posts = fetch_all_posts("93608357")
        filter_and_save_posts(all_posts, filter_text, filename=f"fetchdata/k{name}.json")
    else:
        raise ValueError("Invalid mode provided")