multiprocessing is a convenient library to take the advantage of multiple cores easily found in modern processes. The typical pattern is to spawn a bunch of worker processes, and let them consume the data from a queue. However, when debugging, I usually found myself attempting to terminate the script using Ctrl-C yet to find it has no effect.

Working Example

Here is a typical pattern to use multiprocssing.Process and multiprocess.Queue.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import multiprocessing


class Worker(multiprocssing.Process):
    def __init__(self, queue, *args, **kwargs):
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process item here

queue = multiprocess.Queue()
workers = [Worker(queue) for _ in range(multiprocssing.cpu_count())]
for w in workers:
    w.start()

queue.put([i for i in items])
for w in workers:
    queue.put(None)

queue.close()
queue.join_thread()

Here we spawn a number of workers, and let each of them consume input from the queue. Normally the main process gets stuck at the queue.join_thread() function. When you press Ctrl-C while the script is running, the subprocesses will not be terminated properly.

First Attempt

My first try is to catch the KeyboardInterrupt and the manually terminate the processes.

1
2
3
4
5
try:
    queue.join_thread()
except KeyboardInterrupt:
    for w in workers:
        w.terminate()

However, this won't work most of the time, especially when you have some serious computing going on in each process.

Solution

Then I noticed the daemon flag in the Process document.

When a process exits, it attempts to terminate all of its daemonic child processes.

So I set each child process's daemon attribute to be True: they are not creating sub-subprocesses anyway.

Note that daemon flag must be set BEFORE calling the processes' start function. Also, once the daemon flag is set, queue.join_thread() does not work anymore: you'll have to call join for each worker.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
for w in workers:
    w.daemon = True
    w.start()

try:
    for w in workers:
        w.join()
except KeyboardInterrupt:
    for w in workers:
        w.terminate()