-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpoll.py
125 lines (93 loc) · 3.97 KB
/
poll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import concurrent.futures
import datetime
import json
import os
from urllib.parse import urljoin, urlparse

import indieweb_utils
import requests
from dateutil import parser
from granary import atom, jsonfeed, microformats2, rss
from tqdm import tqdm
# Browser-style User-Agent header sent with every feed request.
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36"

# Today's date (local time, YYYY-MM-DD); poll_feed uses it as the fallback
# publication date for items whose own date cannot be parsed.
today = datetime.datetime.now().strftime("%Y-%m-%d")

# feeds.txt holds one URL per line.  Blank lines are skipped so that they are
# not handed to the poller as empty URLs.
with open("feeds.txt", "r", encoding="utf-8") as f:
    feeds = [line.strip() for line in f.read().splitlines() if line.strip()]

# Load the items written by the previous run, if any.  `existing_posts` is the
# list of their ids and is what poll_feed checks to decide whether an item is new.
if os.path.exists("pages/_data/feed.json"):
    with open("pages/_data/feed.json", "r", encoding="utf-8") as f:
        existing_feed = json.load(f)

    existing_posts = [
        post["id"] for post in existing_feed if post and post.get("id")
    ]

    print("Found", len(existing_posts), "existing posts")
else:
    existing_feed = []
    existing_posts = []

# Maps the subtype of a response's Content-Type (the part after "/") to the
# granary function that converts the response body into activities.
FEED_IDENTIFICATION = {
    "rss+xml": rss.to_activities,
    "atom+xml": atom.atom_to_activities,
    "html": microformats2.html_to_activities,
    "feed+json": jsonfeed.jsonfeed_to_activities,
    "json": jsonfeed.jsonfeed_to_activities,
    "mf2+json": microformats2.json_to_activities,
    "xml": rss.to_activities,
}

# Placeholder; replaced with the polled items once polling has run.
results = []

# Converts the activities produced by a FEED_IDENTIFICATION function into the
# dict whose "items" list poll_feed reads.
CONVERSION_FUNCTION = jsonfeed.activities_to_jsonfeed
def _fetch(url):
    """GET *url* with the shared User-Agent.

    Returns the response on HTTP 200; otherwise prints the reason and
    returns None.
    """
    try:
        resp = requests.get(
            url, headers={"User-Agent": USER_AGENT}, allow_redirects=True, timeout=30
        )
    except requests.RequestException:
        print("Failed to fetch", url)
        return None

    if resp.status_code != 200:
        print("Failed to fetch", url, "with status code", resp.status_code)
        return None

    return resp


def _content_subtype(resp):
    """Return the lower-cased subtype of the response's Content-Type.

    "application/rss+xml; charset=utf-8" gives "rss+xml".  A missing header,
    or one without a "/", gives "" instead of raising.
    """
    media_type = resp.headers.get("Content-Type", "").split(";")[0].strip().lower()
    _, _, subtype = media_type.partition("/")
    return subtype


def _published_day(raw_date):
    """Return *raw_date* formatted as YYYY-MM-DD, or `today` if it cannot be parsed."""
    try:
        return parser.parse(raw_date).strftime("%Y-%m-%d")
    except (TypeError, ValueError, OverflowError):
        # TypeError covers a missing date (None); the others cover
        # unparseable or out-of-range values.
        return today


def _convert_to_items(resp, content_type):
    """Run the converter registered for *content_type* and return the "items" list."""
    to_activities = FEED_IDENTIFICATION[content_type]

    if content_type in ("json", "feed+json"):
        # The JSON Feed converter's result is indexed with [0] to get the
        # activities (per the granary docs it returns an (activities, actor) pair).
        activities = to_activities(resp.json())[0]
    elif content_type == "mf2+json":
        # NOTE(review): granary documents json_to_activities as taking the
        # *parsed* mf2 object, so the body is decoded first - confirm.
        activities = to_activities(resp.json())
    else:
        activities = to_activities(resp.text)

    return CONVERSION_FUNCTION(activities).get("items", [])


def poll_feed(feed):
    """Fetch one feed and return its items that are not already stored.

    Args:
        feed: URL of a feed document or of an HTML page.  For an HTML page
            the first feed reported by
            ``indieweb_utils.discover_web_page_feeds`` is fetched instead; if
            none is reported, the page itself goes through the "html" converter.

    Returns:
        A list of item dicts.  Each has ``url`` made absolute, ``domain`` set
        to that URL's network location, ``date_published`` reduced to
        YYYY-MM-DD (today's date when unparseable) and ``content_html``
        removed.  Items without an ``id``, or whose ``id`` is already in
        ``existing_posts``, are left out.  On any failure the reason is
        printed and an empty list is returned, so one bad feed cannot abort
        the whole polling run.
    """
    resp = _fetch(feed)
    if resp is None:
        return []

    content_type = _content_subtype(resp)

    if "html" in content_type:
        try:
            discovered = indieweb_utils.discover_web_page_feeds(feed)
        except Exception as exc:
            # NOTE(review): the exception types raised by discovery are not
            # visible from this file, hence the broad catch at this boundary.
            print("Feed discovery failed for", feed, "-", repr(exc))
            discovered = []

        if discovered:
            resp = _fetch(discovered[0].url)
            if resp is None:
                return []
            content_type = _content_subtype(resp)

    if content_type not in FEED_IDENTIFICATION:
        print("Unsupported feed type", content_type)
        return []

    try:
        items = _convert_to_items(resp, content_type)
    except Exception as exc:
        # The converters parse third-party documents; a malformed one should
        # only cost us this feed.
        print("Failed to parse", feed, "-", repr(exc))
        return []

    if not items:
        print("No items found in", feed)
        return []

    # Relative links are resolved against the URL that was actually fetched.
    base_url = resp.url or feed

    for item in items:
        # urljoin leaves absolute URLs alone and resolves relative ones; an
        # item with no URL at all falls back to the feed URL.
        item["url"] = urljoin(base_url, item.get("url") or feed)
        item["domain"] = urlparse(item["url"]).netloc
        item["date_published"] = _published_day(item.get("date_published"))
        item.pop("content_html", None)

    # Build the lookup set once instead of scanning the list for every item.
    known_ids = set(existing_posts)
    items = [
        item for item in items if item.get("id") and item["id"] not in known_ids
    ]

    print("Found", len(items), "new items in", feed)

    return items
def _merge_feed_items(new_items, previous_items):
    """Combine freshly polled items with previously stored ones, newest first.

    Entries that are empty or not dicts are dropped.  When several entries
    share an ``id`` only the first is kept; new items are examined first, so
    they win over stored ones.  Entries without an ``id`` are always kept.
    """
    merged = []
    seen_ids = set()

    for item in [*new_items, *previous_items]:
        if not item or not isinstance(item, dict):
            continue

        item_id = item.get("id")
        if item_id:
            if item_id in seen_ids:
                continue
            seen_ids.add(item_id)

        merged.append(item)

    # poll_feed stores the date under "date_published" as YYYY-MM-DD, which
    # sorts correctly as text; "date" is kept as a fallback key.  sorted() is
    # stable, so same-day items keep the order built above.
    return sorted(
        merged,
        key=lambda entry: entry.get("date_published") or entry.get("date") or "",
        reverse=True,
    )


# Poll every feed concurrently; map() yields one list of new items per feed,
# in the same order as `feeds`.
with concurrent.futures.ThreadPoolExecutor() as executor:
    polled = list(executor.map(poll_feed, feeds))

new_items = [item for batch in polled for item in batch]

# Keep what was stored by earlier runs and add the new items on top.
results = _merge_feed_items(new_items, existing_feed)

with open("pages/_data/feed.json", "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2)