Ок, я пришёл к этому решению (с демострацией вызова):
from queue import Queue, Empty
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
async def main():
def concrete_callback(process):
# UI, log or something update
logger.info(f'Iteration complete with: {process}...')
def concrete_done():
# result and errors handling
logger.info('All iterations complete!')
class Operation:
def __init__(self_operation, progress_callback, done_callback):
self_operation.progress = progress_callback
self_operation.done = done_callback
@staticmethod
def progress(*args, **kwargs):
pass
@staticmethod
def done():
pass
def __call__(self_operation, cls):
class AsyncGeneratorWrapper():
def __init__(self, *args, **kwargs):
self.running = False
self.canceled = False
self._execute_queue = Queue(256)
self.generator = cls(*args, **kwargs)
def __iter__(self):
# deconstruction hack for convenient syntax
self._execute_queue.put(self._step_execute_task)
yield from (self._run, self.suspend, self.cancel)
async def _run(self):
self.running = True
try:
while True:
try:
task = self._execute_queue.get(block=False)
except Empty:
break
else:
if await task():
break
self._execute_queue.task_done()
finally:
self.running = False
async def __aenter__(self):
self._execute_queue.put(self._step_execute_task)
self.task = asyncio.create_task(self._run())
return self.suspend, self.cancel
async def __aexit__(self, except_type, except_value, traceback):
await self.task
async def _step_execute_task(self):
if not self.canceled:
try:
target = await (anext(self.generator))
except StopAsyncIteration:
self_operation.done()
return True
else:
self_operation.progress(target)
self._execute_queue.put(self._step_execute_task)
async def _suspend_task(self):
if not self.canceled:
if self.running:
logger.debug('Suspend!')
# break _run loop
return True
else:
raise self.InvalidOperationStateError('Operation is not running.', self)
else:
raise self.InvalidOperationStateError('Operation is canceled.', self)
def suspend(self):
self._execute_queue.put(self._suspend_task)
return self.resume
def _resume_task(self):
if not self.canceled:
if not self.running:
self._execute_queue.put(self._step_execute_task)
logger.debug('Resume!')
return self._run
else:
raise self.InvalidOperationStateError('Operation in progress', self)
else:
raise self.InvalidOperationStateError('Operation is canceled.', self)
def resume(self):
self._execute_queue.put(self._resume_task())
return self._run()
async def _cancel_task(self):
if not self.canceled:
self.canceled = True
logger.debug('Cancel!')
# break tasks
try:
while True:
_ = self._execute_queue.get(block=False)
self._execute_queue.task_done()
except Empty:
pass
else:
raise self.InvalidOperationStateError('Operation is canceled.', self)
def cancel(self):
self._execute_queue.put(self._cancel_task)
class InvalidOperationStateError(RuntimeError):
'''Raised when an operation enters an invalid state'''
pass
return AsyncGeneratorWrapper
@Operation(progress_callback=concrete_callback, done_callback=concrete_done)
class AsyncGenerator:
def __init__(self, start, stop):
self.current = start
self.start = start
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current < self.stop:
self.current += 1
await asyncio.sleep(1)
return self.current
else:
raise StopAsyncIteration
async def simulate_pause(suspend_callback):
await asyncio.sleep(2)
# get asynchronously resume callback and suspend operation
resume_callback = suspend_callback()
# wait
await asyncio.sleep(2)
# comeback
await resume_callback()
async def simulate_stop(cancel_callback):
await asyncio.sleep(6)
cancel_callback()
# Examples
async with AsyncGenerator(0, 5) as (suspend, cancel):
logger.info('__')
logger.info('ContextManager: Test Suspend, Resume')
await asyncio.sleep(1)
# get asynchronously resume callback and suspend operation
resume_callback = suspend()
# wait
await asyncio.sleep(2)
# comeback
await resume_callback()
async with AsyncGenerator(0, 5) as (suspend, cancel):
logger.info('__')
logger.info('ContextManager: Test Suspend only')
await asyncio.sleep(1)
# get asynchronously resume callback and suspend operation
resume_callback = suspend()
async with AsyncGenerator(0, 5) as (suspend, cancel):
logger.info('__')
logger.info('ContextManager: Test Cancel')
await asyncio.sleep(2)
cancel()
async with AsyncGenerator(0, 5) as (suspend, cancel):
logger.info('__')
logger.info('ContextManager: Test run other code')
await asyncio.sleep(2)
print('text printed')
async with AsyncGenerator(0, 5) as (suspend, cancel):
logger.info('__')
logger.info('ContextManager: Test run only without control')
logger.info('__')
logger.info('Without ContextManager: Test Suspend, Resume, Cancel:')
run1, suspend1, cancel1 = AsyncGenerator(0, 10)
_ = await asyncio.wait([asyncio.create_task(coro) for coro in [run1(), simulate_pause(suspend1), simulate_stop(cancel1)]])
logger.info('__')
logger.info('Without ContextManager: Run only without control:')
run2, *_ = AsyncGenerator(0, 5)
await run2()
logger.info('__')
logger.info('main done')
asyncio.run(main())
Потокобезопасен ли декоратор Operation
?
Насколько плох текущий код?