multiprocessing is a convenient library for taking advantage of the multiple cores found in modern processors. The typical pattern is to spawn a number of worker processes and let them consume data from a queue. However, when debugging, I often find myself trying to terminate the script with Ctrl-C, only to find that it has no effect.
Working Example
Here is a typical pattern using multiprocessing.Process and multiprocessing.Queue.

    import multiprocessing

    class Worker(multiprocessing.Process):
        def __init__(self, queue, *args, **kwargs):
            # Process.__init__ must run before the object can be started.
            super().__init__(*args, **kwargs)
            self.queue = queue

        def run(self):
            while True:
                item = self.queue.get()
                if item is None:  # sentinel: no more work
                    break
                # process item here

    queue = multiprocessing.Queue()
    workers = [Worker(queue) for _ in range(multiprocessing.cpu_count())]
    for w in workers:
        w.start()
    for item in items:  # items: the iterable of work to distribute
        queue.put(item)
    for w in workers:
        queue.put(None)  # one sentinel per worker
    queue.close()
    queue.join_thread()

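Here we spawn a number of workers and let each of them consume input from the queue. Normally the main process gets stuck at the end of the script: queue.join_thread() only waits for the queue's background feeder thread to flush its buffer, after which the main process waits for the non-daemonic workers to finish before it exits. When you press Ctrl-C while the script is running, the subprocesses are not terminated properly.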
First Attempt
My first attempt was to catch the KeyboardInterrupt and then manually terminate the processes.

    try:
        queue.join_thread()
    except KeyboardInterrupt:
        for w in workers:
            w.terminate()

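However, this doesn't work most of the time, especially when each process is doing some serious computing. A likely reason is that queue.join_thread() returns as soon as the queue's buffer has been flushed, so the interrupt usually arrives after the try block has already been left.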
Solution
Then I noticed the daemon flag in the Process documentation:
"When a process exits, it attempts to terminate all of its daemonic child processes."
So I set each child process's daemon attribute to True. Daemonic processes are not allowed to create children of their own, but that is fine here: the workers do not create sub-subprocesses anyway.
Note that the daemon flag must be set BEFORE calling each process's start function. Also, once the daemon flag is set, queue.join_thread() no longer keeps the main process waiting for the workers: you have to call join on each worker.

    for w in workers:
        w.daemon = True  # must be set before start()
        w.start()
    try:
        for w in workers:
            w.join()
    except KeyboardInterrupt:
        for w in workers:
            w.terminate()

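
To see the pieces together, here is a minimal, self-contained sketch of the daemon-based approach above. It is an illustration under some assumptions rather than the only way to do it: the result_queue, the square_all helper, and the "square each item" work are all hypothetical names made up for this example. It also passes daemon=True to the Process constructor (available since Python 3.3), which should be equivalent to assigning w.daemon = True before start().

```python
import multiprocessing


class Worker(multiprocessing.Process):
    """Consume items from task_queue until a None sentinel arrives."""

    def __init__(self, task_queue, result_queue):
        # The daemon flag has to be set before start(); passing it to the
        # constructor has the same effect as assigning w.daemon = True.
        super().__init__(daemon=True)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            item = self.task_queue.get()
            if item is None:  # sentinel: no more work
                break
            # Placeholder work (hypothetical): square the item.
            self.result_queue.put(item * item)


def square_all(items, num_workers=2):
    """Hypothetical helper: square every item using daemonic workers."""
    items = list(items)
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [Worker(task_queue, result_queue) for _ in range(num_workers)]
    for w in workers:
        w.start()
    for item in items:
        task_queue.put(item)
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker
    try:
        # Collect results before joining: a worker cannot exit until the
        # results it queued have been flushed to the underlying pipe.
        results = [result_queue.get() for _ in items]
        for w in workers:
            w.join()
    except KeyboardInterrupt:
        # Ctrl-C in the main process: stop the workers, then re-raise.
        for w in workers:
            w.terminate()
        raise
    return sorted(results)


if __name__ == "__main__":
    # The guard keeps child processes from re-running this block when the
    # module is re-imported (as happens with the "spawn" start method).
    print(square_all(range(10)))
```

Because the workers are daemonic, they are terminated when the main process exits, even if the KeyboardInterrupt is raised somewhere outside the try block.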