Python multiprocessing queue

📅2023-10-17🧐80

Yesterday, I had a coding challenge in an interview. The question was to implement a multiprocessing program that allows creating a queue and gracefully shutting down the process after all tasks have been completed.

Since the biggest challenge is how to enqueue without blocking, there are three ways to implement it.

  1. Using main thread perform enqueue
from multiprocessing import Process, Queue
from random import randint
from queue import Empty
from time import sleep

id_list = list(range(1, 16))
q_size = 5

queue = Queue(maxsize=q_size)

def mp_worker(queue):
    while True:
        try:
            item = queue.get()
            print('Processing: {}'.format(item))
            sleep(randint(1, 2)) # Simulate different processing time
            print('Done: {}'.format(item))
        except Empty:
            sleep(1)

if __name__ == '__main__':
    # init child processes
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(q_size)]

    # start the processes, because the following loop will block
    for process in processes:
        process.start()

    # start feeding the queue
    while id_list:
        # since the queue can be blocked, we check if it's full
        if queue.full() is False:
            item = id_list.pop(0)
            queue.put(item)
            print('Queued: {}'.format(item))
        else:
            pass

    # wait for the queue to empty (all tasks are done)
    while queue.empty() is False:
        sleep(10)

    # terminate all processes
    for process in processes:
        process.terminate()
    
    # gracefully shutdown
    print('The program is done')


## Output:
"""
root@d4e38f17331d:/code# python ./q.py 
Queued: 1
Processing: 1
Queued: 2
Queued: 3
Queued: 4
Processing: 2
Queued: 5
Queued: 6
Queued: 7
Processing: 3
Queued: 8
Queued: 9
Processing: 4
Queued: 10
Processing: 5
Done: 2
Processing: 6
Queued: 11
Done: 5
Processing: 7
Queued: 12
Done: 1
Processing: 8
Queued: 13
Done: 3
Processing: 9
Queued: 14
Done: 4
Processing: 10
Queued: 15
Done: 7
Processing: 11
Done: 6
Done: 9
Processing: 12
Processing: 13
Done: 8
Processing: 14
Done: 10
Processing: 15
Done: 11
Done: 14
Done: 12
Done: 13
Done: 15
The program is done
"""

2. Using a spare child process to enqueue

from multiprocessing import Process, Queue
from random import randint
from queue import Empty
from time import sleep

id_list = list(range(1, 16))
q_size = 5

queue = Queue(maxsize=q_size)

def mp_worker(queue):
    while True:
        try:
            item = queue.get()
            print('Processing: {}'.format(item))
            sleep(randint(1, 2)) # Simulate different processing time
            print('Done: {}'.format(item))
        except Empty:
            sleep(1)

def mp_queue_putter(queue):
    """
    since put can block the main process, we put it in a separate process
    """
    for item in id_list:
        queue.put(item)
        print('Queued: {}'.format(item))

if __name__ == '__main__':
    # init child processes (queue putter)
    queue_putter = Process(target=mp_queue_putter, args=(queue,))

    # start to feed the queue
    queue_putter.start()

    # init child processes (workers)
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(q_size)]

    # start the processes
    for process in processes:
        process.start()

    # wait for the queue to empty (all tasks are done)
    while queue.empty() is False:
        sleep(10)

    # terminate all processes (workers)
    for process in processes:
        process.terminate()

    # terminate the queue putter
    queue_putter.terminate()
    
    # gracefully shutdown
    print('The program is done')


## Output:
"""
root@d4e38f17331d:/code# python ./q_enqueue_mp.py 
Queued: 1
Processing: 1
Queued: 2
Queued: 3
Queued: 4
Queued: 5
Queued: 6
Queued: 7
Processing: 2
Processing: 3
Queued: 8
Processing: 4
Queued: 9
Processing: 5
Queued: 10
Done: 1
Processing: 6
Queued: 11
Done: 2
Done: 3
Processing: 7
Queued: 12
Processing: 8
Done: 4
Queued: 13
Processing: 9
Done: 5
Queued: 14
Processing: 10
Queued: 15
Done: 6
Done: 7
Processing: 11
Processing: 12
Done: 8
Processing: 13
Done: 12
Processing: 14
Done: 9
Done: 10
Done: 13
Processing: 15
Done: 11
Done: 14
Done: 15
The program is done
"""

3. Using asyncio (eventloop) to enqueue

from multiprocessing import Process, Queue
from random import randint
from queue import Empty
from time import sleep
import asyncio

id_list = list(range(1, 16))
q_size = 5

queue = Queue(maxsize=q_size)

def mp_worker(queue):
    while True:
        try:
            item = queue.get()
            print('Processing: {}'.format(item))
            sleep(randint(1, 2)) # Simulate different processing time
            print('Done: {}'.format(item))
        except Empty:
            sleep(1)

async def main():
    # init child processes
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(q_size)]

    # call the event loop
    loop = asyncio.get_event_loop()

    for i in id_list:
        # enqueue the task by the event loop, so it would not block
        loop.run_in_executor(None, queue.put, i)

    # start the processes after the queue is filled asynchronously
    for process in processes:
        process.start()

    # wait for the queue to empty (all tasks are done)
    while queue.empty() is False:
        sleep(10)

    # terminate all processes
    for process in processes:
        process.terminate()
    
    # gracefully shutdown
    print('The program is done')

if __name__ == '__main__':
    asyncio.run(main())


"""
output:
root@d4e38f17331d:/code# python ./q_asyncio.py 
Processing: 1
Processing: 2
Processing: 3
Processing: 5
Processing: 4
Done: 2
Processing: 8
Done: 5
Processing: 7
Done: 4
Processing: 6
Done: 1
Processing: 9
Done: 8
Done: 3
Processing: 10
Processing: 11
Done: 6
Processing: 12
Done: 9
Processing: 13
Done: 11
Processing: 14
Done: 7
Processing: 15
Done: 10
Done: 12
Done: 13
Done: 14
Done: 15
The program is done
"""