Nikolay Novik
PyConUA 2016
aiomysql, aioodbc, aiogibson
aiohttp_debugtoolbar, aiobotocore,
aiohttp_mako, aiohttp_admin
SOA implies a lot of network communication.
import requests
from django.http import HttpResponse

def my_view(request):
    # blocks the worker thread
    r = requests.get('http://graph.facebook.com/v2.5/{uid}')
    data = r.json()
    # ...
    return HttpResponse(status=200)
from aiohttp import web

async def my_view(request):
    session = request.app['session']
    # context switch here
    r = await session.get('http://graph.facebook.com/v2.5/{uid}')
    data = await r.json()
    return web.Response(status=200)
The Amazon S3 API can take over 9000 seconds to return a response! In the async case only that one response is blocked; in the sync case the entire thread is.
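One way to defend against such slow endpoints (a minimal sketch, assuming the session and url from the view above) is to bound the wait with asyncio.wait_for:

    # cancel the request if it takes longer than 30 seconds
    try:
        r = await asyncio.wait_for(session.get(url), timeout=30)
    except asyncio.TimeoutError:
        r = None  # handle the slow endpoint explicitly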
This is the most popular and convenient way to use asyncio in the wild.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)

async def test_example():
    dsn = 'Driver=SQLite;Database=sqlite.db'
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    result = await loop.run_in_executor(executor, cursor.execute,
                                        'SELECT 42;')

loop.run_until_complete(test_example())
Easy to use, if a bit strange, interface; the default executor is a ThreadPoolExecutor with five worker threads (in Python 3.5).
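Passing None as the executor (a small sketch, reusing connect and dsn from above) falls back to that default pool:

    # None selects the loop's default ThreadPoolExecutor
    conn = await loop.run_in_executor(None, connect, dsn)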
import asyncio, math
from concurrent.futures import ProcessPoolExecutor

loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=3)

def is_prime(n):
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

async def go():
    result = await loop.run_in_executor(
        executor, is_prime, 112272535095293)

loop.run_until_complete(go())
ProcessPoolExecutor has the same interface as ThreadPoolExecutor; worker processes are created with fork().
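Because work is shipped to a separate process, the callable and its arguments must be picklable; a sketch of the failure mode (inside a coroutine):

    # lambdas cannot be pickled; module-level functions can
    await loop.run_in_executor(executor, lambda: is_prime(7))  # PicklingError
    await loop.run_in_executor(executor, is_prime, 7)          # OK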
An application may spin up an event loop at will to perform IO-heavy work.
import asyncio, aiohttp

async def fetch(session, url, loop):
    async with session.get(url) as resp:
        data = await resp.text()
        return data

def collect_data(url_list):
    loop = asyncio.get_event_loop()
    session = aiohttp.ClientSession(loop=loop)
    coros = [fetch(session, u, loop) for u in url_list]
    data = loop.run_until_complete(asyncio.gather(*coros, loop=loop))
    loop.run_until_complete(session.close())
    loop.close()
    return data
def main():
    url_list = db.fetch_urls()
    data = collect_data(url_list)
    process(data)
Scraping and concurrent uploads to an external server are the most popular use cases.
import asyncio

import aioredis
from flask import Flask

app = Flask(__name__)
loop = asyncio.get_event_loop()
redis = loop.run_until_complete(aioredis.create_redis(
    ('localhost', 6379), loop=loop))

@app.route("/")
def hello():
    value = loop.run_until_complete(redis.get('my-key'))
    return "Hello {}!".format(value)

if __name__ == "__main__":
    app.run()
Using coroutines inside sync code is not always a good idea; in this particular case it actually slows down database access.
An application may delegate IO-heavy tasks to a dedicated event loop running in a separate thread.
import asyncio, concurrent.futures
from threading import Thread, Event

class AioThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop = None
        self.event = Event()

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def add_task(self, coro):
        fut = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return fut

    def finalize(self, timeout=None):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join(timeout=timeout)
Make sure you have a means to signal that the loop has started, and that you can finalize the thread properly.
def main():
    aiothread = AioThread()
    aiothread.start()
    aiothread.event.wait()
    loop = aiothread.loop

    coro = asyncio.sleep(1, loop=loop)
    future = aiothread.add_task(coro)
    try:
        result = future.result(timeout=5)
    except concurrent.futures.TimeoutError:
        print('The coroutine took too long, cancelling the task')
        future.cancel()
    except Exception as exc:
        print('The coroutine raised an exception: {!r}'.format(exc))
        future.cancel()
Calling future.cancel() in the main thread will cancel the asyncio coroutine in the background thread.
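Shutdown then goes through the finalize() helper defined above:

    # stop the background loop and join the thread (waits at most 5s)
    aiothread.finalize(timeout=5)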
class TwistedConnection(Connection):
    @classmethod
    def initialize_reactor(cls):
        cls._loop = TwistedLoop()

    def add_connection(self):
        # ...

    def client_connection_made(self):
        # ...

    def handle_read(self):
        self.process_io_buffer()

    def push(self, data):
        reactor.callFromThread(self.connector.transport.write, data)
Cassandra's Python driver is sync, but its connection objects spin an event loop under the hood, in this case Twisted's reactor.
# janus in a nutshell: one queue, two APIs
queue = janus.Queue(loop=loop)
queue.sync_q.put(i)            # blocking, from threads
await queue.async_q.get()      # awaitable, from coroutines
import asyncio, janus

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)

def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()

async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        async_q.task_done()

fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
janus has two APIs for the same queue: a sync one like queue.Queue and an async one like asyncio.Queue.
import asyncio

async def go(loop):
    future = asyncio.Future(loop=loop)
    future.set_result(None)
    await asyncio.sleep(3.0, loop=loop)
    await future
    print("foo")

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
The loop argument is required for most asyncio APIs.
https://groups.google.com/forum/#!msg/python-tulip/hr1kPZfMX8U/9uqdlbRuRsoJ
"It would really be a pity if Tulip repeated our billion-dollar mistake [global reactor] ;-)"
run_in_executor, create_subprocess_exec, create_task
import asyncio, signal

is_working = True

async def do_work(loop):
    while is_working:
        await asyncio.sleep(1, loop=loop)

def signal_handler(loop):
    global is_working
    loop.remove_signal_handler(signal.SIGTERM)
    is_working = False

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, signal_handler, loop)
loop.run_until_complete(do_work(loop))
asyncio will warn you with a bunch of tracebacks if you do not shut down properly.
loop = asyncio.get_event_loop()
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)

def shutdown(loop):
    loop.remove_signal_handler(signal.SIGTERM)
    loop.stop()

loop.add_signal_handler(signal.SIGTERM, shutdown, loop)
loop.run_forever()

srv.close()  # stop accepting new connections
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.shutdown())  # close websockets
loop.run_until_complete(handler.finish_connections(60.0))
loop.run_until_complete(app.cleanup())  # run registered cleanups
loop.close()
Now you can be sure that all in-flight requests are served safely and no new requests are accepted.
aiohttp.ClientSession is your friend.
import asyncio
import aiohttp

async def go(loop):
    session = aiohttp.ClientSession(loop=loop)
    async with session.get('http://ua.pycon.org') as resp:
        data = await resp.text()
        print(data)
    session.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
Connection pooling helps to save on expensive connection
creation.
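The pool itself can be tuned; a minimal sketch using aiohttp's TCPConnector (the limit value here is illustrative):

    # cap the pool at 30 concurrent connections
    conn = aiohttp.TCPConnector(limit=30, loop=loop)
    session = aiohttp.ClientSession(connector=conn, loop=loop)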
async def init(loop):
    # setup application and extensions
    app = web.Application(loop=loop)
    # create connection to the database
    pg = await init_postgres(conf['postgres'], loop)

    async def close_pg(app):
        pg.close()
        await pg.wait_closed()

    app.on_cleanup.append(close_pg)
    # ...
aiohttp has a handy on_cleanup signal for database connections, as well as on_shutdown for websockets.
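A sketch of the on_shutdown side, assuming open websockets are tracked in app['websockets'] (an illustrative convention, not an aiohttp built-in):

    from aiohttp import WSCloseCode

    async def on_shutdown(app):
        for ws in app['websockets']:
            await ws.close(code=WSCloseCode.GOING_AWAY,
                           message='Server shutdown')

    app.on_shutdown.append(on_shutdown)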
class Foo:
    def __init__(self, loop):
        self._loop = loop
        self._task = loop.create_task(self._do_task())

    async def _do_task(self):
        await self.set('foo', 'bar')
        await self.set('baz', 'zap')

    async def _stop_do_task(self):
        await self._task
May be very tricky! The same approach as for thread finalization should be employed, or task.cancel().
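A minimal finalization sketch (the stop() name is illustrative): cancel the background task, then wait for it to actually finish:

    async def stop(self):
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass  # expected when the task is cancelled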
@asyncio.coroutine
def coro():
    raise StopIteration('batman')

@asyncio.coroutine
def coro2():
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raises StopIteration
    return 'finish'

@asyncio.coroutine
def go():
    data = yield from coro()   # 'batman'
    data = yield from coro2()  # None, not 'finish'
Fixed in Python 3.5: a stray StopIteration inside a native coroutine is turned into RuntimeError.
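A quick check of the new behaviour (sketch):

    async def coro():
        raise StopIteration('batman')
    # awaiting coro() raises RuntimeError('coroutine raised StopIteration')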
@asyncio.coroutine
def foo(loop):
    yield from asyncio.sleep(1, loop=loop)

def bar(loop):  # decorator forgotten: just a bare generator
    yield from asyncio.sleep(1, loop=loop)

loop.run_until_complete(foo(loop))
loop.run_until_complete(bar(loop))
Fixed in Python 3.5 with await syntax: await is only valid inside async def, so the decorator cannot be forgotten.
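The same coroutine in 3.5 syntax (sketch):

    async def foo(loop):
        await asyncio.sleep(1, loop=loop)
    # 'await' outside 'async def' is a SyntaxError, so the bare-generator
    # mistake above is impossible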
async def foo(loop):
    await update1()
    await update2()

task = asyncio.ensure_future(foo(loop), loop=loop)
task.cancel()
A task can be cancelled at any await point: here foo() may stop after update1() but before update2(), leaving state inconsistent.
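If the two updates must run as a unit, asyncio.shield is one way to protect the critical section from outside cancellation (a sketch; do_updates is an illustrative helper):

    async def do_updates():
        await update1()
        await update2()

    async def foo(loop):
        # task.cancel() still raises CancelledError here, but the shielded
        # inner task keeps running to completion
        await asyncio.shield(do_updates(), loop=loop)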