Как запустить повторяющуюся блокирующую функцию в разных threads с помощью asyncio, python

Рейтинг: 0Ответов: 2Опубликовано: 23.01.2023

Есть блокирующая функция, для примера возьмем:

def blockme(n):
    x = random.random() * 2.0
    time.sleep(x)
    return n, x

(На самом деле мне нужны реквесты по API, но в контексте данного вопроса это не важно)

Мне надо запускать эту функцию в бесконечном цикле, причем каждый следующий запуск не должен дожидаться окончания работы предыдущих.

Как это сделать с помощью модуля threading я, в общих чертах, понимаю.

if __name__ == '__main__':
    i = 0
    while True:
        i += 1
        threading.Thread(target=blockme, args=(i,)).start()

Но у меня возникли вопросы, которые скорее носят исследовательский характер нежели практический:

  1. как это сделать с помощью asyncio, так как asyncio также поддерживает треды и прочие concurrent.futures
  2. что не менее важно, а стоит ли это делать с помощью asyncio? Что даст asyncio? Простоту синтаксиса? Более читаемый код?
  3. а вообще это возможно?

Я уже задал этот вопрос на Stackoverflow.com, правда с момента, как я его задал я уже получил ряд промежуточных знаний и поэтому вопрос модифицировался и отличается от оригинального. На данный момент, обсуждение на SO.com продолжается, задача не решена, но я получил следующую проблему. Один из участников предложил решение:

import asyncio
import random
import time

def blockme(n):
    x = random.random() * 2.0
    time.sleep(x)
    return n, x

def cb(fut):
    print("Result", fut.result())
    
async def main():
    loop = asyncio.get_event_loop()
    futs = []
    for n in range(20):
        fut = loop.run_in_executor(None, blockme, n)
        fut.add_done_callback(cb)
        futs.append(fut)
    await asyncio.gather(*futs)
    # await asyncio.sleep(10)

asyncio.run(main())

Тут он еще добавил футуры, подразумевая, что нам еще нужны результаты работы (для моей задачи это не обязательно, если ваше решение будет без футур - мне тоже подойдет). И в таком виде, как он написал - все как-будто работает. Но тут есть нюанс: он утверждает, что если заменить for n in range(20) на while True, то все тоже будет работать, но у меня так не работает. Код что-то начинает делать в бесконечном цикле, но никакого вывода результатов не происходит. У меня есть подозрение, что то, что лежит в теле цикла не запускает функцию, а лишь определяет и собирает футуры в список. И без await asyncio.gather(*futs) работать не будет, а значит, задачу не решает, так как бесконечный цикл невозможен.

Так что есть еще вопросы:

  1. А у вас этот код работает, если заменить ограниченый цикл на бесконечный?
  2. Если - "да", то есть варианты почему он не работает у меня? Проблема в окружении (версии модулей, настройки ОС и т.д.)?

PS. На русском SO я нашел еще вот такой вопрос, он похож, но во-первых, нет речи про бесконечный цикл, а, во-вторых, предложенное решение, все равно использовало threading в явном виде и коллеги критиковали использование asyncio, но одно дело - критиковать, а другое - можно ли это в принципе сделать.

Ответы

▲ 0Принят

Футуры выполняются при вызове await другой функции или при выходе в другую асинхронную функцию.

while True: записывай как while await asyncio.sleep(0, result=True): и тогда треды будут запускаться на каждом круге.

Экзекутор нужно определить чтоб ограничить количество тредов. По умолчанию количество ядер умноженное на 5.

pool = concurrent.futures.ThreadPoolExecutor(max_workers=40)

...

while  await asyncio.sleep(0, result=True):
    fut = loop.run_in_executor(pool, blockme, n)
    ....

Но при этом футуры будут всё ещё набиваться в очередь. Не знаю что стоит за причиной запускать бесконечное число потоков, но тут надо бы какой семафор поставить чтоб не забивать. Хотя бы тупой

if len(futs)>40:
   completed, futs = await asyncio.wait(futs, timeout=5)
▲ 0

С определенными оговорками ответ выглядит так

import asyncio
from asyncio import FIRST_COMPLETED
import random
import time
import concurrent

def blockme(n):
    x = random.random() * 2.0
    time.sleep(x)
    return n, x

def cb(fut):
    print("Result", fut.result())

async def main():
    #Требуется контролировать количество потоков
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    loop = asyncio.get_event_loop()
    futs_set = set()
    n = 0

    #Требуется контролировать количество активных футур
    futures_limit = 40

    #Требуется контролировать частоту запуска блокирующих функций
    delay = 0.15

    while True:
        await asyncio.sleep(delay)
        fut = loop.run_in_executor(pool, blockme, n)
        fut.add_done_callback(cb)
        futs_set.append(fut)

        #Требуется организовать логику, 
        #для того, чтобы футуры не копились бесконечно, например:
        if len(fut_set) >= futures_limit:
            completed, fut_set = await asyncio.wait(
                                       fut_set, 
                                       timeout=5, 
                                       return_when=FIRST_COMPLETED
                                       )

asyncio.run(main())