Именно так, как вы хотите вызвать асинхронный метод из __init__
не получится - await
в __init__
использовать нельзя, т.к. это синхронный метод, и асинхронным он не должен быть (попытка сделать его асинхронным приведет к ошибке TypeError: __init__() should return None, not 'coroutine'
при создании объекта).
В простых случаях можно просто в асинхронной функции создавать объект, потом от него же вызывать асинхронный метод:
class RepoBase:
def __init__(self, path):
self.path = path
async def get_connect(self) -> aiosqlite.Connection:
con = await aiosqlite.connect(self.path)
return con
async def something():
connect = await RepoBase(path).get_connect()
Если дополнительно требуется освобождение ресурсов, можно превратить ваш класс в асинхронный менеджер контекста, использовать его с конструкцией async with
, в самом классе возвращать соединение через метод __aenter__
, в __aexit__
освобождать ресурсы:
class RepoBase:
def __init__(self, path):
self.path = path
async def get_connect(self) -> aiosqlite.Connection:
con = await aiosqlite.connect(self.path)
return con
async def __aenter__(self) -> aiosqlite.Connection:
return await self.get_connect()
async def __aexit__(self, *args):
# Тут можно добавить какой-то код,
# который будет выполняться при выходе из блока async with
pass
async def main():
async with RepoBase() as connection:
...
# Делаете что вам нужно с полученным соединением
Если нужно, чтобы из объекта можно было получать соединение несколько раз, и оно все время было одно и то же (не открывалось новое соединенеи), можно к методу get_connect
прикрутить кэширование, например с помощью модуля async_lru
:
import asyncio
import random
from async_lru import alru_cache
class RepoBase:
def __init__(self, path):
self.path = path
@alru_cache()
async def get_connect(self):
print("Obtained connection")
con = random.randint(0, 1000)
return con
async def __aenter__(self):
return await self.get_connect()
async def __aexit__(self, *args):
pass
def __hash__(self) -> int:
# При кэшировании равными будут объекты с одинаковым путём
print("hashing")
return hash(self.path)
async def main():
repo_base = RepoBase(12312)
print("Object created")
async with repo_base as connection:
print(connection)
async with repo_base as connection:
print(connection)
asyncio.run(main())
Вывод:
Object created
hashing
Obtained connection
324
hashing
324
По поводу запуска асинхронной функции из __init__
через asyncio.run
из соседнего ответа:
В этом коде self.conn
будет заполнено в неопределенный момент. В идеале, асинхронная функция должна сохранить результат в Future
, от которого потом нужно будет сделать await
, чтобы получить результат, примерно так :
# ПОЛУРАБОЧИЙ КОД, НЕ ИСПОЛЬЗОВАТЬ
class RepoBase:
def __init__(self, path):
self.path = path
self.__connect_future = asyncio.Future()
asyncio.run(self.init_connect())
async def init_connect(self):
con = random.randint(0, 1000)
self.__connect_future.set_result(con)
async def get_connect(self):
return await self.__connect_future
Но с этим кодом будут возникать разные интересные проблемы, начиная с того, что
если объект будет создаваться из асинхронной функции, то вылетит с ошибкой RuntimeError: asyncio.run() cannot be called from a running event loop
.
Для других вариантов создания объекта будут свои варианты отказа, которые можно долго и весело дебажить. Так что проще не смешивать синхронный и асинхронный код, особенно внутри __init__
, а использовать самый первый вариант из этого ответа.