Taken from “Python for Programmers” https://leanpub.com/pythonforprogrammers

Parallelism.md

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def slow_cpu_bound(secs):
    # time.sleep stands in for real CPU-bound work
    time.sleep(secs)
    return "some slowly returned value"

def wrapper(loop, executor, secs):
    # run_in_executor returns an awaitable asyncio.Future
    return loop.run_in_executor(executor, slow_cpu_bound, secs)

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        resp1, resp2 = await asyncio.gather(
            wrapper(loop, executor, 5), wrapper(loop, executor, 5)
        )
    print("{}, {}".format(resp1, resp2))

if __name__ == "__main__":
    # the guard matters on platforms that spawn (rather than fork) worker processes
    asyncio.run(main())

In the above example we ultimately call a function that sleeps for the specified number of seconds. We call it twice with 5 both times, so the overall time would typically be ten seconds if the calls ran one after the other in a single thread (the thread would be blocked for five seconds, then for another five).
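The sequential baseline is easy to check directly. The sketch below reuses slow_cpu_bound and adds a hypothetical run_sequentially helper (not part of the original example) that calls it twice in a row on one thread and measures the total time:

```python
import time

def slow_cpu_bound(secs):
    time.sleep(secs)
    return "some slowly returned value"

def run_sequentially(secs, calls=2):
    # Hypothetical helper: run the calls one after another on the current thread.
    start = time.perf_counter()
    results = [slow_cpu_bound(secs) for _ in range(calls)]
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = run_sequentially(0.5)
    print("{} calls took {:.1f}s".format(len(results), elapsed))
```

The elapsed time should come out at roughly the sum of the individual sleeps, which is the figure the executor-based version improves on.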

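Note: if you change this example to use ThreadPoolExecutor instead of ProcessPoolExecutor, the overall execution time stays at about five seconds, because time.sleep releases the GIL while it waits. The two calls would only be serialized (roughly ten seconds in total) if slow_cpu_bound did real CPU-bound work in pure Python.

Running this example reveals that the running time is actually only about five seconds. This is because the two calls run at the same time, each in its own worker process rather than in a thread. Had they been genuinely CPU-bound and running in threads, they would have been restricted by Python’s GIL, whereas every process has its own interpreter and its own GIL.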
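Because time.sleep only waits, it does not show the GIL at work. A minimal sketch with genuinely CPU-bound work makes the difference between the two pool types measurable. It assumes a hypothetical count_down helper and a run_pair coroutine, neither of which is in the original example:

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def count_down(n):
    # Hypothetical stand-in for real CPU-bound work: a pure-Python loop
    # that needs the GIL for its whole duration.
    while n > 0:
        n -= 1
    return n

async def run_pair(executor, n):
    # Submit two identical jobs and time how long both take together.
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    results = await asyncio.gather(
        loop.run_in_executor(executor, count_down, n),
        loop.run_in_executor(executor, count_down, n),
    )
    return results, time.perf_counter() - start

if __name__ == "__main__":
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        with pool_cls(max_workers=2) as executor:
            results, elapsed = asyncio.run(run_pair(executor, 3_000_000))
        print("{}: {:.2f}s".format(pool_cls.__name__, elapsed))
```

On a multi-core machine with a standard CPython build, the process pool would be expected to finish noticeably faster than the thread pool. The exact numbers depend on the hardware and on process start-up cost.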

The benefit of using the asyncio module to manage work in other processes is that it ties in nicely with the other asyncio functionality you are likely already using. At that point it is much simpler than driving the lower-level multiprocessing module directly.
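As a rough illustration of that integration, the future returned by run_in_executor can be gathered alongside an ordinary coroutine. In this sketch fetch_something is a hypothetical coroutine standing in for whatever asyncio I/O the program already does:

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def slow_cpu_bound(secs):
    time.sleep(secs)
    return "from the pool"

async def fetch_something(secs):
    # Hypothetical coroutine standing in for ordinary asyncio I/O.
    await asyncio.sleep(secs)
    return "from a coroutine"

async def main(executor, secs):
    loop = asyncio.get_running_loop()
    # gather accepts the executor's future and the coroutine side by side
    # and returns their results in the order they were passed in.
    return await asyncio.gather(
        loop.run_in_executor(executor, slow_cpu_bound, secs),
        fetch_something(secs),
    )

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        print(asyncio.run(main(executor, 1)))
```

Passing None as the executor makes run_in_executor fall back to the loop's default thread pool, which is handy for trying the sketch without starting any processes.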

Now you might also be interested to see yet another way to do this (still using ProcessPoolExecutor but swapping asyncio.gather for asyncio.wait):

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def slow_cpu_bound(secs):
    time.sleep(secs)
    return "another example"

async def main(executor):
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(executor, slow_cpu_bound, 5),
        loop.run_in_executor(executor, slow_cpu_bound, 5),
    ]
    # asyncio.wait returns two sets of futures: (done, pending)
    completed, pending = await asyncio.wait(tasks)
    print(pending)  # empty here, because by default wait returns once all tasks are done
    results = [f.result() for f in completed]
    print('results: {!r}'.format(results))

if __name__ == "__main__":
    # the guard matters on platforms that spawn (rather than fork) worker processes
    with ProcessPoolExecutor() as executor:
        asyncio.run(main(executor))
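The pending set only becomes interesting when asyncio.wait returns early, for example because of its timeout argument. The following sketch assumes a hypothetical wait_with_timeout coroutine (not from the original) and shows one way the unfinished futures might be picked up afterwards:

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def slow_cpu_bound(secs):
    time.sleep(secs)
    return secs

async def wait_with_timeout(executor, durations, timeout):
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(executor, slow_cpu_bound, d) for d in durations]
    # With a timeout, wait returns whatever has finished so far in `completed`
    # and leaves the rest, still running, in `pending`.
    completed, pending = await asyncio.wait(tasks, timeout=timeout)
    first_results = [f.result() for f in completed]
    # wait does not cancel anything on timeout, so the rest can still be awaited.
    late_results = await asyncio.gather(*pending)
    return first_results, late_results

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        print(asyncio.run(wait_with_timeout(executor, [0.5, 3], 1.5)))
```

Both completed and pending are sets, so when more than one future lands in either of them the order of the collected results is not guaranteed.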

One other interesting aspect of these ‘executors’ is that their abstract base class, concurrent.futures.Executor, defines specific functional behaviour such as a map method. It is an asynchronous equivalent of the built-in map function: the calls are submitted to the pool and may run concurrently, while the results are yielded in the order of the inputs.
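A minimal sketch of Executor.map, assuming a hypothetical square function and squares_with helper that are not part of the original text:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def square(n):
    return n * n

def squares_with(executor_cls, numbers):
    # Executor.map submits every call up front and yields the results
    # in the same order as the inputs.
    with executor_cls() as executor:
        return list(executor.map(square, numbers))

if __name__ == "__main__":
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        print(pool_cls.__name__, squares_with(pool_cls, range(5)))
```

Note that iterating over the result of Executor.map blocks the calling thread until each value is ready, so inside a coroutine run_in_executor is probably still the better fit.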