Как дождаться завершения потока в асинхронном коде

Рейтинг: 2Ответов: 1Опубликовано: 17.06.2023

У меня есть телеграмм бот, написанный на aiogram взаимодействующий с БД - aiomysql. И функция, которая принимает нажатие кнопки, имеет подпоток, в котором for loop обходит массив, создает конечное сообщение и отправляет его обратно в функцию callback_query_keyboard:

from aiogram import Bot, Dispatcher, executor, types
import keyboards as kb
from multiprocessing import Process
from threading import Thread
import asyncio
import queue
import parser
import db

...

def create_message(response, out_queue):
    message = ''

    for i in response:
        message += f'• <a href="{i.get("link")}">{i.get("article")}</a>\n\n'

    out_queue.put(message)



@dp.callback_query_handler()
async def callback_query_keyboard(callback_query: types.CallbackQuery):
    out_queue = queue.Queue()

    await connection.create_pool()
    response = await connection.get_data(callback_query.data)

    Thread(target=create_message, args=[response, out_queue]).start() #funciton create_message is run

    await bot.send_message(callback_query.from_user.id, text=f'{out_queue.get(block=False)}', parse_mode='HTML', disable_web_page_preview=True)

...

def start_parsing():
    asyncio.run(parser.main())


if __name__ == '__main__':
    Process(target=start_parsing).start()
    executor.start_polling(dp, skip_updates=True)

Проблема заключается в том, что после запуска потока функция продолжает работать, а когда интерпретатор доходит до строки, где пользователь получает сообщение, ничего не происходит, и до завершения потока бот перестает отвечать. Я исправил эту ошибку, добавив await asyncio.sleep():

 Thread(target=create_message, args=[response, out_queue]).start()

    await asyncio.sleep(0.2)  <--

    await bot.send_message(callback_query.from_user.id, text=f'{out_queue.get(block=False)}', parse_mode='HTML', disable_web_page_preview=True)

Но я думаю, что это не лучшее решение проблемы, ведь:

  1. Функция не работает, если поток работает более 0,2 секунды.

  2. Функция простаивает, если подпоток выполнился раньше времени, указанного в await asyncio.sleep(0.2)

Вопрос: Как заменить await asyncio.sleep(0.2) на что-то лучшее или как переписать эту часть кода?

Ответы

▲ 3Принят
# 1
def create_message(response):
    message = ''
    for i in response:
        message += f'• <a href="{i.get("link")}">{i.get("article")}</a>\n\n'
    return message

@dp.callback_query_handler()
async def callback_query_keyboard(callback_query: types.CallbackQuery):
    await connection.create_pool()
    response = await connection.get_data(callback_query.data)

    message = await asyncio.to_thread(create_message, response)
    await bot.send_message(callback_query.from_user.id, text=message, parse_mode='HTML', disable_web_page_preview=True)

# 2 в чем проблема этого кода?
@dp.callback_query_handler()
async def callback_query_keyboard(callback_query: types.CallbackQuery):
    await connection.create_pool()
    response = await connection.get_data(callback_query.data)

    message = ''
    for i in response:
        message += f'• <a href="{i.get("link")}">{i.get("article")}</a>\n\n'
    await bot.send_message(callback_query.from_user.id, text=message)