
Convert synchronous external Python code into asynchronous code


Tags: #python #tornado #sync #async #requests #urllib

1. Quicker Implementation.py

import functools


def force_async(fn):
    '''
    Turns a sync function into an async function, using threads.
    '''
    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    pool = ThreadPoolExecutor()

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        future = pool.submit(fn, *args, **kwargs)
        return asyncio.wrap_future(future)  # make it awaitable

    return wrapper


def force_sync(fn):
    '''
    Turns an async function into a sync function.
    '''
    import asyncio

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if asyncio.iscoroutine(res):
            return asyncio.get_event_loop().run_until_complete(res)
        return res

    return wrapper
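
A quick usage sketch of both decorators (slow_add and main are illustrative names, not part of the gist):

import time


@force_async
def slow_add(a, b):
    # stands in for any blocking sync work
    time.sleep(1)
    return a + b


@force_sync
async def main():
    # slow_add now returns an awaitable backed by the thread pool
    result = await slow_add(1, 2)
    print(result)  # 3


main()  # drives the coroutine to completion on the event loop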

Convert Sync external code into Async.md

The code snippets below demonstrate how to make synchronous code asynchronous using a thread pool (in this example it’s handled within a Tornado application).

If you’re using the requests HTTP client, it can be made asynchronous with the same thread pool technique, although an external library (such as requests-futures) can be a better fit if you need to do something a little more complex. For example, utilising the requests library’s Session feature across threads would need extra work, and an external library can help with that.
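
As a rough sketch of that thread pool technique (the URL is just a placeholder), you can push the blocking requests call onto the event loop’s default executor and await the wrapped future:

import asyncio
import functools

import requests


async def fetch(url):
    loop = asyncio.get_event_loop()
    # run_in_executor(None, ...) uses the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, functools.partial(requests.get, url))


response = asyncio.get_event_loop().run_until_complete(fetch("https://example.com"))
print(response.status_code)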

There are other alternative libraries for working with requests too, such as grequests and requests-threads.

Notes: grequests appears to be much faster than requests-threads, but it relies on monkey patching and brings additional dependency problems. Using a ThreadPoolExecutor appears to be on par with grequests for performance.

But I feel requests-futures is the best option, as it builds on Python’s native concurrent.futures module.
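
For reference, a minimal requests-futures sketch looks like this; session.get returns a concurrent.futures future, which is why it composes nicely with the rest of this page:

from requests_futures.sessions import FuturesSession

session = FuturesSession()  # a requests Session backed by a ThreadPoolExecutor

future = session.get('https://example.com')  # returns a concurrent.futures.Future
response = future.result()  # blocks until the request completes
print(response.status_code)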

S3 Example.py

from botocore.exceptions import ClientError

from app.threadpool import run_on_executor


def fetch_s3_body(s3_resource, bucket, obj_key):
    """
    Fetch the object from S3 and return it as a byte string.
    """
    try:
        obj = s3_resource.Object(bucket, obj_key).get()
        return obj["Body"].read()
    except ClientError as error:
        # logger and metrics are application-level helpers (e.g. structlog and statsd)
        logger.error("error fetching s3 object", key=obj_key, bucket=bucket, error=error)
        metrics.incr("s3_object_fetch", tags={"status": "failed"})
        return None


# elsewhere, inside a coroutine, the blocking boto3 call runs on the thread pool:
response = await run_on_executor(fetch_s3_body, s3_resource, bucket_name, object_key)

Threading with manual co-ordination.py

"""
You don't have to use a threadpool. 

If your use case is simple enough, then just manually co-ordinate some threads.
"""

import threading
import urllib.request

threads = []
requests = ['https://www.integralist.co.uk', 'https://google.com']

def open_url(uri):
    response = urllib.request.urlopen(uri, timeout=600)
    print(response.read())
    
    """
    import urllib.parse
    import urllib.request

    url = 'http://www.example.com'
    values = {'foo' : 'bar'}

    data = urllib.parse.urlencode(values)
    data = data.encode('ascii') # data should be bytes
    req = urllib.request.Request(url, data)
    with urllib.request.urlopen(req) as response:
	    the_page = response.read()
    """

for uri in requests:
    t = threading.Thread(target=open_url, args=(uri,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()
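
If you need the response bodies rather than just printing them, a simple extension (a sketch reusing the urls list above) is to have each thread write into a shared dict; each thread writes a distinct key, so no lock is needed:

results = {}

def collect_url(uri):
    with urllib.request.urlopen(uri, timeout=600) as response:
        results[uri] = response.read()  # distinct key per thread

threads = [threading.Thread(target=collect_url, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

print({uri: len(body) for uri, body in results.items()})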

Threadpool.py

from concurrent.futures import ThreadPoolExecutor

from tornado import gen
from bf_rig import settings


THREAD_POOL = ThreadPoolExecutor(settings.get("pool_max_workers"))

"""
Using the ProcessPoolExecutor can be useful, in cases where memory usage 
per request is very high (large response) and cycling the interpretor 
is required to release memory back to OS.
"""


@gen.coroutine
def run_on_executor(*args, **kwargs):
    """
    ThreadPoolExecutor doesn't work with native coroutines unfortunately.
    It will require asyncio.wrap_future which is not much better than using tornado's decorators.
    """
    result = yield THREAD_POOL.submit(*args, **kwargs)
    raise gen.Return(result)
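
Putting it together, a Tornado handler can await run_on_executor like any other coroutine (the handler and blocking_work below are hypothetical, not part of the original app):

import time

from tornado import web


def blocking_work(duration):
    # stands in for any synchronous call you don't want blocking the IOLoop
    time.sleep(duration)
    return "slept for {}s".format(duration)


class ExampleHandler(web.RequestHandler):
    async def get(self):
        # the @gen.coroutine-decorated run_on_executor returns a Tornado
        # future, which native coroutines can await directly
        result = await run_on_executor(blocking_work, 1)
        self.write(result)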