#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# fullfeed is a script to convert partial RSS/Atom feeds to full feeds.
# The basic usage is
#
#   fullfeed <file>
#
# fullfeed can also read the feed from stdin, e.g.,
#
#   curl -sL https://www.economist.com/asia/rss.xml | fullfeed
#
# For more options, run
#
#   fullfeed -h
#
# Sometimes RSS/Atom feeds do not contain the full text of the articles
# they link to. Feed publishers do this so that the user actually
# visits the site to read the full article (presumably to generate
# more ad revenue).
#
# fullfeed attempts to solve this problem by fetching the page each item
# in the feed links to and extracting the main body text using the
# Python Readability library.
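#
# For instance, the extraction step boils down to the following use of
# the readability-lxml API (a sketch; the actual call is in fulltext()
# below):
#
#   from readability import Document
#   text = Document(html).summary(html_partial=True)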
#
# To use fullfeed, you need a feed reader (e.g., Liferea or feed2imap)
# that supports running a filter command, or that can read feeds from
# a file. To use fullfeed with Liferea, choose advanced options while
# adding the feed and set fullfeed as the conversion filter.
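#
# Alternatively, with a reader that loads feeds from a file, fullfeed
# can be run periodically (e.g., from cron) to produce a full feed;
# the feed URL and output path here are placeholders:
#
#   curl -sL https://example.com/rss.xml | fullfeed > ~/feeds/full.xml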
#
# Requires: Python 3 with readability-lxml and requests
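# (e.g., pip install readability-lxml requests)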
#
import argparse
import os
import re
import requests
import sys
from hashlib import md5
from multiprocessing.pool import ThreadPool
from readability import Document
from urllib.parse import urlparse
from xml.etree.ElementTree import ElementTree, SubElement

# Number of threads.
NUM_THREADS = 10

# Minimum number of characters required in the text to
# determine if the text extraction was successful.
TEXT_THRESHOLD = 140

# Cache directory to store downloaded articles.
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache/fullfeed")
if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)

# User agent.
USER_AGENT = "Mozilla/5.0 (compatible)"


def digest(string):
    """Return the MD5 digest of a string."""
    return md5(string.encode("utf-8")).hexdigest()


def fulltext(url, force=False, strip_links=False):
    """Extract full text from the URL."""
    # Check if we have already downloaded and extracted the article.
    cache_file = os.path.join(CACHE_DIR, digest(url) + ".html")
    if os.path.exists(cache_file) and not force:
        with open(cache_file, "r") as fd:
            text = fd.read()
    else:
        html = requests.get(
            url,
            timeout=10,
            headers={"User-Agent": USER_AGENT, "Referer": urlparse(url).netloc},
        ).text
        text = Document(html).summary(html_partial=True)
        if len(text) < TEXT_THRESHOLD:
            # Extraction most likely failed; do not cache the result.
            return None
        else:
            with open(cache_file, "w") as fd:
                fd.write(text)
    if strip_links:
        # Remove <a ...> and </a> tags while keeping the link text.
        text = re.sub(r"</?a(?:(?= )[^>]*)?>", "", text)
    return text


def fullfeed(args):
    """Extract full text by reading the feed from the file descriptor."""
    tree = ElementTree()
    tree.parse(args.file)
    # First, extract all URLs from entries.
    urls = []
    for channel in tree.findall("channel"):
        for item in channel.findall(".//item"):
            link = item.find("link").text
            urls.append(link)
            # Restrict number of processed links if required.
            if len(urls) >= args.num_items > 0:
                break
    # Now, visit each URL and extract full text.
    entries = {}
    update = lambda url: entries.update(
        {digest(url): fulltext(url, force=args.force, strip_links=args.strip_links)}
    )
    pool = ThreadPool(NUM_THREADS)
    pool.map(update, urls)
    # Update feed with full text.
    for channel in tree.findall("channel"):
        for item in channel.findall(".//item"):
            link = item.find("link").text
            # If the description element is missing, create it.
            description = item.find("description")
            if description is None:
                description = SubElement(item, "description")
            # Keep the existing description if the item was skipped or
            # extraction failed.
            text = entries.get(digest(link))
            if text:
                description.text = text
    tree.write(sys.stdout, encoding="unicode")


if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser(
        prog="fullfeed", description="convert RSS/Atom partial feeds to full feeds"
    )
    arg_parser.add_argument(
        "-f", "--force", action="store_true", help="force overwrite cache"
    )
    arg_parser.add_argument(
        "-n", "--num-items", default=0, type=int, help="process first n items only"
    )
    arg_parser.add_argument(
        "-s",
        "--strip-links",
        action="store_true",
        default=False,
        help="strip <a> tags from HTML",
    )
    arg_parser.add_argument(
        "file",
        help="file with the feed",
        nargs="?",
        type=argparse.FileType("r"),
        default=sys.stdin,
    )
    args = arg_parser.parse_args()
    sys.exit(fullfeed(args))