-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetch_tweets.py
53 lines (37 loc) · 1.68 KB
/
fetch_tweets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from twarc import Twarc2, expansions
import datetime
import json
import sys
with open('tokens/bearer_token', 'r') as file:
BEARER_TOKEN = file.read()
client = Twarc2(bearer_token=BEARER_TOKEN)
def append_file(file_name, result):
with open(file_name, 'a+') as filehandle:
for tweet in result:
filehandle.write('%s\n' % json.dumps(tweet))
def main():
"""
Collect tweets from a time period (in recent 7 days) through a query,
or specify start and end date and time in UTC.
"""
# start_time = datetime.datetime(2022, 1, 14, 0, 0, 0, 0, datetime.timezone.utc)
# end_time = datetime.datetime(2022, 1, 15, 0, 0, 0, 0, datetime.timezone.utc)
# end_time = datetime.datetime.utcnow()
args = sys.argv[1:]
query = ' '.join(args)
# Use recent search endpoint
search_results = client.search_recent(query=query, max_results=10)
# search_results = client.search_recent(query=query, start_time=start_time, end_time=end_time, max_results=10)
# Examples of creating user timelines, finding mentions, followers/following etc.
# search_results = client.timeline(user="twitterdev")
# search_results = client.mentions(user="twitterdev")
# search_results = client.following(user="twitterdev")
# search_results = client.followers(user="twitterdev")
t_query = datetime.datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S')
file_name = f'{t_query}.txt'
# All Tweets that satisfy the query are returned in pages
for page in search_results:
result = expansions.flatten(page)
append_file(file_name, result)
if __name__ == '__main__':
main()