Patterns for asyncio applications

Nikolay Novik

https://github.com/jettify

Minsk Python Meetup Oct 2016

I am ...

Poll: you and web frameworks
  1. I am using asyncio/aiohttp extensively
  2. I am using Twisted, Tornado, gevent etc. extensively
  3. I think async programming is kinda cool
Agenda
  1. Motivation: what problem does asynchronous programming solve?
  2. Asyncio application design choices
  3. Tips and tricks and things to consider in your app
  4. Asyncio pitfalls
Motivation. Why might we want to write asynchronous code?
  • One can do useful work instead of blocking
  • Simpler code due to explicit context switches
  • Idle protocols (websockets) work out of the box
  • Encourages good development practices
  • A bunch of shiny new frameworks
Also, we do SOA (microservices) these days...

SOA implies a lot of network communications.

Sync does not scale (TM)

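import requests
from django.http import HttpResponse

def my_view(request, uid):
    # uid is captured from the URL pattern
    url = 'https://s3-us-west-2.amazonaws.com/dataintake/{}'.format(uid)
    # blocks the whole worker thread until S3 answers
    r = requests.get(url)
    data = r.json()
    # ...
    return HttpResponse(status=200)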
                    


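from aiohttp import web

async def my_view(request):
    # shared ClientSession created at startup and stored on the app
    session = request.app['session']
    uid = request.match_info['uid']
    url = 'https://s3-us-west-2.amazonaws.com/dataintake/{}'.format(uid)
    # context switch here: other requests are served while we wait
    async with session.get(url) as r:
        data = await r.json()
    return web.Response(status=200)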
                    
The Amazon S3 API could take over 9000 seconds to respond! In the async case only that one request is stuck; in the sync case the entire thread is blocked.
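To make the difference concrete, here is a minimal, network-free sketch. `fake_s3_get` is a hypothetical stand-in that simulates a slow S3 response with `asyncio.sleep`, and the sketch uses `asyncio.run` (Python 3.7+), which is newer than the rest of these slides. Three slow "requests" wait concurrently on one thread, so the total wall time is roughly one delay, not three.

```python
import asyncio
import time


async def fake_s3_get(uid, delay):
    # hypothetical stand-in for a slow S3 call: only this coroutine waits
    await asyncio.sleep(delay)
    return {'uid': uid}


async def handle_many(uids, delay=0.2):
    # all "requests" wait concurrently on a single thread
    return await asyncio.gather(*(fake_s3_get(u, delay) for u in uids))


def main():
    start = time.monotonic()
    results = asyncio.run(handle_many(['a', 'b', 'c']))
    elapsed = time.monotonic() - start
    return results, elapsed


if __name__ == '__main__':
    results, elapsed = main()
    print(results, round(elapsed, 1))
```

With blocking calls on one thread, the same three requests would take about three times as long.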
Why asyncio? One event loop to rule them all!

  • Tornado and Twisted support the asyncio event loop
  • Just imagine: you can use aiopg, aiomysql, aioredis in any async framework!
asyncio is a mature framework

The library is used by big companies like Facebook and WG

An ultra-fast implementation of the asyncio event loop on top of libuv

asyncio can be very fast: https://github.com/MagicStack/uvloop
Asyncio application design choices
  • The event loop spins in the main thread and schedules blocking tasks on a thread pool.
  • Embedded event loop: the main thread gives control to the event loop for a finite amount of time, then goes back to executing regular sync code.
  • Sync code executes in the main thread, while the event loop spins in a separate thread.
Asyncio is in main thread

The most popular and convenient asyncio mode in the wild.

Asyncio is in main thread. Blocking calls are in ThreadPool

import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)

async def test_example():
    dsn = 'Driver=SQLite;Database=sqlite.db'
    # every blocking pyodbc call is pushed to the thread pool
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    cursor = await loop.run_in_executor(executor, cursor.execute,
                                        'SELECT 42;')
    row = await loop.run_in_executor(executor, cursor.fetchone)
    return row

loop.run_until_complete(test_example())
					
Easy to use, though the interface is a bit unusual. This example creates its own pool with 4 worker threads; passing None instead of an executor uses the loop's default one.
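The same pattern can be tried without an ODBC driver. This sketch swaps pyodbc for the standard-library `sqlite3` module (an assumption made only so the example is self-contained) and bundles connect, execute and close into one function, so there is a single hop to the pool instead of several.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def query_answer(path):
    # blocking DB-API work, executed entirely inside one worker thread
    conn = sqlite3.connect(path)
    try:
        return conn.execute('SELECT 42;').fetchone()[0]
    finally:
        conn.close()


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # the event loop stays free while the worker thread blocks
        return await loop.run_in_executor(executor, query_answer, ':memory:')


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Keeping the whole sequence in one function also means the connection never crosses threads, which matters because `sqlite3` rejects cross-thread use of a connection by default.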
ThreadPool notes
  • Convenient workaround for blocking calls
  • No way to kill a thread if a task gets stuck inside it
  • C extensions can consume a lot of virtual memory due to per-thread malloc arenas, which may cause problems with Docker/YARN memory limits
  • In most cases, pool size should be proportional to the number of cores on the machine that hosts the resource behind the blocking client
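The "no way to kill a thread" point can be shown directly. In this sketch, `stuck_call` is a hypothetical blocking call; `asyncio.wait_for` gives up on it after a timeout, yet the worker thread keeps running until the call returns on its own.

```python
import asyncio
import threading
import time

finished = threading.Event()


def stuck_call():
    # simulates a blocking client call that hangs longer than we want to wait
    time.sleep(0.5)
    finished.set()


async def main():
    loop = asyncio.get_running_loop()
    try:
        await asyncio.wait_for(
            loop.run_in_executor(None, stuck_call), timeout=0.1)
    except asyncio.TimeoutError:
        # the coroutine gives up here, but nothing stops the worker thread
        return not finished.is_set()  # True means the thread is still busy
    return False


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Cancelling the awaiting coroutine only abandons the result; the thread (and any resources it holds) stays occupied, which is why pool sizing and client-side timeouts matter.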
Asyncio is in main thread. Blocking code in ProcessPool


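import asyncio
import math
from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    if n % 2 == 0:
        return n == 2
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

async def go(loop, executor):
    # CPU-bound work runs in another process, the loop stays responsive
    result = await loop.run_in_executor(
        executor, is_prime, 112272535095293)
    return result

if __name__ == '__main__':
    # the guard is required where worker processes are spawned, not forked
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor(max_workers=3)
    print(loop.run_until_complete(go(loop, executor)))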

					
ProcessPoolExecutor has the same interface as ThreadPoolExecutor.
ProcessPool Notes
  • Processes are easy to terminate
  • Processes consume a lot of memory
  • IPC is expensive because arguments and results have to be serialized
  • Processes should be created and warmed up as early as possible, so that fork() does not copy a large parent memory footprint
Embedded Asyncio loop

The application may spin the event loop at will to perform IO-heavy tasks.

Embedded Asyncio loop. Example

import asyncio, aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.text()

def collect_data(url_list):
    loop = asyncio.get_event_loop()

    session = aiohttp.ClientSession(loop=loop)
    coros = [fetch(session, u) for u in url_list]

    # spin the loop only until all downloads are done
    data = loop.run_until_complete(asyncio.gather(*coros, loop=loop))
    loop.run_until_complete(session.close())
    loop.close()
    return data

def main():
    # db.fetch_urls() and process() stand for your own sync code
    url_list = db.fetch_urls()
    data = collect_data(url_list)
    process(data)
					
Scraping and concurrent uploads to an external server are the most popular use cases.
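A network-free sketch of the same embedded-loop shape, using `asyncio.run` (Python 3.7+) to create and close a fresh loop per batch. `fake_upload` is a hypothetical stand-in for an upload or scrape request; everything around `upload_batch` stays plain synchronous code.

```python
import asyncio


async def fake_upload(item):
    # hypothetical stand-in for an upload/scrape request
    await asyncio.sleep(0.05)
    return item.upper()


def upload_batch(items):
    # sync function: hand control to an event loop only for this batch
    async def run_all():
        return await asyncio.gather(*(fake_upload(i) for i in items))
    return asyncio.run(run_all())


def main():
    # ordinary synchronous code before and after the async batch
    items = ['a', 'b', 'c']
    return upload_batch(items)


if __name__ == '__main__':
    print(main())
```

Because `asyncio.run` creates a new loop each time, `upload_batch` can be called repeatedly from sync code, at the price of setting up and tearing down a loop per call.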
Do not try this at home!

import asyncio
import aioredis
from flask import Flask
app = Flask(__name__)

loop = asyncio.get_event_loop()
redis = loop.run_until_complete(aioredis.create_redis(
    ('localhost', 6379), loop=loop))

@app.route("/")
def hello():
    value = loop.run_until_complete(redis.get('my-key'))
    return "Hello {}!".format(value)

if __name__ == "__main__":
    app.run()
					
Using coroutines inside sync code is not always a good idea. In this particular case, it slows down database access.
Asyncio loop has its own separate thread

The application may delegate IO-heavy tasks to a dedicated loop in a separate thread.

Asyncio loop has its own separate thread. Example

import asyncio
from threading import Thread, Event

class AioThread(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop, self.event = None, Event()

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        # signal the main thread once the loop is actually running
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()
        self.loop.close()

    def add_task(self, coro):
        # thread-safe; returns a concurrent.futures.Future
        fut = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return fut

    def finalize(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
					
Make sure you have a way to signal that the loop has started, and that you can finalize the thread properly.
Communication with background event loop

import asyncio
import concurrent.futures

def main(timeout=5):
    aiothread = AioThread()
    aiothread.start()
    # block until the background loop is running
    aiothread.event.wait()

    loop = aiothread.loop
    coro = asyncio.sleep(1, loop=loop)
    future = aiothread.add_task(coro)
    try:
        # a concurrent.futures.Future: this blocks the main thread
        result = future.result(timeout)
    except concurrent.futures.TimeoutError:
        print('The coroutine took too long, cancelling the task')
        future.cancel()
    except Exception as exc:
        print('The coroutine raised an exception: {!r}'.format(exc))
    finally:
        aiothread.finalize()
					
Make sure you wait for the loop to start. Calling future.cancel() in the main thread will cancel the asyncio coroutine in the background thread.
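The cancellation claim can be checked with a self-contained sketch. Instead of the `AioThread` class, it runs `loop.run_forever` in a plain `threading.Thread`; because the loop object is created in the main thread, there is no start-up race to wait for. `long_task` is a hypothetical coroutine that records when it gets cancelled.

```python
import asyncio
import concurrent.futures
import threading


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    cancelled_inside = threading.Event()

    async def long_task():
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            # runs in the background thread after the main thread cancels
            cancelled_inside.set()
            raise

    future = asyncio.run_coroutine_threadsafe(long_task(), loop)
    try:
        future.result(timeout=0.1)
    except concurrent.futures.TimeoutError:
        # cancelling the concurrent future cancels the task on the loop
        future.cancel()

    ok = cancelled_inside.wait(timeout=1)
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return ok


if __name__ == '__main__':
    print(demo())
```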
Real-world example: Cassandra Python driver

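from twisted.internet import reactor
from cassandra.connection import Connection

class TwistedConnection(Connection):

    @classmethod
    def initialize_reactor(cls):
        # TwistedLoop is defined elsewhere in the driver
        cls._loop = TwistedLoop()

    def add_connection(self):
        ...  # elided

    def client_connection_made(self):
        ...  # elided

    def handle_read(self):
        self.process_io_buffer()

    def push(self, data):
        # called from user threads: hand the write over to the reactor thread
        reactor.callFromThread(self.connector.transport.write, data)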
					
Cassandra's Python driver has a sync API, but its connection objects spin an event loop, in this case Twisted's reactor.
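The same "sync API on top of a background loop" design can be sketched with asyncio alone. `SyncClient` and its in-memory store are hypothetical: `push` is fire-and-forget via `loop.call_soon_threadsafe` (the asyncio counterpart of `reactor.callFromThread`), while `get` blocks the caller on `asyncio.run_coroutine_threadsafe(...).result()`.

```python
import asyncio
import threading


class SyncClient:
    """Hypothetical sync facade over an event loop in a background thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self._store = {}  # only touched from the loop thread

    async def _get(self, key):
        await asyncio.sleep(0.01)  # stands in for a network round-trip
        return self._store.get(key)

    def _set(self, key, value):
        # runs on the loop thread
        self._store[key] = value

    def push(self, key, value):
        # fire-and-forget from any thread
        self._loop.call_soon_threadsafe(self._set, key, value)

    def get(self, key, timeout=1.0):
        # blocking for the sync caller; the coroutine runs on the loop thread
        future = asyncio.run_coroutine_threadsafe(self._get(key), self._loop)
        return future.result(timeout)

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

Callbacks scheduled from one thread run in FIFO order, so a `push` followed by a `get` from the same caller sees the pushed value.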
Bidirectional sync/async communication. Janus queue


queue = janus.Queue(loop=loop)
await queue.async_q.get()
queue.sync_q.put(i)
					
Janus queue example

import asyncio, janus

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)

def threaded(sync_q):
    for i in range(100):
        sync_q.put(i)
    sync_q.join()

async def async_coro(async_q):
    for i in range(100):
        val = await async_q.get()
        async_q.task_done()

fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
					
janus exposes two APIs for the same queue: a sync one like queue.Queue and an async one like asyncio.Queue
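For the thread-to-coroutine direction only, a rough stdlib-only approximation is possible. This is not how janus is implemented: it simply marshals each `put_nowait` onto the loop thread with `loop.call_soon_threadsafe`, because `asyncio.Queue` itself is not thread-safe. Unlike janus, it offers no blocking sync `get` and no backpressure for the producer.

```python
import asyncio
import threading


def producer(loop, async_q, n):
    # runs in a plain thread; every put is handed over to the loop thread
    for i in range(n):
        loop.call_soon_threadsafe(async_q.put_nowait, i)


async def consumer(n):
    loop = asyncio.get_running_loop()
    async_q = asyncio.Queue()  # unbounded, so put_nowait never fails
    t = threading.Thread(target=producer, args=(loop, async_q, n))
    t.start()
    received = [await async_q.get() for _ in range(n)]
    t.join()
    return received


if __name__ == '__main__':
    print(asyncio.run(consumer(5)))
```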
Testing asyncio application

Testing and debugging are another big advantage of asyncio!

  • Coroutines are debuggable thanks to asyncio
  • It is possible to prototype and test all services in one thread
  • Functional tests are almost as cheap as unit tests
Basic asyncio tests

def test_raw_insert_with_executemany(self):
    async def go():
        conn = await self.connect()
        await conn.execute(sql)
    self.loop.run_until_complete(go())
                    


@pytest.mark.run_loop
async def test_raw_insert_with_executemany(self):
    conn = await self.connect()
    await conn.execute(sql)
                    
You need an event loop to spin your coroutines; run_loop is a custom pytest marker, not a built-in one.
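A marker like `run_loop` can be implemented in `conftest.py` with pytest's `pytest_pyfunc_call` hook. This is a hypothetical sketch, not the aio-libs code: it runs a marked coroutine test in the test's own `loop` fixture when one is requested, and otherwise falls back to `asyncio.run`. It reads the private `_fixtureinfo` attribute, so it may break across pytest versions.

```python
# conftest.py: hypothetical sketch of a "run_loop" marker
import asyncio
import inspect

import pytest


def pytest_configure(config):
    # register the marker so pytest does not warn about an unknown mark
    config.addinivalue_line(
        'markers', 'run_loop: run an async test function in an event loop')


@pytest.fixture
def loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
    if pyfuncitem.get_closest_marker('run_loop') is None:
        return None  # not marked: let pytest call the test normally
    if not inspect.iscoroutinefunction(pyfuncitem.obj):
        return None
    # pass only the fixtures the test function actually asked for
    argnames = pyfuncitem._fixtureinfo.argnames
    kwargs = {name: pyfuncitem.funcargs[name] for name in argnames}
    coro = pyfuncitem.obj(**kwargs)
    loop = pyfuncitem.funcargs.get('loop')
    if loop is not None:
        loop.run_until_complete(coro)  # reuse the test's loop fixture
    else:
        asyncio.run(coro)
    return True  # tell pytest the call has been handled
```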
Ok. Show me that trick with two servers?

from aiohttp import web

async def pong(request):
    import pdb; pdb.set_trace()
    return web.Response(body=b'pong')

app_pong = web.Application(loop=loop)
app_pong.router.add_route('GET', '/', pong)
                    


async def ping(request):
    import pdb; pdb.set_trace()
    async with session.get("http://127.0.0.1:8080/pong") as resp:
        data = await resp.text()
    return web.Response(body=b'OK')

app_ping = web.Application(loop=loop)
app_ping.router.add_route('GET', '/', ping)

                    
Both aiohttp applications and the client work in the same thread!
Spin two aiohttp servers in one loop

def main(loop, app_ping, app_pong, host, unused_port, other_unused_port):
    run = loop.run_until_complete

    handler1 = app_ping.make_handler()
    srv1 = run(loop.create_server(handler1, host, unused_port))

    handler2 = app_pong.make_handler()
    srv2 = run(loop.create_server(
        handler2, host, other_unused_port))
    # both servers are now registered on the same loop
    return srv1, srv2
					
Running two or more servers in the same thread is a trivial task.
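The same trick can be reproduced with the standard library alone, as an analogue of the aiohttp setup rather than aiohttp itself. Two `asyncio.start_server` servers bind to OS-chosen free ports (port 0), the "ping" server calls the "pong" server, and the test client shares the same loop and thread with both.

```python
import asyncio


async def pong_handler(reader, writer):
    await reader.readline()
    writer.write(b'pong\n')
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def ping_handler(reader, writer, pong_port):
    # the "ping" server is itself a client of "pong", on the same loop
    await reader.readline()
    r, w = await asyncio.open_connection('127.0.0.1', pong_port)
    w.write(b'ping\n')
    await w.drain()
    answer = await r.readline()
    w.close()
    await w.wait_closed()
    writer.write(b'ping got ' + answer)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main():
    pong_srv = await asyncio.start_server(pong_handler, '127.0.0.1', 0)
    pong_port = pong_srv.sockets[0].getsockname()[1]
    ping_srv = await asyncio.start_server(
        lambda r, w: ping_handler(r, w, pong_port), '127.0.0.1', 0)
    ping_port = ping_srv.sockets[0].getsockname()[1]

    # the test client also runs on the same loop and thread
    reader, writer = await asyncio.open_connection('127.0.0.1', ping_port)
    writer.write(b'hello\n')
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()

    for srv in (ping_srv, pong_srv):
        srv.close()
        await srv.wait_closed()
    return reply


if __name__ == '__main__':
    print(asyncio.run(main()))
```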
Test case for two apps in same thread

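import aiohttp
import pytest

@pytest.mark.run_loop
async def test_two_apps(loop, ping_port, pong_port):
    # app_ping serves "/", and the client shares its loop and thread
    url = "http://127.0.0.1:{}/".format(ping_port)
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as resp:
            data = await resp.text()
            assert data == 'OK'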

					
The client works with the same event loop and thread as the aiohttp apps.
Asyncio tips, tricks and pitfalls
Asyncio graceful shutdown

import asyncio, signal

is_working = True

async def do_work(loop):
    while is_working:
        await asyncio.sleep(1, loop=loop)

def signal_handler(loop):
    # without "global", the assignment below would only create a local
    global is_working
    loop.remove_signal_handler(signal.SIGTERM)
    is_working = False

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, signal_handler, loop)
loop.run_until_complete(do_work(loop))
					
asyncio will warn you with a bunch of tracebacks if you do not shut down properly.
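An alternative to the module-level flag is an `asyncio.Event` set directly by the signal handler, which also lets the worker wake up immediately instead of finishing a full sleep. This sketch is Unix-only (`loop.add_signal_handler` is not available on Windows) and must run in the main thread; the `send_sigterm_after` parameter exists only so the demo can signal itself.

```python
import asyncio
import os
import signal


async def do_work(stop_event, ticks):
    # keep working until a shutdown is requested, then return cleanly
    while not stop_event.is_set():
        ticks.append(1)
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=0.05)
        except asyncio.TimeoutError:
            pass  # no signal yet, do another unit of work


async def main(send_sigterm_after=None):
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()
    loop.add_signal_handler(signal.SIGTERM, stop_event.set)
    if send_sigterm_after is not None:
        # demo only: deliver SIGTERM to this very process
        loop.call_later(send_sigterm_after, os.kill, os.getpid(),
                        signal.SIGTERM)
    ticks = []
    try:
        await do_work(stop_event, ticks)
    finally:
        loop.remove_signal_handler(signal.SIGTERM)
    return len(ticks)


if __name__ == '__main__':
    print(asyncio.run(main()))  # stop it with: kill -TERM <pid>
```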
Graceful shutdown aiohttp edition


import asyncio, signal

loop = asyncio.get_event_loop()

def shutdown(loop):
    loop.remove_signal_handler(signal.SIGTERM)
    loop.stop()

loop.add_signal_handler(signal.SIGTERM, shutdown, loop)

# app is your aiohttp web.Application
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)

# serve until SIGTERM stops the loop
loop.run_forever()

# kill accepting sockets
srv.close()
loop.run_until_complete(srv.wait_closed())

# give in-flight requests up to 60 seconds to finish
loop.run_until_complete(handler.finish_connections(60.0))
loop.close()
                    
Now you can be sure that all in-flight requests are served and no new requests are accepted.
Be a good citizen! Shutdown db connection pools in your aiohttp app

async def init(loop):
    # setup application and extensions
    app = web.Application(loop=loop)

    # create connection to the database
    pg = await init_postgres(conf['postgres'], loop)

    async def close_pg(app):
        pg.close()
        await pg.wait_closed()

    app.on_cleanup.append(close_pg)
    # ...
					
aiohttp has a handy on_cleanup signal for closing database connections, as well as on_shutdown for websockets.
Keep an eye on StopIteration in Python 3.4

@asyncio.coroutine
def coro():
    raise StopIteration('batman')

@asyncio.coroutine
def coro2():
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raise StopIteration
    return 'finish'

@asyncio.coroutine
def go():
    data = yield from coro()  # batman
    data = yield from coro2() # None
					
Raising StopIteration is the same as returning from the coroutine, which can lead to hard-to-find bugs.

In Python 3.5 and 3.6, from __future__ import generator_stop makes this raise RuntimeError instead; since Python 3.7 that is the default (PEP 479).
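With native `async def` coroutines (and with any generator on Python 3.7+), a stray StopIteration no longer turns into a silent return. In this sketch, the hypothetical `fetch_first` hits an exhausted iterator, and the caller sees a RuntimeError whose `__cause__` is the original StopIteration.

```python
import asyncio


async def fetch_first(items):
    it = iter(items)
    return next(it)  # raises StopIteration when items is empty


async def main():
    try:
        await fetch_first([])
    except RuntimeError as exc:
        # the StopIteration was converted, so the bug is loud, not silent
        return type(exc.__cause__).__name__
    return 'no error'


if __name__ == '__main__':
    print(asyncio.run(main()))
```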

Missing coroutine decorators in Python 3.4

def foo(loop):
    yield from asyncio.sleep(1, loop=loop)

@asyncio.coroutine
def bar(loop):
    yield from asyncio.sleep(1, loop=loop)

async def zap(loop):
    await asyncio.sleep(1, loop=loop)

loop.run_until_complete(foo(loop))
loop.run_until_complete(bar(loop))
loop.run_until_complete(zap(loop))

					
All three examples work as expected, but asyncio will not track the first generator as a coroutine and, as a result, will NOT complain if you forget to yield from it.
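The difference in tracking can be observed with `inspect` and the warnings machinery. In this sketch, `foo` is an undecorated generator and `zap` a native coroutine (simplified versions of the functions above, without the `loop` argument); only the native coroutine triggers the "was never awaited" RuntimeWarning when it is created and dropped.

```python
import asyncio
import gc
import inspect
import warnings


def foo():
    # undecorated generator: nothing marks it as a coroutine
    yield


async def zap():
    await asyncio.sleep(0)


def forgotten_await_warns():
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        zap()         # coroutine created but never awaited
        gc.collect()  # make sure it is finalized inside this block
    return any(issubclass(w.category, RuntimeWarning)
               and 'never awaited' in str(w.message)
               for w in caught)


if __name__ == '__main__':
    print(inspect.iscoroutinefunction(foo), inspect.iscoroutinefunction(zap))
    print(forgotten_await_warns())
```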

Thank you!

and check out aio-libs!

https://github.com/aio-libs


slides https://jettify.github.io/minskpy

Introducing aiohttp_admin

lightning talk
Motivation. Why might we want an "automatic" admin interface?
  • For small web applications or microservices, where a custom admin interface is overkill.
  • To give a manager something to play with while a proper admin interface is not ready yet.
  • It could be a solution if you absolutely hate writing a lot of JS/HTML but have to.
aiohttp_admin overview (Demo)
http://127.0.0.1:9000/admin
aiohttp_admin architecture overview
Database Layer

Supported Databases out of the box

ODBC databases (SQLite, MSSQL, Oracle) coming soon, using aioodbc
REST Layer
  • Python 3.5+, because of async/await
  • aiohttp - as platform and REST framework
  • trafaret - provides validation and model description
  • python-dateutil - for simpler datetime manipulation
  • aiohttp_jinja2 - for basic template rendering
  • aiomysql, aiopg, sqlalchemy, motor - as means of database access
Frontend Layer

ng-admin is the frontend for aiohttp_admin

  • ng-admin can be plugged into your RESTful API to get a complete administration interface (datagrid, filters, multi-model relationships, dashboard) in no time.
  • Beyond simple CRUD, ng-admin lets you build sophisticated GUIs without getting in your way.
Example Edition View declaration
Declare the edit view for an entity in config.js

question.editionView()
    .title('Edit question')
    .actions(['list', 'show', 'delete'])
    .fields([
        nga.field('id')
        .editable(false)
        .label('id'),
        question.creationView().fields(),
        nga.field('choice', 'referenced_list')
            .targetEntity(nga.entity('choice'))
            .targetReferenceField('question_id')
            .targetFields([
                nga.field('id').isDetailLink(true),
                nga.field('votes').label('Votes'),
                nga.field('choice_text').label('Choice')
            ])
            .sortField('votes')
            .sortDir('DESC')
            .listActions(['edit']),
    ]);