데모닉이 아닌 파이썬 풀을 만들 수 있습니까? 풀이 내부에 다른 풀이있는 함수를 호출 할 수 있기를 원합니다.
데몬 프로세스는 프로세스를 만들 수 없기 때문에 이것을 원합니다. 특히 다음과 같은 오류가 발생합니다.
AssertionError: daemonic processes are not allowed to have children
예를 들어, function_a
를 실행하는 풀이있는 실행 function_b
중인 풀이있는 시나리오를 고려하십시오 function_c
. 이 함수 체인은 function_b
데몬 프로세스에서 실행 중이므로 실패 하며 데몬 프로세스는 프로세스를 만들 수 없습니다.
답변
multiprocessing.pool.Pool
클래스는에서 작업자 프로세스 생성 __init__
악마의를하게하고이를 시작 방법을, 그리고 자신의-다시 설정 할 수 없습니다 daemon
에 속성을 False
가 시작된다 (그리고 나중에 그것이 더 이상 허용하지 않는다) 전에. 그러나 당신은 multiprocesing.pool.Pool
( multiprocessing.Pool
그냥 래퍼 함수일 뿐이다)의 당신 자신의 하위 클래스를 만들고 당신 자신 의multiprocessing.Process
하위 클래스를 만들고 작업자 프로세스에 사용할 수 있도록 항상 비 데모닉 인 하위 클래스로 수 있습니다.
다음은이를 수행하는 방법에 대한 전체 예입니다. 중요한 부분은 두 개의 클래스입니다 NoDaemonProcess
및 MyPool
상단 및 호출 pool.close()
하고 pool.join()
온 MyPool
끝에 인스턴스입니다.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == '__main__':
test()
답변
나는 파이썬 3.7에서 데모닉이 아닌 풀을 사용할 필요가 있었고 결국 받아 들여진 답변에 게시 된 코드를 수정했습니다. 아래는 비 데모닉 풀을 생성하는 스 니펫입니다.
import multiprocessing.pool
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)
의 현재 구현이 multiprocessing
컨텍스트를 기반으로하도록 광범위하게 리팩토링되었으므로 as 속성 NoDaemonContext
이 있는 클래스 를 제공해야 NoDaemonProcess
합니다.NestablePool
그러면 기본 컨텍스트 대신 해당 컨텍스트를 사용합니다.
즉,이 접근 방식에 대해 최소한 두 가지주의 사항이 있음을 경고해야합니다.
- 여전히 구현 세부 사항에 따라 다릅니다.
multiprocessing
패키지 므로 언제든지 중단 될 수 있습니다. multiprocessing
데모닉이 아닌 프로세스를 사용하기 어렵게 만든 타당한 이유 가 있습니다 . 그 중 많은 부분이 여기 에 설명되어 있습니다 . 내 생각에 가장 설득력있는 것은 :
하위 프로세스를 사용하여 자식 스레드가 자체 자식을 생성 할 수 있도록 허용하는 경우 하위 프로세스가 완료되고 반환되기 전에 부모 또는 자식 스레드가 종료되면 좀비 ‘손자’의 작은 군대를 만들 위험이 있습니다.
답변
멀티 프로세싱 모듈은 프로세스와 풀을 사용하기 좋은 인터페이스를 가지고 또는 스레드. 현재 사용 사례 multiprocessing.pool.ThreadPool
에 따라 외부 풀에 사용 하는 것을 고려할 수 있으며, 이로 인해 스레드 가 생성됩니다 (내부에서 프로세스를 생성 할 수 있음). 아닌 됩니다.
GIL에 의해 제한 될 수 있지만, 저의 특별한 경우 (둘 다 테스트했습니다) , 여기Pool
에서 생성 된 외부 프로세스의 시작 시간 이 .ThreadPool
그것은 스왑 정말 쉽습니다 Processes
위한 Threads
. 여기 또는 여기 에서 ThreadPool
솔루션 사용 방법에 대해 자세히 알아보십시오 .
답변
일부 Python 버전에서 표준 풀을 사용자 지정으로 대체하면 오류가 발생할 수 있습니다 AssertionError: group argument must be None for now
..
여기 에서 도움이 될 수있는 솔루션을 찾았습니다.
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, val):
pass
class NoDaemonProcessPool(multiprocessing.pool.Pool):
def Process(self, *args, **kwds):
proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
proc.__class__ = NoDaemonProcess
return proc
답변
concurrent.futures.ProcessPoolExecutor
이 제한이 없습니다. 전혀 문제없이 중첩 된 프로세스 풀을 가질 수 있습니다.
from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time
def pid():
return current_process().pid
def _square(i): # Runs in inner_pool
square = i ** 2
time.sleep(i / 10)
print(f'{pid()=} {i=} {square=}')
return square
def _sum_squares(i, j): # Runs in outer_pool
with Pool(max_workers=2) as inner_pool:
squares = inner_pool.map(_square, (i, j))
sum_squares = sum(squares)
time.sleep(sum_squares ** .5)
print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
return sum_squares
def main():
with Pool(max_workers=3) as outer_pool:
for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
print(f'{pid()=} {sum_squares=}')
if __name__ == "__main__":
main()
위의 데모 코드는 Python 3.8로 테스트되었습니다.
ProcessPoolExecutor
그러나 의 한계 는 maxtasksperchild
. 이것이 필요한 경우 대신 Massimiliano 의 답변을 고려하십시오 .
크레딧 : answer by jfs
답변
내가 만난 문제는 모듈간에 전역을 가져 오려고 할 때 ProcessPool () 줄이 여러 번 평가되는 것입니다.
globals.py
from processing import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool
class SingletonMeta(type):
def __new__(cls, name, bases, dict):
dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
return super(SingletonMeta, cls).__new__(cls, name, bases, dict)
def __init__(cls, name, bases, dict):
super(SingletonMeta, cls).__init__(name, bases, dict)
cls.instance = None
def __call__(cls,*args,**kw):
if cls.instance is None:
cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
return cls.instance
def __deepcopy__(self, item):
return item.__class__.instance
class Globals(object):
__metaclass__ = SingletonMeta
"""
This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
The root cause is that importing this file from different modules causes this file to be reevalutated each time,
thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
"""
def __init__(self):
print "%s::__init__()" % (self.__class__.__name__)
self.shared_manager = Manager()
self.shared_process_pool = ProcessPool()
self.shared_thread_pool = ThreadPool()
self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
그런 다음 코드의 다른 곳에서 안전하게 가져옵니다.
from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock
pathos.multiprocessing
여기에 좀 더 확장 된 래퍼 클래스를 작성했습니다 .
참고로, 사용 사례에 성능 최적화로 비동기 다중 프로세스 맵만 필요한 경우 joblib는 모든 프로세스 풀을 백그라운드에서 관리하고 다음과 같은 매우 간단한 구문을 허용합니다.
squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
답변
나는 데몬 프로세스가 자식을 낳을 수 있도록하는 당구 (다중 처리 풀 확장) 라는 celery
포크를 사용하여이 문제를 다루는 사람들을 보았습니다 . 해결 방법은 다음과 같이 모듈 을 간단히 교체하는 것입니다.multiprocessing
multiprocessing
import billiard as multiprocessing