How to deal with blocking code within asyncio event loop

Nikolay Novik

http://github.com/jettify

## I am ...

* **Software Engineer**: at DataRobot Ukraine
* **Github**: http://github.com/jettify
* **My Projects**:
  + database clients:
    - `aiomysql`, `aioodbc`, `aiogibson`
  + web and misc:
    - `aiohttp_debugtoolbar`, `aiobotocore`, `aiohttp_mako`, `aiohttp_sse`, `aiogearman`, `aiomysql_replication`
## Poll

##### You and asyncio:

1. I am using asyncio extensively
2. I am using Twisted, Tornado, gevent etc. extensively
3. I think async programming is kinda cool
## Asyncio

* The asyncio project was officially launched with the release of Python 3.4 in March 2014.
* Bare: almost no libraries
* One year later, asyncio has a strong community writing libraries on top of it. But what do you do when the available libraries work synchronously, potentially blocking the event loop?

Rules of Async Club

Rule #1

You do not block the event loop

Rule #2

You never block the event loop

### Blocking calls in third party libraries

* Network IO
  - API wrappers
  - Database clients
  - Message queues
* FileSystem IO
* CPU

Debugging blocking calls tip

Set the environment variable PYTHONASYNCIODEBUG=1

import asyncio
import time

loop = asyncio.get_event_loop()
# in debug mode the loop warns about callbacks and coroutine
# steps that run longer than slow_callback_duration (seconds)
loop.slow_callback_duration = 0.01

async def sleeper():
    time.sleep(0.1)  # we block here

loop.run_until_complete(sleeper())
					


Executing <Task finished coro=<sleeper() done, defined at
code/debug_example.py:9> result=None created at
/usr/local/lib/python3.5/asyncio/base_events.py:323>
took 0.102 seconds
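If setting the environment variable is inconvenient, the same slow-callback warning can be enabled in code. A minimal sketch, using the same 0.01 second threshold as above:

import asyncio

loop = asyncio.get_event_loop()
loop.set_debug(True)                # enable asyncio debug mode for this loop
loop.slow_callback_duration = 0.01  # warn about anything slower than 10 ms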
					
### Approach #1 Is there any suitable library?

Search for an asyncio-compatible library on:

1. Google ~ 98k results
2. PyPI ~ 200 packages
3. asyncio wiki page: https://github.com/python/asyncio/wiki/ThirdParty
4. aio-libs: https://github.com/aio-libs
Third Party Libraries Pro Tip
Read the (f*g) source code of your libraries! Example of Python code from the OneDrive SDK:

@asyncio.coroutine
def get_async(self):
    """Sends the GET request using an asyncio coroutine
    .... 
    """
    future = self._client._loop.run_in_executor(None,
                                                self.get)
    collection_response = yield from future
    return collection_response
					
Most of the time you want to make HTTP requests using the event loop, not a thread pool.

Approach #2 Is a REST API available?

Most hipster databases offer a REST API as the primary access method:

  • DynamoDB
  • Neo4j
  • Elasticsearch
  • HBase
  • HDFS
  • CouchDB
  • Riak
  • VoltDB
  • InfluxDB
  • ArangoDB

It is easy to implement the required subset of the API.
REST Client Tip
aiohttp.ClientSession is your friend

import asyncio
import aiohttp

# carry the loop Luke!
loop = asyncio.get_event_loop()

async def go():
    session = aiohttp.ClientSession(loop=loop)
    async with session.get('http://python.org') as resp:
        data = await resp.text()
        print(data)
    session.close()

loop.run_until_complete(go())
					
Connection pooling helps to save on expensive connection creation. (PS: check out the new aiohttp 0.18.x release)
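To actually benefit from that pooling, create a single ClientSession and reuse it for every request instead of opening a new session per call. A minimal sketch, with placeholder URLs:

import asyncio
import aiohttp

loop = asyncio.get_event_loop()

async def fetch(session, url):
    # all requests share one session, so the underlying
    # connector can reuse already-open connections
    async with session.get(url) as resp:
        return await resp.text()

async def go():
    session = aiohttp.ClientSession(loop=loop)
    try:
        pages = await asyncio.gather(fetch(session, 'http://python.org'),
                                     fetch(session, 'http://python.org/about/'),
                                     loop=loop)
        print([len(p) for p in pages])
    finally:
        session.close()

loop.run_until_complete(go())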

Approach #3 Is there a simple text or binary protocol?

Examples of databases and message queues with a binary protocol:

  • redis
  • memcached
  • couchbase
  • gearman
  • beanstalkd
  • disque

Do not be afraid to get your hands dirty.

Example: Simple binary protocol implementation

import asyncio, struct
from aiogibson import encode_command

async def read_from_connection(host, port, *, loop=None):
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)

    cmd = encode_command(b'GET', 'key')
    writer.write(cmd)
    
    # reply header: code (2 bytes), encoding (1 byte), payload size (4 bytes)
    header = await reader.readexactly(4 + 2 + 1)
    unpacked = struct.unpack(b'<HBI', header)
    code, gb_encoding, resp_size = unpacked
    payload = await reader.readexactly(resp_size)
    print(payload)
					
Simple but no protocol pipelining.

Protocol Pipelining

Most binary protocols support pipelining. More info: http://tailhook.github.io/request-pipelining-presentation/presentation/index.html
Example: Simple pipelined binary protocol implementation

def execute(self):
    cmd = encode_command(b'GET', 'key')
    self.writer.write(cmd)
    fut = asyncio.Future(loop=self._loop)
    self._queue.append(fut)
    return fut

async def reader_task(self):
    while True:
        header = await self.reader.readexactly(4 + 2 + 1)
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked
        # wait and read payload
        payload = await self.reader.readexactly(resp_size)
        # responses come back in request order: resolve the oldest
        # waiting future (assumes self._queue is a collections.deque)
        future = self._queue.popleft()
        future.set_result(payload)
					
See aioredis for a reference implementation.
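Putting the two snippets together: a minimal sketch of a pipelined client. The class name, connection setup, host and port are made up for illustration; only encode_command comes from aiogibson.

import asyncio, struct
from collections import deque
from aiogibson import encode_command

class Client:  # hypothetical wrapper, for illustration only
    def __init__(self, reader, writer, *, loop=None):
        self.reader, self.writer = reader, writer
        self._loop = loop or asyncio.get_event_loop()
        self._queue = deque()
        # background task that resolves the futures created by execute()
        self._reader_task = asyncio.ensure_future(
            self.reader_task(), loop=self._loop)

    def execute(self, key):
        self.writer.write(encode_command(b'GET', key))
        fut = asyncio.Future(loop=self._loop)
        self._queue.append(fut)
        return fut

    async def reader_task(self):
        while True:
            header = await self.reader.readexactly(4 + 2 + 1)
            code, gb_encoding, resp_size = struct.unpack(b'<HBI', header)
            payload = await self.reader.readexactly(resp_size)
            self._queue.popleft().set_result(payload)

async def go(loop):
    reader, writer = await asyncio.open_connection('localhost', 10128,
                                                   loop=loop)
    client = Client(reader, writer, loop=loop)
    # both requests are written back to back; replies are matched in order
    values = await asyncio.gather(client.execute('key1'),
                                  client.execute('key2'), loop=loop)
    print(values)

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))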

Approach #4 Is a sync Python client available?

In well-written sync database clients the IO is decoupled from the protocol parser, so why not just rewrite the IO part? (See the sketch after this list.)
  1. Locate socket.recv()
  2. Replace it with await reader.read()
  3. Turn the function into a coroutine with async def
  4. Call this function with await
  5. Call the parent functions with await
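A minimal before/after sketch of steps 1-5; the length-prefixed wire format here is invented purely for illustration, not taken from any real client:

import asyncio, struct

# before: blocking IO on a plain socket, this is what stalls the event loop
def read_response_sync(sock):
    header = sock.recv(4)
    size, = struct.unpack('<I', header)
    return sock.recv(size)

# after: the parsing logic is untouched, only the IO calls change
async def read_response(reader):
    header = await reader.readexactly(4)
    size, = struct.unpack('<I', header)
    return await reader.readexactly(size)

# callers become coroutines as well and use await all the way up
async def get(reader, writer, key):
    writer.write(b'GET ' + key + b'\r\n')
    return await read_response(reader)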

Approach #5 Is there a universal solution to all problems?

Yes: run every blocking call in a separate thread.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)

async def test_example():
    dsn = 'Driver=SQLite;Database=sqlite.db'
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    cursor = await loop.run_in_executor(executor, cursor.execute,
                                        'SELECT 42;')

loop.run_until_complete(test_example())
					

But how do I know which methods to call in a thread?

For Python code

requests.get()
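For plain Python code the answer is simple: the whole blocking call goes to the thread pool. A minimal sketch using the default executor; requests and the URL are just stand-ins for whatever blocking library you have:

import asyncio
from functools import partial

import requests

loop = asyncio.get_event_loop()

async def fetch(url):
    # run_in_executor only forwards positional arguments,
    # so keyword arguments are bound with functools.partial
    request = partial(requests.get, url, timeout=10)
    resp = await loop.run_in_executor(None, request)  # default thread pool
    return resp.text

loop.run_until_complete(fetch('http://python.org'))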
					
For Cython

with nogil:
    [code to be executed with the GIL released]
					

For C extensions


Py_BEGIN_ALLOW_THREADS
ret = SQLDriverConnect(hdbc, 0, szConnect, SQL_NTS,
                       0, 0, 0, SQL_DRIVER_NOPROMPT);
Py_END_ALLOW_THREADS

					

What about FileSystem IO?

asyncio does not support asynchronous operations on the filesystem due to OS limitations.

The only good way to use files asynchronously is through thread pools.
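Doing that by hand is only a few lines; a minimal sketch that pushes the blocking open() and read() into the default thread pool (the filename is a placeholder):

import asyncio

loop = asyncio.get_event_loop()

def read_file(path):
    # ordinary blocking file IO, executed inside a worker thread
    with open(path) as f:
        return f.read()

async def go():
    data = await loop.run_in_executor(None, read_file, 'filename')
    print(data)

loop.run_until_complete(go())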

aiofiles library workaround


import asyncio
import aiofiles

loop = asyncio.get_event_loop()

async def go():
    f = await aiofiles.open('filename', mode='r')
    try:
        data = await f.read()
    finally:
        await f.close()
    print(data)

loop.run_until_complete(go())
					
Under the hood, aiofiles uses a ThreadPoolExecutor for the blocking calls.

What about CPU intensive tasks?


import asyncio
import math
from concurrent.futures import ProcessPoolExecutor

loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor(max_workers=3)

def is_prime(n):
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0: return False
    return True

async def go():
    n = 112272535095293
    # the heavy work runs in a separate process,
    # so the event loop is never blocked
    result = await loop.run_in_executor(executor, is_prime, n)
    print(result)

loop.run_until_complete(go())

					

Thanks!