Это сделано умышленно, дабы в случае ошибки не породилось бесконечное дерево процессов.
Но обойти ограничение возможно. Для этого нужно сделать два трюка:
- создать тип процессов, которые игнорируют
daemon
атрибут
- перехватить создание нового процесса для пула.
import logging
from multiprocess.pool import Pool as PythonPool
import multiprocess.context
import pathos.multiprocessing as mp
DefaultProcessFactory = PythonPool.Process
_clazz_cache = {}
class NoDaemonMixin:
def _get_daemon(self):
logging.getLogger("nodaemon.class").debug("NoDaemon._get_daemon()")
return False
def _set_daemon(self, value):
logging.getLogger("nodaemon.class").debug("NoDaemon._set_daemon(%s) - ignored", value)
daemon = property(_get_daemon, _set_daemon)
if hasattr(multiprocess.context, "SpawnProcess"):
class NoDaemonSpawnProcess(NoDaemonMixin, multiprocess.context.SpawnProcess):
pass
_clazz_cache[multiprocess.context.SpawnProcess] = NoDaemonSpawnProcess
if hasattr(multiprocess.context, "ForkProcess"):
class NoDaemonForkProcess(NoDaemonMixin, multiprocess.context.ForkProcess):
pass
_clazz_cache[multiprocess.context.ForkProcess] = NoDaemonForkProcess
if hasattr(multiprocess.context, "ForkServerProcess"):
class NoDaemonForkServerProcess(NoDaemonMixin, multiprocess.context.ForkServerProcess):
pass
_clazz_cache[multiprocess.context.ForkServerProcess] = NoDaemonForkServerProcess
Этот код создаёт недемонические обёртки для классов, которые используются в стандартной библиотеке для разных стратегий порождения подпроцессов. Приходится проверять наличие классов, так как для разных ОС набор классов в multiprocess.context
различается.
Теперь как перехватить порождение подпроцессов.
Внутри pathos.multiprocessing
использует multiprocess.pool.Pool
, поэтому нужно перехватить вызов статического метода Pool.Process
который порождает новый процесс. Почему-то простой подмены контекста оказалось недостаточно - перестал работать маршалер параметров для pool.map
from multiprocess.pool import Pool as PythonPool
DefaultProcessFactory = PythonPool.Process
def _process(_, ctx, *args, **kwargs):
log = logging.getLogger("nodaemon.factory")
log.debug("MyPool.Process(): ctx=%s, args=%s, kwargs=%s", ctx, args, kwargs)
if ctx.Process in _clazz_cache:
wrap_clz = _clazz_cache[ctx.Process]
else:
raise TypeError(f"Unsupported process class: {ctx.Process}")
try:
log.debug("Creating process instance: process class %s", wrap_clz)
return wrap_clz(*args, **kwargs)
finally:
log.debug("MyPool.Process() done")
class MyProcessingPool(mp.ProcessingPool):
def _serve(self, nodes=None):
log = logging.getLogger("nodaemon.pool")
log.debug("MyProcessingPool._serve(nodes=%s)", nodes)
restore = PythonPool.Process is not _process
PythonPool.Process = _process
try:
return super(MyProcessingPool, self)._serve(nodes)
finally:
if restore:
log.debug("Restoring PythonPool.Process factory")
PythonPool.Process = DefaultProcessFactory
def restart(self, force = False):
log = logging.getLogger("nodaemon.pool")
log.debug("MyProcessingPool.restart(force=%s)", force)
restore = PythonPool.Process is not _process
PythonPool.Process = _process
try:
return super().restart(force=force)
finally:
if restore:
log.debug("Restoring PythonPool.Process factory")
PythonPool.Process = DefaultProcessFactory
Как оно работает: (полный пример)
def sub_worker(x):
print(f"sub_worker({x=})")
return x * 2
def worker(max_num):
print(f"worker({max_num=})")
sub_pool = MyProcessingPool()
try:
return sub_pool.map(sub_worker, range(max_num))
finally:
sub_pool.close()
sub_pool.join()
def main():
# logging.basicConfig(level=logging.DEBUG)
# logging.getLogger("nodaemon").setLevel(logging.INFO)
pool = MyProcessingPool()
result = pool.map(worker, [1, 2, 3, 4])
print(result)
pool.close()
pool.join()
вывод:
worker(max_num=1)
worker(max_num=2)
worker(max_num=3)
worker(max_num=4)
sub_worker(x=0)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=2)
sub_worker(x=3)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=0)
sub_worker(x=1)
sub_worker(x=2)
[[0], [0, 2], [0, 2, 4], [0, 2, 4, 6]]
Тем не менее, несмотря на возможность порождения недемонических процессов в пуле, этим следует пользоваться очень, очень осторожно.
Во-первых, перехват фабрики проссов - это очень хрупко. Что-нибудь обязательно пойдёт не так, если несколько видов пулов будут использоваться одновременно.
Во-вторых, есть риск при небрежном порождении деток заспамить всю систему до полнейших тормозов или даже отправить её в перезагрузку.