Avoiding flood limits
`MessageQueue` in its current form is deprecated and will be reinvented in a future release. See #2139 for a list of known bugs.
According to Telegram's Bot documentation, bots can currently send at most 30 messages per second for ordinary messages and 20 messages per minute for group messages. According to @BotSupport, the group limit also applies to channels (Telegram's documentation does not confirm this, however). When your bot hits these limits, it starts receiving 429 errors from the Telegram API. Since error handling in such cases is usually coded as simple retries, the machine running the bot would spend a lot of CPU time retrying (or lock up, depending on the bot's implementation details). Constantly retrying to send messages while ignoring API errors could also get your bot banned for some time.
That means that if you're building a production-ready bot that should serve numerous users, it's always a good idea to use a throughput-limiting mechanism for outgoing messages. This way you can be sure that all messages are delivered to end users as soon as possible and in order.
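Throughput limiting complements error handling rather than replacing it. As a rough illustration of the alternative to blind retries, the sketch below waits for the pause suggested by the server before trying again and gives up after a few attempts. `FloodLimitError` and `send_with_backoff` are hypothetical names introduced for this example; in python-telegram-bot the corresponding exception should be `telegram.error.RetryAfter` with its `retry_after` attribute, but treat that mapping as an assumption and check it against your library version.

```python
import time


class FloodLimitError(Exception):
    """Hypothetical stand-in for a 429 'Too Many Requests' API error."""

    def __init__(self, retry_after):
        super().__init__('flood limit hit, retry in %s s' % retry_after)
        self.retry_after = retry_after


def send_with_backoff(send, max_attempts=3, sleep=time.sleep):
    """Call send(); on a flood error wait the suggested time, then retry.

    Re-raises the error once max_attempts attempts have failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except FloodLimitError as exc:
            if attempt == max_attempts:
                raise
            # wait as long as the server asked instead of retrying at once
            sleep(exc.retry_after)
```

The `sleep` parameter exists only to make the helper easy to test; in a real bot the default `time.sleep` (or a scheduled job) would be used.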
The throughput-limiting mechanism described above is provided in the `telegram.ext.messagequeue` submodule. Its main class, `MessageQueue`, passes send-message callbacks through one or two delay queues (instances of the `DelayQueue` class), depending on the message type (ordinary or group message). The figure below illustrates how it works:
Each `DelayQueue` instance runs in a separate thread, waiting for new callbacks to appear on an internal Python `queue.Queue` and calculating the delays required before running these callbacks according to a time-window scheme. To provide the delays, it relies on `time.sleep` and on `queue.Queue` blocking when there are no new callbacks to dispatch, which makes the implementation very CPU-efficient.
However, be aware that callbacks always run in a non-main (`DelayQueue`) thread. That's not a problem for the python-telegram-bot library itself, but in rare cases you may still need to provide additional locking for thread-shared resources, so keep that in mind.
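A stripped-down sketch of such a worker thread might look as follows. `TinyDelayQueue` is a hypothetical class written for this page, not the library's `DelayQueue`: for brevity it enforces a fixed minimum gap between calls instead of the time-window scheme. It does show the two building blocks mentioned above: blocking on `queue.Queue` while idle and `time.sleep` for the delay.

```python
import queue
import threading
import time


class TinyDelayQueue(threading.Thread):
    """Hypothetical, simplified worker: runs queued callables one by one,
    keeping at least `min_gap` seconds between consecutive calls."""

    def __init__(self, min_gap=0.05):
        super().__init__(daemon=True)
        self._queue = queue.Queue()
        self._min_gap = min_gap
        self._last_run = None

    def submit(self, func, *args, **kwargs):
        self._queue.put((func, args, kwargs))

    def stop(self):
        self._queue.put(None)  # sentinel that ends the loop

    def run(self):
        while True:
            item = self._queue.get()  # blocks here without burning CPU
            if item is None:
                break
            if self._last_run is not None:
                wait = self._min_gap - (time.monotonic() - self._last_run)
                if wait > 0:
                    time.sleep(wait)
            func, args, kwargs = item
            func(*args, **kwargs)  # note: executed in this worker thread
            self._last_run = time.monotonic()
```

Because every callback is executed inside `run()`, anything it touches that is also used by the main thread needs the usual protection (for example a `threading.Lock`).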
The time-window delay calculation scheme has many similarities with windowing in DSP theory and is best illustrated by the figure below.
Internally, `DelayQueue` performs windowing by keeping track of the times at which messages were sent in a list (of at most M entries), dynamically shrinking and expanding it as needed, which is a very robust solution. A throughput limit of M messages per N milliseconds is set with the `burst_limit` (M) and `time_limit_ms` (N) arguments. However, be aware that 30 messages per 1 second is not the same as 60 messages per 2 seconds: the latter would allow 60 messages to be sent in one short burst (well under 2 seconds), which could quickly get your bot banned. As a result, you should never set `burst_limit` higher than 30 messages (or 20 for the group-type message delay queue).
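To make the windowing idea concrete, here is a simplified model of it. It is a sketch written for this page, not the library's code; `schedule` is a hypothetical function, and times are plain integers in milliseconds so the arithmetic stays exact. It keeps a list of recent send times (never longer than `burst_limit`) and postpones a message until the oldest tracked send has left the window.

```python
def schedule(arrival_times_ms, burst_limit, time_limit_ms):
    """Return the time (ms) at which each message may be sent."""
    window = []      # send times still inside the window, oldest first
    send_times = []
    clock = 0        # moment the worker is free to handle the next message
    for arrival in arrival_times_ms:
        now = max(clock, arrival)
        # forget sends that have already left the time window
        window = [t for t in window if t > now - time_limit_ms]
        if len(window) >= burst_limit:
            # window is full: wait until the oldest tracked send leaves it
            now = window[0] + time_limit_ms
            window = [t for t in window if t > now - time_limit_ms]
        window.append(now)
        send_times.append(now)
        clock = now
    return send_times


# 10 messages queued at once, limit of 3 messages per 3 seconds
print(schedule([0] * 10, burst_limit=3, time_limit_ms=3000))
# prints [0, 0, 0, 3000, 3000, 3000, 6000, 6000, 6000, 9000]
```

The same model shows why the two limits are not equivalent: `schedule([0] * 60, 30, 1000)` sends 30 messages at 0 ms and 30 more at 1000 ms, while `schedule([0] * 60, 60, 2000)` releases all 60 at 0 ms.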
Keep in mind that send times are calculated on the bot's side, while the actual spam limiting happens on Telegram's side. Numerous additional delays, jitter and buffering effects occur in between: in the OS networking scheduler, in Internet networking hardware, on Telegram's servers processing the API requests, etc. Unless you want to push Telegram's spam control to its very limit, we recommend specifying stricter `MessageQueue` limits than Telegram currently has. An ordinary-message throughput limit of 29 messages per 1017 milliseconds works well and gives you a 5% safety margin.
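The margin can be checked with a few lines of arithmetic. `rate_per_second` is a hypothetical helper used only for this calculation:

```python
def rate_per_second(burst_limit, time_limit_ms):
    """Average throughput allowed by a burst_limit/time_limit_ms pair."""
    return burst_limit * 1000 / time_limit_ms


telegram_rate = rate_per_second(30, 1000)   # documented limit
safe_rate = rate_per_second(29, 1017)       # stricter limit suggested above
margin = 1 - safe_rate / telegram_rate

print('%.2f msg/s, margin %.1f%%' % (safe_rate, margin * 100))
# prints 28.52 msg/s, margin 4.9%
```

With the keyword names used in the usage example on this page, such a limit would presumably be passed as `mq.MessageQueue(all_burst_limit=29, all_time_limit_ms=1017)`.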
Each message being sent is encapsulated in a callback to the corresponding send method. The `DelayQueue` class can process any callables with delays, not only encapsulated send callbacks, so it's generic and broadly useful even outside of its current scope. We use and recommend using `telegram.utils.promise.Promise` for that, as it allows delayed exception handling and has a convenient interface.
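The idea of delayed exception handling can be sketched with the standard library alone. The example below uses `concurrent.futures.Future` as a stand-in and does not reproduce the `Promise` interface; `run_into_future` and `fail` are hypothetical names. The error raised by the callback is captured where the callback runs and only surfaces when the caller asks for the result.

```python
from concurrent.futures import Future


def run_into_future(future, func, *args, **kwargs):
    """Run func and store its result or its exception in `future`."""
    try:
        future.set_result(func(*args, **kwargs))
    except Exception as exc:
        future.set_exception(exc)


def fail():
    raise ValueError('send failed')


fut = Future()
run_into_future(fut, fail)      # the error is captured, not raised here
try:
    fut.result(timeout=1)       # ...and re-raised where the result is needed
except ValueError as exc:
    print('handled later:', exc)
# prints handled later: send failed
```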
If you need more details on the MQ implementation, follow its docs or read the source (it's short, self-explanatory and well-commented). In fact, it's easier to understand its implementation details in Python than in English ;). Now let's move on to the usage example.
The `MessageQueue` module includes a convenient `@queuedmessage` decorator, which lets you delegate the required send method calls to MQ. However, it requires you to do a little work by hand: mainly, create a `telegram.Bot` subclass and decorate those methods.
Below is an example of its usage, based on the echo bot from our Tutorial. Trace through it (it's fairly self-explanatory) and try experimenting with it on your own. Keep the `MessageQueue` docs at hand to clarify the dark corners. It's important that you properly understand how MQ works before using it.
#!/usr/bin/env python3
# encoding=utf-8
'''
MessageQueue usage example with @queuedmessage decorator.
Provide your bot token with `TOKEN` environment variable or list it in
file `token.txt`
'''

import telegram.bot
from telegram.ext import messagequeue as mq


class MQBot(telegram.bot.Bot):
    '''A subclass of Bot which delegates send method handling to MQ'''
    def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
        super(MQBot, self).__init__(*args, **kwargs)
        # below 2 attributes should be provided for decorator usage
        self._is_messages_queued_default = is_queued_def
        self._msg_queue = mqueue or mq.MessageQueue()

    def __del__(self):
        # best-effort shutdown of the queue threads; ignore errors on teardown
        try:
            self._msg_queue.stop()
        except Exception:
            pass

    @mq.queuedmessage
    def send_message(self, *args, **kwargs):
        '''Wrapped method would accept new `queued` and `isgroup`
        OPTIONAL arguments'''
        return super(MQBot, self).send_message(*args, **kwargs)


if __name__ == '__main__':
    from telegram.ext import MessageHandler, Filters
    from telegram.utils.request import Request
    import os
    token = os.environ.get('TOKEN') or open('token.txt').read().strip()
    # for test purposes limit global throughput to 3 messages per 3 seconds
    q = mq.MessageQueue(all_burst_limit=3, all_time_limit_ms=3000)
    # set connection pool size for bot
    request = Request(con_pool_size=8)
    testbot = MQBot(token, request=request, mqueue=q)
    upd = telegram.ext.updater.Updater(bot=testbot, use_context=True)

    def reply(update, context):
        # tries to echo 10 msgs at once
        chatid = update.message.chat_id
        msgt = update.message.text
        print(msgt, chatid)
        for ix in range(10):
            context.bot.send_message(chat_id=chatid, text='%s) %s' % (ix + 1, msgt))

    hdl = MessageHandler(Filters.text, reply)
    upd.dispatcher.add_handler(hdl)
    upd.start_polling()
This produces the following results (notice the delays, but be aware that the timings in the figure don't precisely reflect the actual API delays):
Recommendations:
As stated in the `@queuedmessage` docs, for now the user needs to provide the `isgroup` boolean argument to wrapped methods or rely on the `False` default. If you need to use MQ with group-type messages, you can determine the message type by checking `chat_id` (for group-type messages it is < 0). However, this is not officially documented in Telegram's Bot docs and is therefore subject to change. Use it at your own risk. A more reliable way is to make a request to the API to determine the chat type before sending a message and cache the result. We're working on implementing this approach, so stay tuned.
Example: applying the following wrapper before `@mq.queuedmessage` sets the `isgroup` parameter automatically.
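import functools

# Placeholder: set this to the username of your own channel
CHANNEL_ID = '@channelusername'


def auto_group(method):
    @functools.wraps(method)
    def wrapped(self, *args, **kwargs):
        chat_id = 0
        if "chat_id" in kwargs:
            chat_id = kwargs["chat_id"]
        elif len(args) > 0:
            chat_id = args[0]
        if isinstance(chat_id, str):
            # string ids are usernames; only the known channel is group-type
            is_group = (chat_id == CHANNEL_ID)
        else:
            # numeric ids of group-type chats are negative (undocumented)
            is_group = (chat_id < 0)
        return method(self, *args, **kwargs, isgroup=is_group)
    return wrapped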
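The more reliable approach mentioned in the recommendations, asking the API for the chat type once and caching the answer, could be sketched like this. `ChatTypeCache` and `GROUP_TYPES` are hypothetical names, and the lookup function is passed in by the caller so that the example stays independent of any particular library version. Treating channels as group-type follows the @BotSupport statement quoted at the top of this page.

```python
# chat types that fall under the group-type limit (channels: see above)
GROUP_TYPES = {'group', 'supergroup', 'channel'}


class ChatTypeCache:
    """Hypothetical helper: remembers whether a chat is group-type."""

    def __init__(self, fetch_chat_type):
        # fetch_chat_type(chat_id) -> str is supplied by the caller
        self._fetch = fetch_chat_type
        self._cache = {}

    def is_group(self, chat_id):
        # only the first lookup per chat reaches the API
        if chat_id not in self._cache:
            self._cache[chat_id] = self._fetch(chat_id) in GROUP_TYPES
        return self._cache[chat_id]
```

With python-telegram-bot, the lookup could presumably be `lambda chat_id: bot.get_chat(chat_id).type`, and the result passed on as `isgroup=cache.is_group(chat_id)`.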
Feel free to ask on our Telegram Group.
Or you may ask the developer responsible for MQ directly:
thodnev @ Telegram (support in English or Russian; please, only MQ-related questions)
Wiki of python-telegram-bot
© Copyright 2015-2022 – Licensed by Creative Commons