Nikolay Novik
Minsk Python Meetup Oct 2016
aiomysql, aioodbc, aiogibson, aiohttp_debugtoolbar, aiobotocore,
aiohttp_mako, aiohttp_admin

SOA implies a lot of network communication.

import requests
from django.http import HttpResponse

def my_view(request):
    # blocks the whole thread while waiting for the response
    r = requests.get('https://s3-us-west-2.amazonaws.com/dataintake/{uid}')
    data = r.json()
    # ...
    return HttpResponse(status=200)
from aiohttp import web

async def my_view(request):
    session = request.app['session']
    # context switch here, other requests get a chance to run
    r = await session.get('https://s3-us-west-2.amazonaws.com/dataintake/{uid}')
    data = await r.json()
    return web.Response(status=200)
The Amazon S3 API can take ages to return a response (over 9000 seconds!). In the async case only one request is blocked; in the sync case, the entire thread is.
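If you do not want to wait that long, bound the call with a timeout. A minimal sketch using asyncio.wait_for; the helper name and timeout value are illustrative:

import asyncio
import aiohttp

async def fetch_json(session, url, timeout=30):
    async def _fetch():
        resp = await session.get(url)
        return await resp.json()
    try:
        # give up if the whole request takes longer than `timeout` seconds
        return await asyncio.wait_for(_fetch(), timeout)
    except asyncio.TimeoutError:
        return None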

aiopg, aiomysql and aioredis work in any asyncio framework!
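For example, a minimal aiopg sketch; the DSN and query are illustrative:

import asyncio
import aiopg

dsn = 'dbname=test user=postgres password=secret host=127.0.0.1'

async def go():
    pool = await aiopg.create_pool(dsn)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT 42;')
            result = await cur.fetchone()
            assert result == (42,)

loop = asyncio.get_event_loop()
loop.run_until_complete(go())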

Running blocking code in an executor is the most popular and convenient asyncio mode in the wild.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)

async def test_example():
    dsn = 'Driver=SQLite;Database=sqlite.db'
    # each blocking call is shipped off to a worker thread
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    result = await loop.run_in_executor(executor, cursor.execute,
                                        'SELECT 42;')

loop.run_until_complete(test_example())
Easy to use, but the interface is a bit strange; the default executor has only 4 worker threads.
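If that is not enough, the default executor can be replaced; a sketch, the pool size is illustrative:

import asyncio
from concurrent.futures import ThreadPoolExecutor

loop = asyncio.get_event_loop()
# run_in_executor(None, func, ...) will now use the bigger pool
loop.set_default_executor(ThreadPoolExecutor(max_workers=32))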
import asyncio
import math
from concurrent.futures import ProcessPoolExecutor

loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=3)

def is_prime(n):
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

async def go():
    # CPU-bound work runs in a child process, the loop stays responsive
    result = await loop.run_in_executor(
        executor, is_prime, 112272535095293)

loop.run_until_complete(go())
ProcessPoolExecutor has the same interface as ThreadPoolExecutor, but spawns worker processes via fork().

An application may spin up an event loop at will to perform IO-heavy computations.
import asyncio, aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

def collect_data(url_list):
    loop = asyncio.get_event_loop()
    session = aiohttp.ClientSession(loop=loop)
    coros = [fetch(session, u) for u in url_list]
    data = loop.run_until_complete(asyncio.gather(*coros, loop=loop))
    loop.run_until_complete(session.close())
    loop.close()
    return data
def main():
    url_list = db.fetch_urls()
    data = collect_data(url_list)
    process(data)
Scraping and concurrent uploads to an external server are the most popular use cases.
import asyncio
import aioredis
from flask import Flask

app = Flask(__name__)
loop = asyncio.get_event_loop()
redis = loop.run_until_complete(aioredis.create_redis(
    ('localhost', 6379), loop=loop))

@app.route("/")
def hello():
    value = loop.run_until_complete(redis.get('my-key'))
    return "Hello {}!".format(value)

if __name__ == "__main__":
    app.run()
Using coroutines inside sync code is not always a good idea. In this particular case it actually slows down database access: every request pays the overhead of run_until_complete without gaining any concurrency.
An application may delegate IO-heavy tasks to a dedicated event loop running in a separate thread.
import asyncio
from threading import Thread, Event

class AioThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop, self.event = None, Event()

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        # signal the main thread that the loop is up and running
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def add_task(self, coro):
        # safe to call from the main thread
        fut = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return fut

    def finalize(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
Make sure you have a means to signal that the loop has started, and that you can finalize the thread properly.
import concurrent.futures

def main():
    aiothread = AioThread()
    aiothread.start()
    # wait until the loop in the background thread has started
    aiothread.event.wait()
    loop = aiothread.loop
    coro = asyncio.sleep(1, loop=loop)
    future = aiothread.add_task(coro)
    try:
        result = future.result(timeout=5)
    except concurrent.futures.TimeoutError:
        print('The coroutine took too long, cancelling the task')
        future.cancel()
    except Exception as exc:
        print('The coroutine raised an exception: {!r}'.format(exc))
Make sure you wait for the loop to start. Calling future.cancel() in the main thread will cancel the asyncio coroutine in the background thread.
class TwistedConnection(Connection):
    @classmethod
    def initialize_reactor(cls):
        cls._loop = TwistedLoop()

    def add_connection(self):
        ...

    def client_connection_made(self):
        ...

    def handle_read(self):
        self.process_io_buffer()

    def push(self, data):
        reactor.callFromThread(self.connector.transport.write, data)
Cassandra's Python driver is sync, but its connection objects spin an event loop of their own, in this case Twisted's reactor.

import asyncio, janus

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)

def threaded(sync_q):
    # runs in a thread and uses the blocking API
    for i in range(100):
        sync_q.put(i)
    sync_q.join()

async def async_coro(async_q):
    # runs on the event loop and uses the async API
    for i in range(100):
        val = await async_q.get()
        async_q.task_done()

fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
janus exposes two APIs for the same queue: a sync one like queue.Queue and an async one like asyncio.Queue.

Testing and debugging are another big advantage of asyncio!
def test_raw_insert_with_executemany(self):
    async def go():
        conn = await self.connect()
        await conn.execute(sql)
    self.loop.run_until_complete(go())
@pytest.mark.run_loop
async def test_raw_insert_with_executemany(self):
    conn = await self.connect()
    await conn.execute(sql)
You need an event loop to spin your coroutines.
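A minimal sketch of how such a run_loop marker can be implemented in conftest.py; real plugins (pytest-asyncio, the conftest shipped with aiomysql) are more elaborate, the details here are illustrative:

import asyncio
import pytest

def pytest_pyfunc_call(pyfuncitem):
    # intercept tests marked with run_loop and run the coroutine
    # to completion on a fresh event loop
    if 'run_loop' in pyfuncitem.keywords:
        loop = asyncio.new_event_loop()
        try:
            testargs = {arg: pyfuncitem.funcargs[arg]
                        for arg in pyfuncitem._fixtureinfo.argnames}
            loop.run_until_complete(pyfuncitem.obj(**testargs))
        finally:
            loop.close()
        return True  # tell pytest we handled the call ourselves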
from aiohttp import web

async def pong(request):
    import pdb; pdb.set_trace()
    return web.Response(body=b'pong')

app_pong = web.Application(loop=loop)
app_pong.router.add_route('GET', '/', pong)

async def ping(request):
    import pdb; pdb.set_trace()
    async with session.get("http://127.0.0.1:8080/pong") as resp:
        data = await resp.text()
    return web.Response(body=b'OK')

app_ping = web.Application(loop=loop)
app_ping.router.add_route('GET', '/', ping)
Both aiohttp applications and the client work in the same thread!
def main(loop, app_ping, app_pong):
    run = loop.run_until_complete
    handler1 = app_ping.make_handler()
    srv1 = run(loop.create_server(handler1, host, unused_port))
    handler2 = app_pong.make_handler()
    srv2 = run(loop.create_server(
        handler2, host, other_unused_port))
Running two or more servers in the same thread is a trivial task.
@pytest.mark.run_loop
async def test_two_apps(loop, ping_port, pong_port):
    url = "http://127.0.0.1:{}/pong".format(ping_port)
    async with session.get(url) as resp:
        data = await resp.text()
        assert data
The client works on the same event loop and in the same thread as the other aiohttp apps.
import asyncio, signal

is_working = True

async def do_work(loop):
    while is_working:
        await asyncio.sleep(1, loop=loop)

def signal_handler(loop):
    global is_working
    loop.remove_signal_handler(signal.SIGTERM)
    is_working = False

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, signal_handler, loop)
loop.run_until_complete(do_work(loop))
asyncio will warn you with a bunch of tracebacks if you do not shut down properly.
def shutdown(loop):
    loop.remove_signal_handler(signal.SIGTERM)
    loop.stop()

loop.add_signal_handler(signal.SIGTERM, shutdown, loop)
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)
loop.run_forever()
# kill accepting sockets
srv.close()
loop.run_until_complete(srv.wait_closed())
# give pending requests up to 60 seconds to finish
loop.run_until_complete(handler.finish_connections(60.0))
loop.close()
Now you can be sure that all in-flight requests are served safely and no new requests are accepted.
async def init(loop):
    # setup application and extensions
    app = web.Application(loop=loop)
    # create connection to the database
    pg = await init_postgres(conf['postgres'], loop)

    async def close_pg(app):
        pg.close()
        await pg.wait_closed()

    app.on_cleanup.append(close_pg)
    # ...
aiohttp has a handy on_cleanup signal for closing database connections, as well as on_shutdown for closing websockets.
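A minimal on_shutdown sketch along the lines of the aiohttp docs, assuming handlers register their open websockets in app['websockets']:

from aiohttp import web, WSCloseCode

async def on_shutdown(app):
    # close every open websocket before the server stops
    for ws in app['websockets']:
        await ws.close(code=WSCloseCode.GOING_AWAY,
                       message='Server shutdown')

app.on_shutdown.append(on_shutdown)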
import asyncio

@asyncio.coroutine
def coro():
    raise StopIteration('batman')

@asyncio.coroutine
def coro2():
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raises StopIteration
    return 'finish'

@asyncio.coroutine
def go():
    data = yield from coro()   # 'batman'
    data = yield from coro2()  # None, not 'finish'!
Raising StopIteration is the same as returning from a coroutine, so this can lead to hard-to-find bugs. With from __future__ import generator_stop a StopIteration escaping a generator raises RuntimeError instead.
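A quick sketch of the difference (Python 3.5+):

from __future__ import generator_stop

def gen():
    next(iter([]))  # StopIteration leaks out of the generator body
    yield

g = gen()
next(g)  # RuntimeError: generator raised StopIteration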
import asyncio

loop = asyncio.get_event_loop()

def foo(loop):
    # plain generator, not marked as a coroutine
    yield from asyncio.sleep(1, loop=loop)

@asyncio.coroutine
def bar(loop):
    yield from asyncio.sleep(1, loop=loop)

async def zap(loop):
    await asyncio.sleep(1, loop=loop)

loop.run_until_complete(foo(loop))
loop.run_until_complete(bar(loop))
loop.run_until_complete(zap(loop))
All three examples work as expected, but asyncio will not track the first plain generator as a coroutine and, as a result, will NOT complain if you forget to yield it.
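Debug mode catches the other two cases; a sketch, assuming Python 3.5 asyncio:

import asyncio

loop = asyncio.get_event_loop()
loop.set_debug(True)  # or run with PYTHONASYNCIODEBUG=1

@asyncio.coroutine
def bar(loop):
    yield from asyncio.sleep(1, loop=loop)

bar(loop)  # forgotten: debug mode reports it was never yielded from

def foo(loop):
    yield from asyncio.sleep(1, loop=loop)

foo(loop)  # plain generator: no complaint at all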
http://127.0.0.1:9000/admin
Supported databases out of the box.
ODBC databases (SQLite, MSSQL, Oracle) coming soon, using aioodbc.
Python 3.5+ - because of async/await stuff
aiohttp - as platform and REST framework
trafaret - provides validation and model description
python-dateutil - for simpler datetime manipulation
aiohttp_jinja2 - for basic template rendering
aiomysql / aiopg / sqlalchemy / motor - as means of database access
ng-admin - the frontend for aiohttp_admin
config.js
question.editionView()
    .title('Edit question')
    .actions(['list', 'show', 'delete'])
    .fields([
        nga.field('id')
            .editable(false)
            .label('id'),
        question.creationView().fields(),
        nga.field('choice', 'referenced_list')
            .targetEntity(nga.entity('choice'))
            .targetReferenceField('question_id')
            .targetFields([
                nga.field('id').isDetailLink(true),
                nga.field('votes').label('Votes'),
                nga.field('choice_text').label('Choice')
            ])
            .sortField('votes')
            .sortDir('DESC')
            .listActions(['edit']),
    ]);