Python 프로세스 풀 비 데모닉? 수 없기 때문에 이것을 원합니다. 특히 다음과

데모닉이 아닌 파이썬 풀을 만들 수 있습니까? 풀이 내부에 다른 풀이있는 함수를 호출 할 수 있기를 원합니다.

데몬 프로세스는 프로세스를 만들 수 없기 때문에 이것을 원합니다. 특히 다음과 같은 오류가 발생합니다.

AssertionError: daemonic processes are not allowed to have children

예를 들어, function_a를 실행하는 풀이있는 실행 function_b중인 풀이있는 시나리오를 고려하십시오 function_c. 이 함수 체인은 function_b데몬 프로세스에서 실행 중이므로 실패 하며 데몬 프로세스는 프로세스를 만들 수 없습니다.



답변

multiprocessing.pool.Pool클래스는에서 작업자 프로세스 생성 __init__악마의를하게하고이를 시작 방법을, 그리고 자신의-다시 설정 할 수 없습니다 daemon에 속성을 False가 시작된다 (그리고 나중에 그것이 더 이상 허용하지 않는다) 전에. 그러나 당신은 multiprocesing.pool.Pool( multiprocessing.Pool그냥 래퍼 함수일 뿐이다)의 당신 자신의 하위 클래스를 만들고 당신 자신 의multiprocessing.Process 하위 클래스를 만들고 작업자 프로세스에 사용할 수 있도록 항상 비 데모닉 인 하위 클래스로 수 있습니다.

다음은이를 수행하는 방법에 대한 전체 예입니다. 중요한 부분은 두 개의 클래스입니다 NoDaemonProcessMyPool상단 및 호출 pool.close()하고 pool.join()MyPool끝에 인스턴스입니다.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()


답변

나는 파이썬 3.7에서 데모닉이 아닌 풀을 사용할 필요가 있었고 결국 받아 들여진 답변에 게시 된 코드를 수정했습니다. 아래는 비 데모닉 풀을 생성하는 스 니펫입니다.

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

의 현재 구현이 multiprocessing컨텍스트를 기반으로하도록 광범위하게 리팩토링되었으므로 as 속성 NoDaemonContext이 있는 클래스 를 제공해야 NoDaemonProcess합니다.NestablePool그러면 기본 컨텍스트 대신 해당 컨텍스트를 사용합니다.

즉,이 접근 방식에 대해 최소한 두 가지주의 사항이 있음을 경고해야합니다.

  1. 여전히 구현 세부 사항에 따라 다릅니다. multiprocessing 패키지 므로 언제든지 중단 될 수 있습니다.
  2. multiprocessing데모닉이 아닌 프로세스를 사용하기 어렵게 만든 타당한 이유 가 있습니다 . 그 중 많은 부분이 여기 에 설명되어 있습니다 . 내 생각에 가장 설득력있는 것은 :

하위 프로세스를 사용하여 자식 스레드가 자체 자식을 생성 할 수 있도록 허용하는 경우 하위 프로세스가 완료되고 반환되기 전에 부모 또는 자식 스레드가 종료되면 좀비 ‘손자’의 작은 군대를 만들 위험이 있습니다.


답변

멀티 프로세싱 모듈은 프로세스와 풀을 사용하기 좋은 인터페이스를 가지고 또는 스레드. 현재 사용 사례 multiprocessing.pool.ThreadPool에 따라 외부 풀에 사용 하는 것을 고려할 수 있으며, 이로 인해 스레드 가 생성됩니다 (내부에서 프로세스를 생성 할 수 있음). 아닌 됩니다.

GIL에 의해 제한 될 수 있지만, 저의 특별한 경우 (둘 다 테스트했습니다) , 여기Pool 에서 생성 된 외부 프로세스의 시작 시간 이 .ThreadPool


그것은 스왑 정말 쉽습니다 Processes위한 Threads. 여기 또는 여기 에서 ThreadPool솔루션 사용 방법에 대해 자세히 알아보십시오 .


답변

일부 Python 버전에서 표준 풀을 사용자 지정으로 대체하면 오류가 발생할 수 있습니다 AssertionError: group argument must be None for now..

여기 에서 도움이 될 수있는 솔루션을 찾았습니다.

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc


답변

concurrent.futures.ProcessPoolExecutor이 제한이 없습니다. 전혀 문제없이 중첩 된 프로세스 풀을 가질 수 있습니다.

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

위의 데모 코드는 Python 3.8로 테스트되었습니다.

ProcessPoolExecutor그러나 의 한계 는 maxtasksperchild. 이것이 필요한 경우 대신 Massimiliano답변을 고려하십시오 .

크레딧 : answer by jfs


답변

내가 만난 문제는 모듈간에 전역을 가져 오려고 할 때 ProcessPool () 줄이 여러 번 평가되는 것입니다.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time,
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

그런 다음 코드의 다른 곳에서 안전하게 가져옵니다.

from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock

pathos.multiprocessing여기에 좀 더 확장 된 래퍼 클래스를 작성했습니다 .

참고로, 사용 사례에 성능 최적화로 비동기 다중 프로세스 맵만 필요한 경우 joblib는 모든 프로세스 풀을 백그라운드에서 관리하고 다음과 같은 매우 간단한 구문을 허용합니다.

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )


답변

나는 데몬 프로세스가 자식을 낳을 수 있도록하는 당구 (다중 처리 풀 확장) 라는 celery포크를 사용하여이 문제를 다루는 사람들을 보았습니다 . 해결 방법은 다음과 같이 모듈 을 간단히 교체하는 것입니다.multiprocessingmultiprocessing

import billiard as multiprocessing