Экзекьюторы предназначены в первую очередь для запуска синхронного кода асинхронно (в отдельном потоке или процессе). Экзекьютор первым параметром должен принимать обычный callable объект, например, синхронную функцию, а вы передаете асинхронную.
Асинхронная функция при обычном вызове (без await
) возвращает не результат, а awaitable объект, но в дочернем процессе await
от этого объекта не вызывается, из-за этого фактически функция не запускается и выходит предупреждение, что она was never awaited
.
В целом, если убрать весь асинхронный синтаксис и заменить await asyncio.sleep
на time.sleep
, все заработает:
import time
from concurrent.futures import ProcessPoolExecutor
def test1(text: str):
for _ in range(2):
time.sleep(1)
print(text.upper())
def test2(text: str):
while True:
time.sleep(1)
print(text.lower())
def test(text: str):
while True:
time.sleep(1)
print(text.lower())
def main():
futures = []
with ProcessPoolExecutor(max_workers=2) as executor:
future1 = executor.submit(test1, "lored")
future2 = executor.submit(test2, "hello")
future3 = executor.submit(test, "nono")
futures.extend([future1, future2, future3])
if __name__ == "__main__":
main()
Вывод:
LORED
hello
LORED
hello
hello
nono
...
Чтобы работало в асинхронном main
с асинхронным синтаксисом, нужно использовать loop.run_in_executor
вместо submit
(тогда можно будет, например, принудительно дожидаться завершения задачи через await future1
). Но опять же, функции test1
, test2
, test
остаются синхронными.
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
def test1(text: str):
for _ in range(2):
time.sleep(1)
print(text.upper())
def test2(text: str):
while True:
time.sleep(1)
print(text.lower())
def test(text: str):
while True:
time.sleep(1)
print(text.lower())
async def main():
loop = asyncio.get_running_loop()
futures = []
with ProcessPoolExecutor(max_workers=2) as executor:
future1 = loop.run_in_executor(executor, test1, "lored")
future2 = loop.run_in_executor(executor, test2, "hello")
future3 = loop.run_in_executor(executor, test, "nono")
futures.extend([future1, future2, future3])
if __name__ == "__main__":
asyncio.run(main())
Чтобы запускать асинхронные функции из экзекьютора, их нужно запустить через asyncio.run
(точно так же, как вы запускаете main
из основного процесса). Можно написать общую запускающую синхронную функцию, и из нее уже вызвать асинхронные:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
async def test1(text: str):
for _ in range(2):
await asyncio.sleep(1)
print(text.upper())
async def test2(text: str):
while True:
await asyncio.sleep(1)
print(text.lower())
async def test(text: str):
while True:
await asyncio.sleep(1)
print(text.lower())
def wrapper(async_func, *args):
asyncio.run(async_func(*args))
async def main():
loop = asyncio.get_running_loop()
futures = []
with ProcessPoolExecutor(max_workers=2) as executor:
future1 = loop.run_in_executor(executor, wrapper, test1, "lored")
future2 = loop.run_in_executor(executor, wrapper, test2, "hello")
future3 = loop.run_in_executor(executor, wrapper, test, "nono")
futures.extend([future1, future2, future3])
if __name__ == "__main__":
asyncio.run(main())