Проблема массовой рассылки в Телеграмме 2.0

Рейтинг: 0Ответов: 1Опубликовано: 08.08.2023

Вводные данные

Я пишу телеграмм бота на aiogram, его главная функция - отправка уведомлений пользователям. Это сообщение об мероприятиях. Пользователей ожидается много(от 100 до 5000), хочу чтобы все было отказоустойчивым. База данных асинхронная.

Проблема

Как отправить допустим 1 сообщений 200 пользователям моментально и не попасть в лимит телеграмма (20 или 30 сообщений в секунду)

Мои мысли

  1. Очередь с приоритетом на отправку сообщений сразу приходит на ум. Но как её организовать? Думаю можно заменить во всем коде dp.bot.send_message() своей функцией, допустим messagequerry.add(chat_id: int, text:str, priority:int). Сделать связанный список, держать его в бд и постепенно отсылать сообщения, не выходя из лимита 20 сообщений/сек, т.е. asyncio.sleep(0.1). Но 200 сообщений вызовут как минимум 400 обращений к бд. Это не особо страшно, более того они растянуты во времени. Но хотелось бы оптимизировать. Как это оптимизировать?
  2. Сделать рандомный timeout? - но есть вероятность попасть в лимит при большом кол-ве участников и маленьком range

Похожий вопрос к сожалению не дал ответа, хотя проблема по логике частая, возможно искал по неправильным тегам.

Ответы

▲ 1

В общем наступил блок времени работы и из-за отсутствия идей решил на отчаянный метод - ChatGPT. Но на самом деле ответ более менее приемлемый. Может кому-то тоже поможет данный код.

import asyncio import random from collections import deque

class MessageQueue:
    def __init__(self):
        self.queue = deque()
        self.running = False
        self.max_messages_per_second = 20
        self.message_interval = 1 / self.max_messages_per_second

    async def _send_message(self, message):
        # Здесь будет ваша логика отправки сообщения через Telegram API
        # dp.bot.send_message(chat_id=message['chat_id'], text=message['text'])
        print(f"Sending message: {message['text']} to chat {message['chat_id']}")
        await asyncio.sleep(random.uniform(0.1, 0.2))  # Замените на реальную отправку

    async def _message_consumer(self):
        while self.queue:
            message = self.queue.popleft()
            await self._send_message(message)
            await asyncio.sleep(self.message_interval)

    def add_message(self, chat_id, text, priority=0):
        self.queue.append({'chat_id': chat_id, 'text': text, 'priority': priority})
        self.queue = deque(sorted(self.queue, key=lambda x: x['priority'], reverse=True))

        if not self.running:
            self.running = True
            asyncio.create_task(self._message_consumer())

# Пример использования async def main():
    message_queue = MessageQueue()

    for i in range(200):
        # В реальном приложении, вы бы получали chat_id и текст из базы данных
        chat_id = i + 1
        text = f"This is message {i + 1}"
        priority = random.randint(1, 10)
        message_queue.add_message(chat_id, text, priority)

    await asyncio.sleep(60)  # Подождать некоторое время, чтобы все сообщения отправились

asyncio.run(main())

Даа, сам задал вопрос, сам ответил(