Nikolay Novik
aiomysql, aioodbc, aiogibson
aiohttp_debugtoolbar, aiobotocore,
aiohttp_mako, aiohttp_sse, aiogearman, aiomysql_replication
But what to do when a suitable library is not available?
import asyncio
async def go(loop):
    """Show the event loop being passed explicitly to every asyncio call.

    Creates an already-resolved future bound to *loop*, sleeps three
    seconds on the same loop, awaits the future and prints ``foo``.
    """
    future = asyncio.Future(loop=loop)
    future.set_result(None)
    await asyncio.sleep(3.0, loop=loop)
    await future
    print("foo")


# The loop is obtained once and handed down, never looked up implicitly
# inside the coroutine.
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
https://groups.google.com/forum/#!msg/python-tulip/hr1kPZfMX8U/9uqdlbRuRsoJ
It would really be a pity if Tulip repeated our billion-dollar mistake [global reactor] ;-)
run_in_executor,
create_subprocess_exec, create_task
PYTHONASYNCIODEBUG=1
import asyncio
import time
loop = asyncio.get_event_loop()
# Threshold (in seconds) above which asyncio's debug mode reports a
# callback or task step as slow.
loop.slow_callback_duration = 0.01


async def sleeper():
    """Block the event loop on purpose so the slow-callback report fires."""
    time.sleep(0.1)  # we block here


loop.run_until_complete(sleeper())
Executing <Task finished coro=<sleeper() done, defined at
code/debug_example.py:9> result=None created at
/usr/local/lib/python3.5/asyncio/base_events.py:323>
took 0.102 seconds
Use this trick only for debugging and testing purposes; debug mode has a
huge performance impact.
Saves development time, but you have no idea what is going on in the DB client. Your PM will be happy.
@asyncio.coroutine
def get_async(self):
    """Send the GET request from a coroutine by delegating to a thread.

    The blocking ``self.get`` is submitted to the client's event loop via
    ``run_in_executor``; passing ``None`` as the executor selects the
    loop's default executor.

    Returns:
        Whatever ``self.get`` returns.
    """
    future = self._client._loop.run_in_executor(None,
                                                self.get)
    collection_response = yield from future
    return collection_response
Most of the time you want to do HTTP requests using the event
loop, not a thread pool.
aiohttp.ClientSession
is your friend
import asyncio
import aiohttp
async def go(loop):
    """Fetch http://by.pycon.org with aiohttp and print the response body."""
    # ``async with`` releases the session even when the request or the
    # body read raises, which a trailing ``session.close()`` does not.
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get('http://by.pycon.org') as resp:
            data = await resp.text()
            print(data)


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
Connection pooling helps to save on expensive connection
creation.
Example of databases and message queues with binary protocol:
Do not be afraid to get your hands dirty.
class EchoClientProtocol(asyncio.Protocol):
    """Callback-style client: send one message, print whatever comes back.

    Every method is a plain callback invoked by the transport, not a
    coroutine.
    """

    def __init__(self, message, loop):
        self.loop, self.message = loop, message

    def connection_made(self, transport):
        """Send the encoded message as soon as the connection is up."""
        transport.write(self.message.encode())

    def data_received(self, data):
        """Print the decoded chunk received from the peer."""
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        """Report that the connection is gone; *exc* is not inspected."""
        print('The server closed the connection')
# Loop that the protocol instance is handed at construction time.
loop = asyncio.get_event_loop()
# Zero-argument protocol factory for create_connection(). `message` is looked
# up only when the factory is called; it is not defined in this snippet.
factory = lambda: EchoClientProtocol(message, loop)
# This only builds the coroutine object; nothing connects until the loop
# runs it.
coro = loop.create_connection(factory,'127.0.0.1', 8888)
Too low level; usually you should not use it. Notice that none of the methods are
coroutines.
import asyncio
async def tcp_client(message, loop):
    """Send *message* to 127.0.0.1:8888 and print up to 100 bytes of reply.

    Args:
        message: text to send; it is encoded before being written.
        loop: event loop passed explicitly to ``open_connection``.
    """
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
                                                   loop=loop)
    try:
        writer.write(message.encode())
        data = await reader.read(100)
        print('Received: %r' % data.decode())
    finally:
        # Close the transport even when the read or the decode fails.
        writer.close()


message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client(message, loop))
loop.close()
async def create_connection(address, *, encoding=None, loop=None):
    """Open a stream connection and wrap it in a ``RedisConnection``.

    Args:
        address: a ``(host, port)`` list or tuple selects TCP; any other
            value is passed to ``asyncio.open_unix_connection`` as is.
        encoding: forwarded to ``RedisConnection`` unchanged.
        loop: event loop passed explicitly to every call made here.

    Returns:
        The newly created ``RedisConnection``.
    """
    # ``await`` rather than ``yield from``: the latter is a SyntaxError
    # inside an ``async def`` function.
    if isinstance(address, (list, tuple)):
        host, port = address
        reader, writer = await asyncio.open_connection(
            host, port, loop=loop)
    else:
        reader, writer = await asyncio.open_unix_connection(
            address, loop=loop)
    conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
    return conn
class RedisProtocol(asyncio.Protocol, metaclass=_RedisProtocolMeta):
    """Skeleton of a protocol-based Redis client.

    Only the callback names are shown; every body is elided.
    """

    def connection_made(self, transport):
        ...

    def data_received(self, data):
        ...

    def eof_received(self):
        ...

    def connection_lost(self, exc):
        ...
import asyncio, struct
from aiogibson import encode_command
async def read_from_connection(host, port, *, loop=None):
    """Send one ``GET key`` command and print the reply payload.

    The reply is read in two steps: a fixed 7-byte header, then exactly as
    many payload bytes as the header announces.

    Args:
        host: server host name or address.
        port: server TCP port.
        loop: event loop passed explicitly to ``open_connection``.
    """
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)
    try:
        cmd = encode_command(b'GET', 'key')
        writer.write(cmd)
        header = await reader.readexactly(4 + 2 + 1)
        # Little-endian uint16, uint8, uint32; the last field is the
        # payload size.
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked
        payload = await reader.readexactly(resp_size)
        print(payload)
    finally:
        # Do not leak the socket if the exchange fails half-way.
        writer.close()
Simple but no protocol pipelining.
def execute(self):
    """Write a ``GET key`` command and return a future for its reply.

    The command is written immediately. The returned future is appended
    to ``self._queue`` and is not resolved here.
    """
    cmd = encode_command(b'GET', 'key')
    self._writer.write(cmd)
    fut = asyncio.Future(loop=self._loop)
    self._queue.append(fut)
    return fut
async def reader_task(self):
    """Read replies until EOF and resolve queued futures with the payloads.

    Each reply is a fixed 7-byte header followed by ``resp_size`` payload
    bytes.
    """
    while not self._reader.at_eof():
        header = await self._reader.readexactly(4 + 2 + 1)
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked
        # wait and read payload (from the connection's own reader; a bare
        # ``reader`` name is not defined in this scope)
        payload = await self._reader.readexactly(resp_size)
        # NOTE(review): pop() takes the most recently queued future. With
        # several commands in flight, replies arrive oldest-first, so this
        # probably wants popleft() / pop(0) depending on the type of
        # ``_queue`` -- confirm against the class definition.
        future = self._queue.pop()
        future.set_result(payload)
See aioredis
for reference implementation.
class Connection:
    """Connection skeleton that owns a background reader task.

    Bodies marked ``...`` are elided.
    """

    def __init__(self, reader, writer, host, port, loop=None):
        self._reader, self._writer = reader, writer
        # Keep the loop: it is needed right below to create the task.
        self._loop = loop
        # Start the background reader for this connection.
        self._reader_task = asyncio.Task(self.reader_task(), loop=self._loop)

    def execute(self, command, *args, data=None, cb=None):
        # NOTE(review): `fut` is presumably created in the elided part --
        # confirm against the full implementation.
        ...
        return fut

    async def reader_task(self):
        """Loop until the reader reports EOF; the loop body is elided."""
        while not self._reader.at_eof():
            ...
async def create_connection(host, port, queue=None, loop=None):
    """Open a TCP connection and wrap its streams in a ``Connection``.

    Args:
        host: server host name or address.
        port: server TCP port.
        queue: accepted but not used in this function.
        loop: event loop passed to ``open_connection`` and ``Connection``.

    Returns:
        The new ``Connection``.
    """
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)
    conn = Connection(reader, writer, host, port, loop=loop)
    return conn
self._reader_task = asyncio.create_task(
self._read_data(), loop=self._loop)
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CanceledError:
pass
Remember to clean up after background tasks.
def read(self):
    """Read the first packet and dispatch on its type.

    OK packets, load-local packets and everything else each go to their
    own ``_read_*`` helper. The connection reference is dropped
    afterwards, whether or not reading succeeded.
    """
    try:
        first_packet = self.connection._read_packet()
        if first_packet.is_ok_packet():
            self._read_ok_packet(first_packet)
        elif first_packet.is_load_local_packet():
            self._read_load_local_packet(first_packet)
        else:
            self._read_result_packet(first_packet)
    finally:
        self.connection = None
@asyncio.coroutine
def read(self):
    """Coroutine version of the synchronous ``read``.

    The packet read and the load-local / result helpers are delegated to
    with ``yield from``; the OK-packet helper is still a plain call. The
    connection reference is dropped afterwards in every case.
    """
    try:
        first_packet = yield from self.connection._read_packet()
        if first_packet.is_ok_packet():
            self._read_ok_packet(first_packet)
        elif first_packet.is_load_local_packet():
            yield from self._read_load_local_packet(first_packet)
        else:
            yield from self._read_result_packet(first_packet)
    finally:
        self.connection = None
import asyncio
from concurrent.futures import ThreadPoolExecutor

from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)


async def test_example():
    """Run each blocking pyodbc call in the thread pool executor."""
    dsn = 'Driver=SQLite;Database=sqlite.db'
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    # The result of execute() is not needed; keep `conn` bound to the
    # connection instead of overwriting it.
    await loop.run_in_executor(executor, cursor.execute,
                               'SELECT 42;')


loop.run_until_complete(test_example())
pool = await aioodbc.create_pool(dsn=dsn, loop=loop)
async with pool.acquire() as conn:
cur = await conn.cursor()
await cur.execute("SELECT 42;")
r = await cur.fetchall()
print(r)
Ugly internals :)
def _execute(self, func, *args, **kwargs):
func = partial(func, *args, **kwargs)
future = self._loop.run_in_executor(self._executor, func)
return future
async def _connect(self):
    """Open the pyodbc connection via ``self._execute`` and store it.

    The DSN, autocommit, ansi and timeout settings plus any extra keyword
    arguments kept on the instance are passed to ``pyodbc.connect``; the
    awaited result is stored in ``self._conn``.
    """
    f = self._execute(pyodbc.connect, self._dsn,
                      autocommit=self._autocommit, ansi=self._ansi,
                      timeout=self._timeout,
                      **self._kwargs)
    self._conn = await f
requests.get()
For Cython
with nogil:
[code to be executed with the GIL released]
For C extension
Py_BEGIN_ALLOW_THREADS
ret = SQLDriverConnect(hdbc, 0, szConnect, SQL_NTS,
0, 0, 0, SQL_DRIVER_NOPROMPT);
Py_END_ALLOW_THREADS
@asyncio.coroutine
def coro():
    """Pitfall demo: raise ``StopIteration`` explicitly inside a coroutine.

    NOTE(review): the slide annotates the caller as receiving 'batman' as
    if it had been returned. With PEP 479 semantics (default since
    Python 3.7) a StopIteration escaping a generator body becomes
    RuntimeError instead -- confirm the targeted Python version.
    """
    raise StopIteration('batman')
@asyncio.coroutine
def coro2():
    """Pitfall demo: a ``StopIteration`` leaking out of a nested ``next()``.

    The iterator holds two items, so the third ``next()`` raises and the
    ``return`` below is never reached.
    """
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raise StopIteration
    return 'finish'
@asyncio.coroutine
def go():
    """Delegate to both pitfall coroutines.

    The trailing comments are the slide's annotations of what ``data``
    ends up holding after each delegation.
    """
    data = yield from coro()  # batman
    data = yield from coro2()  # None