Python multiprocessing queue
📅2023-10-17🧐115
Yesterday, I had a coding challenge in an interview. The task was to implement a multiprocessing program that feeds tasks to worker processes through a queue and gracefully shuts the workers down after all tasks have been completed.
The biggest challenge is enqueueing without blocking the main process; below are three ways to implement it.
1. Using the main thread to enqueue
from multiprocessing import Process, Queue
from random import randint
from queue import Empty
from time import sleep
id_list = list(range(1, 16))
q_size = 5
queue = Queue(maxsize=q_size)
# Stop signal: a worker leaves its loop after receiving one of these.
STOP = None


def mp_worker(queue):
    """Process items from *queue* until the STOP sentinel (None) arrives.

    ``queue.get()`` is called without a timeout, so it blocks instead of
    raising ``Empty``. The loop therefore ends only through the sentinel,
    which lets the parent ``join()`` the worker rather than killing it
    in the middle of a task.
    """
    while True:
        item = queue.get()
        if item is STOP:
            break
        print('Processing: {}'.format(item))
        sleep(randint(1, 2))  # Simulate different processing time
        print('Done: {}'.format(item))


if __name__ == '__main__':
    # init child processes
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(q_size)]
    # start the workers first, because put() below blocks while the queue is full
    for process in processes:
        process.start()
    # feed the queue; put() waits (without spinning the CPU) until a worker
    # frees a slot, so there is no need to poll queue.full()
    for item in id_list:
        queue.put(item)
        print('Queued: {}'.format(item))
    # one sentinel per worker; FIFO order means every real item is taken first
    for _ in processes:
        queue.put(STOP)
    # join() returns only after each worker has finished its last task.
    # queue.empty() would only say nothing is waiting, not that in-flight
    # items are done.
    for process in processes:
        process.join()
    # gracefully shutdown
    print('The program is done')
## Output:
"""
root@d4e38f17331d:/code# python ./q.py
Queued: 1
Processing: 1
Queued: 2
Queued: 3
Queued: 4
Processing: 2
Queued: 5
Queued: 6
Queued: 7
Processing: 3
Queued: 8
Queued: 9
Processing: 4
Queued: 10
Processing: 5
Done: 2
Processing: 6
Queued: 11
Done: 5
Processing: 7
Queued: 12
Done: 1
Processing: 8
Queued: 13
Done: 3
Processing: 9
Queued: 14
Done: 4
Processing: 10
Queued: 15
Done: 7
Processing: 11
Done: 6
Done: 9
Processing: 12
Processing: 13
Done: 8
Processing: 14
Done: 10
Processing: 15
Done: 11
Done: 14
Done: 12
Done: 13
Done: 15
The program is done
"""
2. Using a spare child process to enqueue
from multiprocessing import Process, Queue
from random import randint
from queue import Empty
from time import sleep
id_list = list(range(1, 16))
q_size = 5
queue = Queue(maxsize=q_size)
# Stop signal: a worker leaves its loop after receiving one of these.
STOP = None


def mp_worker(queue):
    """Process items from *queue* until the STOP sentinel (None) arrives.

    ``queue.get()`` is called without a timeout, so it blocks instead of
    raising ``Empty``. The loop therefore ends only through the sentinel,
    which lets the parent ``join()`` the worker instead of terminating it.
    """
    while True:
        item = queue.get()
        if item is STOP:
            break
        print('Processing: {}'.format(item))
        sleep(randint(1, 2))  # Simulate different processing time
        print('Done: {}'.format(item))


def mp_queue_putter(queue):
    """Put every id from ``id_list`` onto *queue*, in order.

    put() blocks while the queue is full, so this runs in its own process
    to keep the main process free.
    """
    for item in id_list:
        queue.put(item)
        print('Queued: {}'.format(item))


if __name__ == '__main__':
    # init child processes (queue putter)
    queue_putter = Process(target=mp_queue_putter, args=(queue,))
    # start to feed the queue
    queue_putter.start()
    # init child processes (workers)
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(q_size)]
    # start the processes
    for process in processes:
        process.start()
    # Wait until every item has been queued. Polling queue.empty() instead
    # could observe an empty queue before the putter has run (or between two
    # of its puts) and shut everything down early.
    queue_putter.join()
    # one sentinel per worker, queued after all real items
    for _ in processes:
        queue.put(STOP)
    # join() returns only after each worker has finished its last task
    for process in processes:
        process.join()
    # gracefully shutdown
    print('The program is done')
## Output:
"""
root@d4e38f17331d:/code# python ./q_enqueue_mp.py
Queued: 1
Processing: 1
Queued: 2
Queued: 3
Queued: 4
Queued: 5
Queued: 6
Queued: 7
Processing: 2
Processing: 3
Queued: 8
Processing: 4
Queued: 9
Processing: 5
Queued: 10
Done: 1
Processing: 6
Queued: 11
Done: 2
Done: 3
Processing: 7
Queued: 12
Processing: 8
Done: 4
Queued: 13
Processing: 9
Done: 5
Queued: 14
Processing: 10
Queued: 15
Done: 6
Done: 7
Processing: 11
Processing: 12
Done: 8
Processing: 13
Done: 12
Processing: 14
Done: 9
Done: 10
Done: 13
Processing: 15
Done: 11
Done: 14
Done: 15
The program is done
"""
3. Using asyncio (the event loop) to enqueue
from multiprocessing import Process, Queue
from random import randint
from queue import Empty
from time import sleep
import asyncio
id_list = list(range(1, 16))
q_size = 5
queue = Queue(maxsize=q_size)
# Stop signal: a worker leaves its loop after receiving one of these.
STOP = None


def mp_worker(queue):
    """Process items from *queue* until the STOP sentinel (None) arrives.

    ``queue.get()`` is called without a timeout, so it blocks instead of
    raising ``Empty``. The loop therefore ends only through the sentinel,
    which lets the parent ``join()`` the worker instead of terminating it.
    """
    while True:
        item = queue.get()
        if item is STOP:
            break
        print('Processing: {}'.format(item))
        sleep(randint(1, 2))  # Simulate different processing time
        print('Done: {}'.format(item))


async def main():
    """Feed the queue from the event loop without blocking it, then shut down.

    Every blocking call (queue.put, Process.join) runs in the default
    thread-pool executor and is awaited. The event loop stays responsive,
    and items are enqueued in order.
    """
    # init child processes
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(q_size)]
    # Workers must be running before we await put(); otherwise a full queue
    # would never drain.
    for process in processes:
        process.start()
    loop = asyncio.get_running_loop()
    for i in id_list:
        # Awaiting each put keeps FIFO order. Un-awaited executor futures may
        # complete in any order and may still be pending at shutdown.
        await loop.run_in_executor(None, queue.put, i)
    # one sentinel per worker, queued after all real items
    for _ in processes:
        await loop.run_in_executor(None, queue.put, STOP)
    # join() blocks, so run it in the executor instead of calling
    # time.sleep() inside the coroutine (which would stall the event loop)
    for process in processes:
        await loop.run_in_executor(None, process.join)
    # gracefully shutdown
    print('The program is done')


if __name__ == '__main__':
    asyncio.run(main())
"""
output:
root@d4e38f17331d:/code# python ./q_asyncio.py
Processing: 1
Processing: 2
Processing: 3
Processing: 5
Processing: 4
Done: 2
Processing: 8
Done: 5
Processing: 7
Done: 4
Processing: 6
Done: 1
Processing: 9
Done: 8
Done: 3
Processing: 10
Processing: 11
Done: 6
Processing: 12
Done: 9
Processing: 13
Done: 11
Processing: 14
Done: 7
Processing: 15
Done: 10
Done: 12
Done: 13
Done: 14
Done: 15
The program is done
"""