concurrent.futures.ProcessPoolExecutor pool deadlocks when submitting many tasks #105829
Scenario 1

Running with 4 worker processes (this particular run hung after submitting 380'000 tasks).

$ ps -t pts/5 -o pid,s,command
PID S COMMAND
11534 S python3 hang.py
11536 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.resource_tracker import main;main(9)
11537 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=14) --multiprocessing-fork
11540 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=16) --multiprocessing-fork
11541 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=18) --multiprocessing-fork
11542 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=20) --multiprocessing-fork

$ sudo py-spy dump -n -p 11534
Process 11534: python3 hang.py
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 11534 (idle): "MainThread"
write (libpthread-2.27.so)
os_write (libpython3.10.so.1.0)
_send (multiprocessing/connection.py:373)
_send_bytes (multiprocessing/connection.py:416)
send_bytes (multiprocessing/connection.py:205)
wakeup (concurrent/futures/process.py:79)
submit (concurrent/futures/process.py:729)
submit_to_pool (hang.py:39)
main (hang.py:21)
<module> (hang.py:56)
Thread 11538 (idle): "Thread-1"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
PyThread_acquire_lock_timed (libpython3.10.so.1.0)
lock_PyThread_acquire_lock (libpython3.10.so.1.0)
wait_result_broken_or_wakeup (concurrent/futures/process.py:396)
run (concurrent/futures/process.py:320)
_bootstrap_inner (threading.py:1016)
_bootstrap (threading.py:973)
thread_run (libpython3.10.so.1.0)
clone (libc-2.27.so)
Thread 11539 (idle): "QueueFeederThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
PyThread_acquire_lock_timed (libpython3.10.so.1.0)
lock_PyThread_acquire_lock (libpython3.10.so.1.0)
wait (threading.py:320)
_feed (multiprocessing/queues.py:231)
run (threading.py:953)
_bootstrap_inner (threading.py:1016)
_bootstrap (threading.py:973)
thread_run (libpython3.10.so.1.0)
clone (libc-2.27.so)

$ sudo py-spy dump -n -p 11536
Process 11536: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.resource_tracker import main;main(9)
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 11536 (idle): "MainThread"
read (libpthread-2.27.so)
main (multiprocessing/resource_tracker.py:197)
<module> (<string>:1)

$ sudo py-spy dump -n -p 11537
Process 11537: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=14) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 11537 (idle): "MainThread"
read (libpthread-2.27.so)
os_read (libpython3.10.so.1.0)
_recv (multiprocessing/connection.py:384)
_recv_bytes (multiprocessing/connection.py:419)
recv_bytes (multiprocessing/connection.py:221)
get (multiprocessing/queues.py:103)
_process_worker (concurrent/futures/process.py:240)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

$ sudo py-spy dump -n -p 11540
Process 11540: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=16) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 11540 (idle): "MainThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
_multiprocessing_SemLock_acquire_impl (semaphore.c:355)
__enter__ (multiprocessing/synchronize.py:95)
get (multiprocessing/queues.py:102)
_process_worker (concurrent/futures/process.py:240)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

$ sudo py-spy dump -n -p 11541
Process 11541: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=18) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 11541 (idle): "MainThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
_multiprocessing_SemLock_acquire_impl (semaphore.c:355)
__enter__ (multiprocessing/synchronize.py:95)
get (multiprocessing/queues.py:102)
_process_worker (concurrent/futures/process.py:240)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

$ sudo py-spy dump -n -p 11542
Process 11542: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=20) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 11542 (idle): "MainThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
_multiprocessing_SemLock_acquire_impl (semaphore.c:355)
__enter__ (multiprocessing/synchronize.py:95)
get (multiprocessing/queues.py:102)
_process_worker (concurrent/futures/process.py:240)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

Scenario 2

Running with 4 worker processes (this particular run hung after submitting 140'000 tasks). Task modified to take no arguments:

diff --git a/hang.py b/hang.py
index 84c591be745..8eb138c8de1 100644
--- a/hang.py
+++ b/hang.py
@@ -35,14 +35,13 @@ def submit_to_pool(pool):
# Too much printing here makes the hang to go away!!!
print("\nsubmit", task_idx)
- task_name = f"task{task_idx}" * TASK_DATA
- future_results.append(pool.submit(task, task_idx, task_name))
+ future_results.append(pool.submit(task))
return future_results
-def task(task_idx, task_name):
- """ Do some dummy work """
+def task():
+ """ Do some dummy work """
s = ""
for i in range(TASK_SIZE):
s += str(i)

$ ps -t pts/5 -o pid,s,command
PID S COMMAND
27058 S python3 hang.py
27060 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.resource_tracker import main;main(9)
27061 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=14) --multiprocessing-fork
27064 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=16) --multiprocessing-fork
27065 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=18) --multiprocessing-fork
27066 S /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=20) --multiprocessing-fork

$ sudo py-spy dump -n -p 27058
Process 27058: python3 hang.py
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 27058 (idle): "MainThread"
write (libpthread-2.27.so)
os_write (libpython3.10.so.1.0)
_send (multiprocessing/connection.py:373)
_send_bytes (multiprocessing/connection.py:416)
send_bytes (multiprocessing/connection.py:205)
wakeup (concurrent/futures/process.py:79)
submit (concurrent/futures/process.py:729)
submit_to_pool (hang.py:38)
main (hang.py:21)
<module> (hang.py:55)
Thread 27062 (idle): "Thread-1"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
PyThread_acquire_lock_timed (libpython3.10.so.1.0)
lock_PyThread_acquire_lock (libpython3.10.so.1.0)
wait_result_broken_or_wakeup (concurrent/futures/process.py:396)
run (concurrent/futures/process.py:320)
_bootstrap_inner (threading.py:1016)
_bootstrap (threading.py:973)
thread_run (libpython3.10.so.1.0)
clone (libc-2.27.so)
Thread 27063 (idle): "QueueFeederThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
PyThread_acquire_lock_timed (libpython3.10.so.1.0)
lock_PyThread_acquire_lock (libpython3.10.so.1.0)
wait (threading.py:320)
_feed (multiprocessing/queues.py:231)
run (threading.py:953)
_bootstrap_inner (threading.py:1016)
_bootstrap (threading.py:973)
thread_run (libpython3.10.so.1.0)
clone (libc-2.27.so)

$ sudo py-spy dump -n -p 27060
Process 27060: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.resource_tracker import main;main(9)
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 27060 (idle): "MainThread"
read (libpthread-2.27.so)
main (multiprocessing/resource_tracker.py:197)
<module> (<string>:1)

$ sudo py-spy dump -n -p 27061
Process 27061: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=14) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 27061 (idle): "MainThread"
write (libpthread-2.27.so)
os_write (libpython3.10.so.1.0)
_send (multiprocessing/connection.py:373)
_send_bytes (multiprocessing/connection.py:416)
send_bytes (multiprocessing/connection.py:205)
put (multiprocessing/queues.py:377)
_sendback_result (concurrent/futures/process.py:211)
_process_worker (concurrent/futures/process.py:239)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

$ sudo py-spy dump -n -p 27064
Process 27064: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=16) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 27064 (idle): "MainThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
_multiprocessing_SemLock_acquire_impl (semaphore.c:355)
__enter__ (multiprocessing/synchronize.py:95)
put (multiprocessing/queues.py:376)
_sendback_result (concurrent/futures/process.py:211)
_process_worker (concurrent/futures/process.py:239)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

$ sudo py-spy dump -n -p 27065
Process 27065: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=18) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 27065 (idle): "MainThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
_multiprocessing_SemLock_acquire_impl (semaphore.c:355)
__enter__ (multiprocessing/synchronize.py:95)
put (multiprocessing/queues.py:376)
_sendback_result (concurrent/futures/process.py:211)
_process_worker (concurrent/futures/process.py:239)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

$ sudo py-spy dump -n -p 27066
Process 27066: /sw/Python/Ubuntu-18.04/3.10.5/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=10, pipe_handle=20) --multiprocessing-fork
Python v3.10.5 (/sw/Python/Ubuntu-18.04/3.10.5/bin/python3.10)
Thread 27066 (idle): "MainThread"
do_futex_wait.constprop.1 (libpthread-2.27.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.27.so)
_multiprocessing_SemLock_acquire_impl (semaphore.c:355)
__enter__ (multiprocessing/synchronize.py:95)
put (multiprocessing/queues.py:376)
_sendback_result (concurrent/futures/process.py:211)
_process_worker (concurrent/futures/process.py:239)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_main (multiprocessing/spawn.py:129)
spawn_main (multiprocessing/spawn.py:116)
<module> (<string>:1)

Sending one

Scenario 3

Same as scenario 2 but with a different traceback. Sending ctrl-c after the hang stops it immediately with this Traceback:
I think I have found the issue that is causing the deadlock. It seems the wake-up of the executor manager thread can block while it holds the shutdown lock. The relevant code looks like this:

    class _ThreadWakeup:
        def wakeup(self):
            if not self._closed:
                self._writer.send_bytes(b"")

        def clear(self):
            if not self._closed:
                while self._reader.poll():
                    self._reader.recv_bytes()

    class ProcessPoolExecutor(_base.Executor):
        def submit(self, fn, /, *args, **kwargs):
            with self._shutdown_lock:
                ...
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

    class _ExecutorManagerThread(threading.Thread):
        def wait_result_broken_or_wakeup(self):
            ...
            with self.shutdown_lock:
                self.thread_wakeup.clear()

The shutdown_lock must be taken for both reads and writes of the wake-up pipe, so if a read or a write of the pipe blocks, the code deadlocks. Reads cannot block (a poll() is done before every read), but writes have no protection against blocking.

What seems to happen in this case is that we send so many wake-up messages that the pipe fills up and the send_bytes() call in wakeup() blocks while submit() is still holding the shutdown_lock. The manager thread then needs that same lock to clear the pipe, so neither side can make progress.

To confirm this I tried the following patch:

--- a/lib/python3.10/concurrent/futures/process.py
+++ b/lib/python3.10/concurrent/futures/process.py
@@ -67,7 +67,6 @@ class _ThreadWakeup:
def __init__(self):
self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
+ os.set_blocking(self._writer.fileno(), False)
def close(self):
if not self._closed:
@@ -77,10 +76,7 @@ class _ThreadWakeup:
def wakeup(self):
if not self._closed:
- self._writer.send_bytes(b"")
+ try:
+ self._writer.send_bytes(b"")
+ except BlockingIOError:
+ pass

With this change I can no longer reproduce the deadlock. It is a bit of a hack, though.

I'm not sure if relying on the shutdown_lock for making the reader and writer mutually exclusive is really needed, or if it could be split into separate locks for the wake-up and clear sides. Something like this:

--- a/lib/python3.10/concurrent/futures/process.py
+++ b/lib/python3.10/concurrent/futures/process.py
@@ -89,7 +89,7 @@ def _python_exit():
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
- # call not protected by ProcessPoolExecutor._shutdown_lock
+ # call not protected by ProcessPoolExecutor._wake_shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
@@ -160,10 +160,10 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
- def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
+ def __init__(self, max_size=0, *, ctx, pending_work_items, wake_shutdown_lock,
thread_wakeup):
self.pending_work_items = pending_work_items
- self.shutdown_lock = shutdown_lock
+ self.wake_shutdown_lock = wake_shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
@@ -172,7 +172,7 @@ class _SafeQueue(Queue):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
- with self.shutdown_lock:
+ with self.wake_shutdown_lock:
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
@@ -274,7 +274,8 @@ class _ExecutorManagerThread(threading.Thread):
# A _ThreadWakeup to allow waking up the queue_manager_thread from the
# main Thread and avoid deadlocks caused by permanently locked queues.
self.thread_wakeup = executor._executor_manager_thread_wakeup
- self.shutdown_lock = executor._shutdown_lock
+ self.wake_shutdown_lock = executor._wake_shutdown_lock
+ self.clear_shutdown_lock = executor._clear_shutdown_lock
# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
# to determine if the ProcessPoolExecutor has been garbage collected
@@ -284,10 +285,10 @@ class _ExecutorManagerThread(threading.Thread):
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
- shutdown_lock=self.shutdown_lock):
+ wake_shutdown_lock=executor._wake_shutdown_lock):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
- with shutdown_lock:
+ with wake_shutdown_lock:
thread_wakeup.wakeup()
self.executor_reference = weakref.ref(executor, weakref_cb)
@@ -392,7 +393,7 @@ class _ExecutorManagerThread(threading.Thread):
elif wakeup_reader in ready:
is_broken = False
- with self.shutdown_lock:
+ with self.clear_shutdown_lock:
self.thread_wakeup.clear()
return result_item, is_broken, cause
@@ -513,7 +514,7 @@ class _ExecutorManagerThread(threading.Thread):
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
- with self.shutdown_lock:
+ with self.wake_shutdown_lock, self.clear_shutdown_lock:
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
@@ -632,7 +633,8 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process.
self._shutdown_thread = False
- self._shutdown_lock = threading.Lock()
+ self._wake_shutdown_lock = threading.Lock()
+ self._clear_shutdown_lock = threading.Lock()
self._idle_worker_semaphore = threading.Semaphore(0)
self._broken = False
self._queue_count = 0
@@ -646,7 +648,9 @@ class ProcessPoolExecutor(_base.Executor):
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
- # _shutdown_lock must be locked to access _ThreadWakeup.
+ # _wake_shutdown_lock must be locked to access _ThreadWakeup.wakeup()
+ # _clear_shutdown_lock must be locked to access _ThreadWakeup.clear()
+ # both locks must be locked to access _ThreadWakeup.close()
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
@@ -657,7 +661,7 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
- shutdown_lock=self._shutdown_lock,
+ wake_shutdown_lock=self._wake_shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
@@ -710,7 +714,7 @@ class ProcessPoolExecutor(_base.Executor):
self._processes[p.pid] = p
def submit(self, fn, /, *args, **kwargs):
- with self._shutdown_lock:
+ with self._wake_shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
@@ -764,7 +768,7 @@ class ProcessPoolExecutor(_base.Executor):
return _chain_from_iterable_of_lists(results)
def shutdown(self, wait=True, *, cancel_futures=False):
- with self._shutdown_lock:
+ with self._wake_shutdown_lock:
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True
if self._executor_manager_thread_wakeup is not None:

Again, not entirely clear if this is valid and preserves all required invariants.

Removing the exclusivity could maybe also bring some minor performance benefits.

Regardless, I think the problem is fairly clear but the solution needs more thought.
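To make the failure mode concrete, here is a small stand-alone sketch (written for this discussion, not CPython code; POSIX only) that counts how many unread b"" wake-up messages fit in a multiprocessing.Pipe before send_bytes() would block, which is the same write that submit() performs while holding _shutdown_lock:

```python
import multiprocessing as mp
import os

def count_sends_until_full():
    reader, writer = mp.Pipe(duplex=False)
    # Make the write end non-blocking purely so we can *detect* the point
    # where a normal, blocking send_bytes() would have hung the caller.
    os.set_blocking(writer.fileno(), False)
    sent = 0
    try:
        while True:
            writer.send_bytes(b"")  # what _ThreadWakeup.wakeup() does
            sent += 1
    except BlockingIOError:
        pass
    finally:
        writer.close()
        reader.close()
    return sent

if __name__ == "__main__":
    n = count_sends_until_full()
    print(f"send_bytes(b'') would have blocked after {n} unread wake-ups")
    # In ProcessPoolExecutor.submit() that blocking write happens while
    # _shutdown_lock is held, and the manager thread needs the same lock
    # to drain the pipe, hence the deadlock once enough tasks pile up.
```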
Hi @pitrou, @brianquinlan and @gpshead, have you seen this?
Nice analysis and proposals @elfstrom. Indeed, it needs some thought to reason out.

I kind of like the non-blocking pipe write hack and agree that a full pipe shouldn't be a problem, as presumably that implies the reader is set to discover it and process things anyway, accomplishing the same goal. But I'll need to dive into the code on this to understand the intents regardless.

On the multiple locks front, also plausible, though we should be strict about the acquisition ordering that must be used, otherwise there's more potential for deadlock. In addition to more comments describing the requirement, this could probably even be helped by abstracting them for normal use cases with some defensive code, such that acquisition of the second lock via the abstraction fails when the first one is already held by the same thread.

Another challenge with this kind of change is to come up with some form of easily executable, likely deterministic regression test, i.e. something not too invasive that would set up the otherwise infrequent state causing the existing code to fail, and that would no longer fail with the fix in place.
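As a sketch of that defensive abstraction (names and structure invented here for illustration; nothing below is CPython code), acquiring the second lock through a small wrapper could fail fast instead of silently inverting the documented order:

```python
import threading

class OrderedLockPair:
    """Toy example: clear_lock must never be acquired while wake_lock is held."""

    def __init__(self):
        self.wake_lock = threading.Lock()
        self.clear_lock = threading.Lock()
        self._local = threading.local()  # tracks what the current thread holds

    def acquire_wake(self):
        self.wake_lock.acquire()
        self._local.holds_wake = True

    def release_wake(self):
        self._local.holds_wake = False
        self.wake_lock.release()

    def acquire_clear(self):
        # Refuse to invert the documented order instead of risking deadlock.
        if getattr(self._local, "holds_wake", False):
            raise RuntimeError("lock order violation: wake_lock already held")
        self.clear_lock.acquire()

    def release_clear(self):
        self.clear_lock.release()
```

A real version would more likely expose context managers rather than bare acquire/release calls, but the ordering check is the point here.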
Looks like both black and pylint are now tripping this bug in the right conditions; if I can find some time I'm going to have a look at the above...
Looks like @vstinner introduced the usage of the shutdown lock in this context, so tagging him here in case he has an opinion.
We consistently run into this issue in our large-scale data processing pipeline (Python 3.9, 3.10, and 3.11) and can never run it to completion. Thanks @elfstrom for the analysis and suggested fixes. While I'm here, I should mention that
If the management thread does not clear the wakeup pipe fast enough, the wakeup code will block while holding the shutdown lock, causing a deadlock. python#105829
This fixes issue python#105829, python#105829

The _ExecutorManagerThread wake-up code could deadlock if the wake-up pipe filled up and blocked. The relevant code looked like this:

    class _ThreadWakeup:
        def wakeup(self):
            if not self._closed:
                self._writer.send_bytes(b"")

        def clear(self):
            if not self._closed:
                while self._reader.poll():
                    self._reader.recv_bytes()

    class ProcessPoolExecutor(_base.Executor):
        def submit(self, fn, /, *args, **kwargs):
            with self._shutdown_lock:
                ...
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

    class _ExecutorManagerThread(threading.Thread):
        def wait_result_broken_or_wakeup(self):
            ...
            with self.shutdown_lock:
                self.thread_wakeup.clear()

The shutdown_lock must be taken for both reads and writes of the wake-up pipe. If a read or a write of the pipe blocks, the code will deadlock. It looks like reads can't block (a poll() is done before doing any reads) but writes have no protection against blocking. If the _ExecutorManagerThread cannot keep up and clear the wake-up pipe, it will fill up and block. This seems to have been rather easy to do in the real world as long as the number of tasks is more than 100000 or so.

With this change we make the writes to the wake-up pipe non-blocking. If the pipe is full we simply skip the write. This should be OK since the reason for the problem is that both reader and writer must hold the shutdown_lock when accessing the pipe. That implies we don't need to send anything if the pipe is full: the reader can't be reading it concurrently, and it will eventually wake up due to the data already in the pipe.
If I understand the code correctly then too many wakeups is fine, so if clear sometimes leaves things in the pipe that should not be an issue. The thread will just wake up, find nothing to do and go back to sleep again. I also posted a proof of concept PR; hopefully it can get the ball rolling on a fix.
I agree that, given how the rest of the code is structured today, more wakeups is fine. A person poking at this code in the future may reasonably expect that every wakeup() call actually wakes the manager thread, though. I'm not suggesting an invasive fix – but maybe a comment capturing the semantics / warning that wake-ups can be dropped when the pipe is full would help. Thanks for your work on this, and for pushing things along!
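For example, the kind of comment being suggested might read roughly like this (wording invented here, not taken from any CPython patch):

```python
class _ThreadWakeup:
    def wakeup(self):
        """Best-effort request to wake the executor manager thread.

        If the wake-up pipe is already full, the request may be dropped.
        That is fine: a full pipe already guarantees the manager thread
        has a pending wake-up, so it will run regardless.
        """
        # the (non-blocking) self._writer.send_bytes(b"") call goes here
```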
@gpshead - do you know of any way to control the size of the pipe created? Feels like making the pipe small would make this more easily reproducible? But I can't find a way to do that...
After some more googling it seems like on Linux it is possible to check and set the pipe size using fcntl (limited to the range [PAGE_SIZE../proc/sys/fs/pipe-max-size], in PAGE_SIZE increments):

    import multiprocessing
    import os
    import fcntl

    def check_pipe_sizes():
        for size in [2**i for i in range(10, 15)]:
            b, a = multiprocessing.Pipe(False)
            os.set_blocking(a.fileno(), False)
            start_size = fcntl.fcntl(a.fileno(), fcntl.F_GETPIPE_SZ)
            print("===============")
            print(f"Changing pipe size from {start_size} to {size}: new size",
                  fcntl.fcntl(a.fileno(), fcntl.F_SETPIPE_SZ, size))
            try:
                for x in range(size):
                    a.send_bytes(b'')
                print(f"  Pipe did not block after {x} sends")
            except BlockingIOError:
                print(f"  Pipe blocked after {x} sends, ({x*4} bytes)")
            finally:
                a.close()
                b.close()

    if __name__ == "__main__":
        check_pipe_sizes()

This gives:
So at least on Linux we can lower the size if we want to. Not sure how this works on other platforms.
cc @tomMoral
After some analysis, it seems a simple fix is to simply not lock the ThreadWakeup object when reading from it:

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 301207f59d..5065a572a4 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -426,8 +426,12 @@ def wait_result_broken_or_wakeup(self):
elif wakeup_reader in ready:
is_broken = False
- with self.shutdown_lock:
- self.thread_wakeup.clear()
+ # No need to hold the _shutdown_lock here because:
+ # 1. we're the only thread to use the wakeup reader
+ # 2. we're also the only thread to call thread_wakeup.close()
+ # 3. we want to avoid a possible deadlock when both reader and writer
+ # would block (gh-105829)
+ self.thread_wakeup.clear()
return result_item, is_broken, cause
(however, the reproducer doesn't work here so I couldn't check this fixes the issue)
@pitrou - if the reproducer doesn't work for you, then you may have a different issue?
@cjw296 What do you mean?
This also fixes the issue and I think I agree with the justification for why this is safe.
Try to reduce the size of the pipe to make the test faster. The solution only works on Unix-like platforms; we fall back on a hardcoded size for other platforms.
I'll be able to give it a shot this week and will report back.
Replying to #108513 (comment) here to try and keep the discussion in one place:
@pitrou / @elfstrom - curious, given the use case, if this
@sharvil's testing should hopefully give us a good data point. I also have a couple of heavy users of multiprocessing (black and pylint running on a large mono-repo, both of which were triggering this bug), which should give us some empirical feedback. @gpshead - what are your views here?
…adlock

This reverts the previous fix and instead opts to remove the locking completely when clearing the wakeup pipe. We can do this because clear() and close() are both called from the same thread and nowhere else.

In this version of the fix, the call to ProcessPoolExecutor.submit can still block on the wakeup pipe if it happens to fill up. This should not be an issue as there are already other cases where the submit call can block, and if the wakeup pipe is full it implies there are already a lot of work items queued up.

Co-authored-by: Antoine Pitrou <[email protected]>
A bit late to the party, but I think the solution that removes the lock on the clear() side is the right one.
Is loky private? Is it tested better than this implementation?
It can be found here: https://github.com/joblib/loky
It used to have more tests to check for some corner cases, and the most important ones have been ported to cpython. There are probably still some extra ones, but I have not actively followed the updates in cpython.
Change test strategy. We now force the main thread to block during the wake-up call by mocking the wake-up object and artificially limiting to a single wake-up before blocking. This allows us to reduce some timeouts, number of tasks and lower the total runtime of the test. It should also guarantee a blocking main thread on all platforms, regardless of any pipe buffer sizes. The drawback is that the test is now a bit opinionated on how we fix this issue (i.e. just making the wake-up pipe non blocking would not satisfy this test even though it is a valid fix for the issue).
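A rough sketch of that kind of test double (purely illustrative; the real test in test_concurrent_futures differs in detail): it lets a fixed number of wake-ups through and then blocks, simulating a full pipe without needing hundreds of thousands of submissions.

```python
import threading

class BlockingWakeupStub:
    """Stand-in for a wake-up object that 'fills up' after `allowed` wake-ups."""

    def __init__(self, allowed=1):
        self._budget = threading.Semaphore(allowed)
        self._unblock = threading.Event()
        self._closed = False

    def wakeup(self):
        if self._closed:
            return
        if not self._budget.acquire(blocking=False):
            # Pipe is "full": behave like a blocking write until drained.
            self._unblock.wait()

    def clear(self):
        # The reader drained the pipe: let any blocked writer continue.
        self._unblock.set()

    def close(self):
        self._closed = True
        self._unblock.set()
```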
This fixes issue #105829, #105829 Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>
…adlock (pythonGH-108513) This fixes issue pythonGH-105829, python#105829 Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>. (cherry picked from commit 405b063) Co-authored-by: elfstrom <[email protected]>
…GH-108513) (#109783) This fixes issue GH-105829, #105829 (cherry picked from commit 405b063)
…ython#108513) This fixes issue python#105829, python#105829 Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>
Remove test_gh105829_should_not_deadlock_if_wakeup_pipe_full() of test_concurrent_futures.test_deadlock. The test is no longer relevant.
Remove test_gh105829_should_not_deadlock_if_wakeup_pipe_full() of test_concurrent_futures.test_deadlock. The test is no longer relevant.
…p.wakeup() Remove test_gh105829_should_not_deadlock_if_wakeup_pipe_full() of test_concurrent_futures.test_deadlock. The test is no longer relevant.
…p.wakeup() Replace test_gh105829_should_not_deadlock_if_wakeup_pipe_full() test which was mocking too many concurrent.futures internals with a new test_wakeup() functional test. Co-Authored-By: elfstrom <[email protected]>
…p.wakeup() Replace test_gh105829_should_not_deadlock_if_wakeup_pipe_full() test which was mocking too many concurrent.futures internals with a new test_wakeup() functional test. Co-Authored-By: elfstrom <[email protected]>
…GH-108513) (#109784) This fixes issue GH-105829, #105829 (cherry picked from commit 405b063)
… (GH-108513) (#109784) This fixes issue GH-105829, python/cpython#105829 (cherry picked from commit 405b06375a8a4cdb08ff53afade09a8b66ec23d5) CPython-sync-commit: c2cadb0ec2d192c78d9e01c3c0c1cae12ea57392
Which Python versions will this fix be released in?
No, >=3.12.1.
… (GH-108513) This fixes issue GH-105829, python/cpython#105829 Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>. (cherry picked from commit 405b063) Co-authored-by: elfstrom <[email protected]>
…ython#108513) This fixes issue python#105829, python#105829 Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>
Bug report
Submitting many tasks to a concurrent.futures.ProcessPoolExecutor pool deadlocks with all three start methods.
When running the same example with multiprocessing.pool.Pool we have NOT been able to cause a deadlock.
Different sets of parameters affect how likely it is to get a deadlock:
spawn
, fork
, and forkserver
exhibit the deadlock (the examples below are with
spawn
method)
still cause a hang. (see example script)
DO_PRINT = False for a higher probability of hanging.
reading the queue:
writing the queue:
Example script exhibiting deadlock behavior
Environment
Details
Detailed stack traces in comments.
Linked PRs