Tags: #python #tornado #sync #async #requests #urllib
import functools


def force_async(fn):
    '''
    turns a sync function to async function using threads
    '''
    from concurrent.futures import ThreadPoolExecutor
    import asyncio

    pool = ThreadPoolExecutor()

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        future = pool.submit(fn, *args, **kwargs)
        return asyncio.wrap_future(future)  # make it awaitable

    return wrapper
def force_sync(fn):
    '''
    turn an async function to sync function
    '''
    import asyncio

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        res = fn(*args, **kwargs)
        if asyncio.iscoroutine(res):
            return asyncio.get_event_loop().run_until_complete(res)
        return res

    return wrapper
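For reference, here's a minimal usage sketch of the two decorators (the slow_sync and main names are illustrative assumptions, not part of the snippet above):

import asyncio
import time

@force_async
def slow_sync(n):
    time.sleep(n)  # blocking call, but it now runs on a pool thread
    return n

@force_sync
async def main():
    # the decorated sync function is awaitable, so calls can run concurrently
    return await asyncio.gather(slow_sync(1), slow_sync(1))

print(main())  # force_sync drives the event loop for us, printing [1, 1]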
The code snippets below demonstrate how to make synchronous code asynchronous using a thread pool (in this example it's handled within a Tornado application).
If you're using the requests http client, then this can be made asynchronous using the same threadpool technique, although it can be better to use an external library (such as requests-futures) if you need to do something a little more complex. For example, making use of the requests library's Session feature would need extra work with a raw thread pool, and an external library can help with that.
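A rough sketch of what requests-futures usage looks like (assuming that library's FuturesSession API; the URL and max_workers value are just illustrative):

from requests_futures.sessions import FuturesSession

# FuturesSession subclasses requests.Session, so Session features
# (persistent headers, cookies, connection pooling) carry over
session = FuturesSession(max_workers=10)
session.headers.update({"User-Agent": "example"})

future = session.get("https://www.integralist.co.uk")  # returns immediately
response = future.result()  # block only when the result is needed
print(response.status_code)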
There are other alternative libraries for working with the requests library too, such as Grequests and requests-threads.

Notes: Grequests appears to be much faster than requests-threads, but brings monkey patching and additional problems with dependencies. Using ThreadPoolExecutor appears to be on par with Grequests' performance. But I feel requests-futures is the best option, as it uses Python's native concurrent.futures for its implementation.
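For comparison, the plain ThreadPoolExecutor approach mentioned above might look something like this (a sketch; the URLs are reused from the threading example later in this post):

from concurrent.futures import ThreadPoolExecutor

import requests

urls = ['https://www.integralist.co.uk', 'https://google.com']

def fetch(url):
    # each call still blocks, but only within its own pool thread
    return requests.get(url, timeout=10)

with ThreadPoolExecutor(max_workers=len(urls)) as pool:
    for response in pool.map(fetch, urls):
        print(response.status_code, response.url)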
from botocore.exceptions import ClientError

from app.threadpool import run_on_executor


def fetch_s3_body(s3_resource, bucket, obj_key):
    """
    Fetch the object from S3 and return it as a byte array
    """
    try:
        obj = s3_resource.Object(bucket, obj_key).get()
        return obj["Body"].read()
    except ClientError as error:
        # `logger` and `metrics` are application-level helpers (e.g. a structured logger)
        logger.error("error fetching s3 object", key=obj_key, bucket=bucket, error=error)
        metrics.incr("s3_object_fetch", tags={"status": "failed"})
        return None


# example usage from within a coroutine (e.g. a Tornado request handler):
response = await run_on_executor(fetch_s3_body, s3_resource, bucket_name, object_key)
"""
You don't have to use a threadpool.
If your use case is simple enough, then just manually co-ordinate some threads.
"""
import threading
import urllib.request

threads = []
requests = ['https://www.integralist.co.uk', 'https://google.com']


def open_url(uri):
    response = urllib.request.urlopen(uri, timeout=600)
    print(response.read())


"""
If you need to send data (e.g. a POST), build a Request object first:

import urllib.parse
import urllib.request

url = 'http://www.example.com'
values = {'foo': 'bar'}

data = urllib.parse.urlencode(values)
data = data.encode('ascii')  # data should be bytes
req = urllib.request.Request(url, data)
with urllib.request.urlopen(req) as response:
    the_page = response.read()
"""

for uri in requests:
    t = threading.Thread(target=open_url, args=(uri,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()
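If you're already running inside asyncio, the same fan-out can be awaited via the event loop's default executor rather than hand-rolled threads (a sketch, not from the original snippet; fetch_all is an illustrative name):

import asyncio
import urllib.request

async def fetch_all(uris):
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, urllib.request.urlopen, uri) for uri in uris]
    return await asyncio.gather(*futures)

responses = asyncio.run(fetch_all(['https://www.integralist.co.uk', 'https://google.com']))

for response in responses:
    print(response.status)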
from concurrent.futures import ThreadPoolExecutor

from tornado import gen

from bf_rig import settings

THREAD_POOL = ThreadPoolExecutor(settings.get("pool_max_workers"))

"""
Using the ProcessPoolExecutor can be useful in cases where memory usage
per request is very high (large responses) and cycling the interpreter
is required to release memory back to the OS.
"""


@gen.coroutine
def run_on_executor(*args, **kwargs):
    """
    ThreadPoolExecutor doesn't work with native coroutines unfortunately.
    It would require asyncio.wrap_future, which is not much better than using tornado's decorators.
    """
    result = yield THREAD_POOL.submit(*args, **kwargs)
    raise gen.Return(result)
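Tying it together, a Tornado handler using the above helper might look roughly like this (the handler class, route capture, and the s3_resource/bucket_name variables are assumptions for illustration):

import tornado.web

class ObjectHandler(tornado.web.RequestHandler):
    async def get(self, object_key):  # assumes the route captures object_key
        # the blocking boto3 call runs on the shared thread pool,
        # keeping the IOLoop free to serve other requests
        body = await run_on_executor(fetch_s3_body, s3_resource, bucket_name, object_key)
        if body is None:
            raise tornado.web.HTTPError(404)
        self.write(body)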