-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaio_bot_parser.py
196 lines (159 loc) · 6.5 KB
/
aio_bot_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Пример создания асинхронного бота Telegram с функцией получения новостей от 3dnews.
Стэк: aiogram, aiosqlite, aiohttp, apscheduler, beatifulsoap
Алгоритм работы
Каждые 30 минут получаем новости с https://3dnews.ru/news/.
Парсим их и записываем в базу со статусом непрочитано.
По команде /news отправляем в канал 5 постов и меняем им статус на прочитано.
Если все посты прочитаны, то говорим, что нет постов.
"""
import logging
import os
from datetime import datetime
import aiosqlite
from aiogram import Bot, Dispatcher, executor, types
from aiogram.utils.markdown import hlink
from aiohttp import ClientSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from bs4 import BeautifulSoup
from dotenv import load_dotenv
load_dotenv()
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')
bot = Bot(token=TELEGRAM_TOKEN) # Объект бота
dp = Dispatcher(bot) # Диспетчер для бота
# Включаем логирование. Пишем логи в файл task_aio_bot_parser.log
logging.basicConfig(
filename='task_aio_bot_parser.log',
level=logging.DEBUG,
)
def parse_posts(raw_text: str) -> dict:
"""Парсим посты с 3dnews.ru
:parm: html as str
:return: dict[post_number]: (post_text, post_href, post_img)
"""
try:
data = BeautifulSoup(raw_text, features='html.parser')
posts = data.find_all('div', class_='article-entry')
all_posts = {}
for post in posts:
post_number = int(post.get('id'))
post_text = post.find('a', class_='entry-header')
post_href = post_text.get("href")
if post_href[:5] != 'https':
post_href = f'https://3dnews.ru{post_href}'
# post_img = post.find('img', class_='imageInAllFeed').get('src')
# post_img = f'https://3dnews.ru{post_img}'
all_posts[post_number] = (
post_text.text,
post_href,
# post_img
)
return all_posts
except Exception as e:
raise Exception(
f'Не удалось распарсить посты: {e}')
async def get_news() -> str:
"""
Запрашиваем новости с 3dnews.ru и возвращаем HTML
:return: str
"""
async with ClientSession() as session:
url = 'https://3dnews.ru/news/'
async with session.get(url=url) as response:
return await response.text()
async def get_and_parse_news() -> dict:
"""Парсим HTML и возвращаем dict
:return: dict[post_number] = (
post_text.text,
post_href)
"""
posts_html = await get_news()
return parse_posts(posts_html)
async def create_table() -> None:
"""
Создаем таблицу для хранения постов
"""
async with aiosqlite.connect('3dnews.db') as db:
await db.execute('CREATE TABLE IF NOT EXISTS posts'
'(post_id INTEGER PRIMARY KEY, title TEXT, url TEXT, status INTEGER, date TEXT)')
await db.commit()
async def save_to_db(post_id, title, url) -> None:
"""
Сохраняем пост в базу
"""
async with aiosqlite.connect('3dnews.db') as db:
try:
await db.execute('INSERT INTO posts VALUES (?, ?, ?, ?, ?)',
(post_id, title, url, 0, datetime.now()))
await db.commit()
except:
pass
async def load_new_posts() -> dict:
"""
Загружаем 5 новых постов из базы и ставим им статус просмотренных
:return: None or dict[post_number]: (post_text, post_href)
"""
result = {}
async with aiosqlite.connect('3dnews.db') as db:
async with db.execute('SELECT * FROM posts WHERE status = 0 LIMIT 5') as cursor:
async for row in cursor:
result[row[0]] = (row[1], row[2])
if result == {}:
return None
for item in result:
await set_post_status(item, 1)
return result
async def set_post_status(post_id, status: int) -> None:
"""
Поменять статус поста на status
"""
async with aiosqlite.connect('3dnews.db') as db:
await db.execute('UPDATE posts SET status = ? WHERE post_id = ?',
(status, post_id))
await db.commit()
async def sched_get_news_to_db() -> None:
"""
Функция для запроса постов и сохранения их в базе.
Одинаковые посты не сохраняются
"""
posts_dict = await get_and_parse_news()
for key, value in posts_dict.items():
await save_to_db(key, value[0], value[1])
# Хэндлер на команду /news
@dp.message_handler(commands='news')
async def cmd_news(message: types.Message):
new_posts = await load_new_posts()
if new_posts is None:
await message.answer("Новых постов еще нет")
else:
for key, value in new_posts.items():
await message.answer(hlink(value[0], value[1]),
parse_mode="HTML",
disable_web_page_preview=False
)
@dp.message_handler(commands="start")
async def cmd_start(message: types.Message):
keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True, selective=True)
keyboard.add("/news")
await message.answer(
"Привет! Это CrazyBot, давай почитаем новости с 3DNews!"
"Отправь команду /news для получения новых постов",
reply_markup=keyboard
)
@dp.message_handler(commands="help")
async def cmd_help(message: types.Message):
await cmd_start(message=message)
async def on_startup_init(dp) -> None:
"""
Создаем базу, если ее нет
"""
await create_table()
await sched_get_news_to_db()
if __name__ == "__main__":
scheduler = AsyncIOScheduler() # Запись постов в БД по расписанию
scheduler.add_job(sched_get_news_to_db, 'interval', minutes=30)
scheduler.start()
# Запуск бота
executor.start_polling(dp,
skip_updates=True,
on_startup=on_startup_init)