Tips and Tricks Writing Database Clients With Asyncio

Nikolay Novik

http://github.com/jettify

I am ...

  • Software Engineer at DataRobot Ukraine
  • Github: https://github.com/jettify
  • Twitter: @isinf
  • My Projects:
    • database clients: aiomysql, aioodbc, aiogibson
    • web and others: aiohttp_debugtoolbar, aiobotocore, aiohttp_mako, aiohttp_sse, aiogearman, aiomysql_replication
Poll: you and asyncio
  • I am using asyncio/aiohttp extensively
  • I am using Twisted, Tornado, gevent etc. extensively
  • I think async programming is kinda cool

Asyncio

  • The asyncio project was officially launched with the release of Python 3.4 in March 2014.
  • Bare: almost no libraries
  • One year later, asyncio has a strong community writing libraries on top of it.

But what do you do when the library you need is not available?

asyncio Internet impact?

Search asyncio compatible library on:

Before we start writing any asyncio code, let's talk about the explicit event loop

import asyncio
async def go(loop):
    future = asyncio.Future(loop=loop)
    future.set_result(None)
    await asyncio.sleep(3.0, loop=loop)
    await future
    print("foo")

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
					

Glyph on global event loop

It would really be a pity if Tulip repeated our billion-dollar mistake [global reactor] ;-)

--Glyph Lefkowitz / author of Twisted https://groups.google.com/forum/#!msg/python-tulip/hr1kPZfMX8U/9uqdlbRuRsoJ

Convenience of explicit loop

  • Increased testability (Hello Twisted!)
  • Fast access to a bunch of useful methods: run_in_executor, create_subprocess_exec, create_task
  • Easier to reason about code in nonstandard cases, such as two threads with two event loops, or a sync main thread with an async second thread
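
A minimal sketch of the points above, assuming a test wants its own private loop. The names fetch_answer and run_isolated are hypothetical and not from the talk; only the loop methods (run_in_executor, create_task) are real asyncio APIs.

```python
import asyncio


async def fetch_answer(loop):
    # Useful methods are reached directly through the loop object.
    result = await loop.run_in_executor(None, sum, [40, 2])
    # asyncio.sleep(0, result) yields once and then returns `result`.
    task = loop.create_task(asyncio.sleep(0, result))
    return await task


def run_isolated(coro_func):
    # A fresh loop per call, so tests never share global loop state.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro_func(loop))
    finally:
        loop.close()
```

Calling run_isolated(fetch_answer) runs the coroutine on a loop that nothing else can touch, which is the testability argument in a nutshell.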

Debugging blocking calls tip

Set the environment variable PYTHONASYNCIODEBUG=1
import asyncio
import time

loop = asyncio.get_event_loop()
loop.slow_callback_duration = 0.01

async def sleeper():
    time.sleep(0.1)  # we block here

loop.run_until_complete(sleeper())
					

Executing <Task finished coro=<sleeper() done, defined at
code/debug_example.py:9> result=None created at
/usr/local/lib/python3.5/asyncio/base_events.py:323>
took 0.102 seconds

					
Use this trick only for debugging and testing: debug mode has a huge performance impact.
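
Debug mode can also be switched on from code with loop.set_debug(True), which is handy inside a test suite. The sketch below collects the slow-callback warnings from the "asyncio" logger; find_slow_callbacks and the Collect handler are hypothetical helper names, not part of asyncio.

```python
import asyncio
import logging
import time


def find_slow_callbacks(coro_func, threshold=0.01):
    """Run coro_func in debug mode and return asyncio's warning messages."""
    messages = []

    class Collect(logging.Handler):
        def emit(self, record):
            messages.append(record.getMessage())

    handler = Collect(level=logging.WARNING)
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    loop = asyncio.new_event_loop()
    loop.set_debug(True)                     # same effect as PYTHONASYNCIODEBUG=1
    loop.slow_callback_duration = threshold  # seconds
    try:
        loop.run_until_complete(coro_func())
    finally:
        loop.close()
        logger.removeHandler(handler)
    return messages


async def sleeper():
    time.sleep(0.05)  # blocks the event loop on purpose
```

A test can then assert that the returned list is empty for code that is supposed to be non-blocking.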
Approach #1

So here is the super secret algorithm to write a db client:

  • Search for a db client on github.com/aio-libs
  • Make a commit: ++5 --1000
  • ????
  • Profit!

Saves development time, but you have no idea what is going on inside the db client. Your PM will be happy.

Third-party library pro tip!

Read the (f*g) source code of your libraries!

Example of Python code from the OneDrive SDK:
@asyncio.coroutine
def get_async(self):
    """Sends the GET request using an asyncio coroutine
    ....
    """
    future = self._client._loop.run_in_executor(None,
                                                self.get)
    collection_response = yield from future
    return collection_response
					
Most of the time you want to do HTTP requests using the event loop, not a thread pool.

Approach #2 Database supports REST API

Most hipster databases use a REST API as the primary access method:

  • DynamoDB
  • Neo4j
  • Elasticsearch
  • HBase
  • HDFS
  • CouchDB
  • Riak
  • VoltDB
  • InfluxDB
  • ArangoDB

It is easy to implement just the required subset of the API.
REST Client Tip
aiohttp.ClientSession is your friend
import asyncio
import aiohttp


async def go(loop):
    # async with closes the session (and its connection pool) on exit
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get('http://by.pycon.org') as resp:
            data = await resp.text()
            print(data)

loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
					
Connection pooling helps to save on expensive connection creation.

Approach #3 Is there a simple text or binary protocol?

Examples of databases and message queues with a simple text or binary protocol:

  • redis
  • memcached
  • couchbase
  • gearman
  • beanstalkd
  • disque

Do not be afraid to get your hands dirty.

Simple TCP client using low level API

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.loop, self.message = loop, message

    def connection_made(self, transport):
        transport.write(self.message.encode())

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')

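message = 'Hello World!'
loop = asyncio.get_event_loop()
factory = lambda: EchoClientProtocol(message, loop)
coro = loop.create_connection(factory, '127.0.0.1', 8888)
# nothing happens until the coroutine is actually run
transport, protocol = loop.run_until_complete(coro)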
					
Too low level, usually you should not use it. Notice that none of the methods are coroutines.

Simple TCP connection using Streams

import asyncio

async def tcp_client(message, loop):
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
                                                   loop=loop)
    writer.write(message.encode())
    data = await reader.read(100)
    print('Received: %r' % data.decode())
    writer.close()

message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client(message, loop))
loop.close()
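
The client above needs a server on port 8888. As a self-contained sketch of the same Streams calls, the version below wires both ends together with socket.socketpair(), so no server is required. The function name roundtrip is hypothetical, and the explicit loop argument is left out because recent Python versions no longer accept it in open_connection.

```python
import asyncio
import socket


async def roundtrip(message):
    # Two connected sockets stand in for client and server.
    client_sock, server_sock = socket.socketpair()
    c_reader, c_writer = await asyncio.open_connection(sock=client_sock)
    s_reader, s_writer = await asyncio.open_connection(sock=server_sock)

    c_writer.write(message.encode())
    await c_writer.drain()

    data = await s_reader.read(100)  # "server" side receives...
    s_writer.write(data)             # ...and echoes back
    await s_writer.drain()

    reply = await c_reader.read(100)

    for writer in (c_writer, s_writer):
        writer.close()
        await writer.wait_closed()
    return reply.decode()
```

Running asyncio.run(roundtrip('Hello World!')) exercises write, drain, read and close without touching the network.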
					

aioredis vs asyncio_redis (Streams vs Protocols)

async def create_connection(address, *, encoding=None, loop=None):
    if isinstance(address, (list, tuple)):
        host, port = address
        reader, writer = await asyncio.open_connection(
            host, port, loop=loop)
    else:
        reader, writer = await asyncio.open_unix_connection(
            address, loop=loop)
    conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
					
class RedisProtocol(asyncio.Protocol, metaclass=_RedisProtocolMeta):
    def connection_made(self, transport):
        ...
    def data_received(self, data):
        ...
    def eof_received(self):
        ...
    def connection_lost(self, exc):
        ...

					
Example: Simple binary protocol implementation
import asyncio, struct
from aiogibson import encode_command

async def read_from_connection(host, port, *, loop=None):
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)

    cmd = encode_command(b'GET', 'key')
    writer.write(cmd)

    header = await reader.readexactly(4 + 2 + 1)
    unpacked = struct.unpack(b'<HBI', header)
    code, gb_encoding, resp_size = unpacked
    payload = await reader.readexactly(resp_size)
    print(payload)
					
Simple, but there is no protocol pipelining.
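
The header parsing step can be tried without a running database by feeding bytes into an asyncio.StreamReader by hand. This is only a sketch: the '<HBI' layout is copied from the slide, while encode_reply, read_reply, demo and the sample values are made up for illustration and are not the real wire format of any particular server.

```python
import asyncio
import struct

# Header layout as on the slide: uint16 code, uint8 encoding, uint32 size.
HEADER = struct.Struct('<HBI')


def encode_reply(code, encoding, payload):
    # Hypothetical helper that builds a reply the way a server would.
    return HEADER.pack(code, encoding, len(payload)) + payload


async def read_reply(reader):
    header = await reader.readexactly(HEADER.size)
    code, encoding, size = HEADER.unpack(header)
    payload = await reader.readexactly(size)
    return code, encoding, payload


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(encode_reply(6, 0, b'value'))
    reader.feed_eof()
    return await read_reply(reader)
```

readexactly() is what makes this safe: it waits until the whole header (and then the whole payload) has arrived, however the bytes were split across TCP packets.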

Protocol Pipelining

Most binary protocols support pipelining.

More info: http://tailhook.github.io/request-pipelining-presentation/presentation/index.html

Example: Simple pipelined binary protocol implementation
def execute(self):
    cmd = encode_command(b'GET', 'key')
    self._writer.write(cmd)
    fut = asyncio.Future(loop=self._loop)
    self._queue.append(fut)
    return fut

async def reader_task(self):
    while not self._reader.at_eof():
        header = await self._reader.readexactly(4 + 2 + 1)
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked
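        # wait and read payload
        payload = await self._reader.readexactly(resp_size)
        # replies arrive in request order, so take the oldest waiter
        # (assumes self._queue is a collections.deque)
        future = self._queue.popleft()
        future.set_result(payload)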
					
See aioredis for a reference implementation.
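
A runnable sketch of the pipelining idea: requests are written immediately, a future per request goes into a FIFO queue, and a single reader task resolves the futures in order. The 4-byte length framing, PipelinedConnection, upper_peer and demo are assumptions made for this example and do not come from aioredis or aiogibson.

```python
import asyncio
import collections
import socket
import struct

HEADER = struct.Struct('<I')  # hypothetical framing: 4-byte payload length


class PipelinedConnection:
    def __init__(self, reader, writer):
        self._reader, self._writer = reader, writer
        self._waiters = collections.deque()  # FIFO: oldest request first
        loop = asyncio.get_running_loop()
        self._reader_task = loop.create_task(self._read_replies())

    def execute(self, payload):
        # Write right away; do not wait for earlier replies.
        self._writer.write(HEADER.pack(len(payload)) + payload)
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return fut

    async def _read_replies(self):
        while True:
            try:
                header = await self._reader.readexactly(HEADER.size)
            except asyncio.IncompleteReadError:
                break  # peer closed the connection
            (size,) = HEADER.unpack(header)
            payload = await self._reader.readexactly(size)
            self._waiters.popleft().set_result(payload)

    async def close(self):
        self._reader_task.cancel()
        try:
            await self._reader_task
        except asyncio.CancelledError:
            pass
        self._writer.close()
        await self._writer.wait_closed()


async def upper_peer(reader, writer, count):
    # Stand-in "server": answers each request with its upper-cased body.
    for _ in range(count):
        header = await reader.readexactly(HEADER.size)
        (size,) = HEADER.unpack(header)
        body = await reader.readexactly(size)
        writer.write(header + body.upper())
    await writer.drain()


async def demo():
    a, b = socket.socketpair()
    reader, writer = await asyncio.open_connection(sock=a)
    conn = PipelinedConnection(reader, writer)
    peer_reader, peer_writer = await asyncio.open_connection(sock=b)
    # Three requests are in flight before any reply is read.
    futures = [conn.execute(p) for p in (b'one', b'two', b'three')]
    await upper_peer(peer_reader, peer_writer, len(futures))
    replies = await asyncio.gather(*futures)
    await conn.close()
    peer_writer.close()
    await peer_writer.wait_closed()
    return replies
```

The deque matters: a plain list.pop() would take the newest future and hand replies to the wrong callers.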
Use Factories For Connection Objects
class Connection:
    def __init__(self, reader, writer, host, port, loop=None):
        self._reader, self._writer = reader, writer
        self._loop = loop
        # __init__ cannot await, so it only starts the background reader
        self._reader_task = asyncio.Task(self._read_data(), loop=self._loop)
    def execute(self, command, *args, data=None, cb=None):
        ...
        return fut
    async def _read_data(self):
        while not self._reader.at_eof():
            ...
async def create_connection(host, port, queue=None, loop=None):
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)
    conn = Connection(reader, writer, host, port, loop=loop)
    return conn
					

How to Stop Background Task?

self._reader_task = self._loop.create_task(self._read_data())
self._reader_task.cancel()

try:
    await self._reader_task
except asyncio.CancelledError:
    pass
					
Remember to clean up after background tasks.
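
The cancel-then-await pattern can be checked in isolation. In this sketch poll_forever is a hypothetical background task whose finally block plays the role of the clean-up code.

```python
import asyncio


async def poll_forever(ticks):
    try:
        while True:
            ticks.append(len(ticks))
            await asyncio.sleep(0.01)
    finally:
        # Runs when the task is cancelled: release resources here.
        ticks.append('cleaned up')


async def stop_background_task():
    ticks = []
    task = asyncio.get_running_loop().create_task(poll_forever(ticks))
    await asyncio.sleep(0.03)  # let it run for a moment
    task.cancel()
    try:
        await task             # wait until the cancellation is processed
    except asyncio.CancelledError:
        pass
    return task.cancelled(), ticks[-1]
```

Awaiting the task after cancel() is the important part: cancel() only requests cancellation, and the finally block has not run until the task is awaited or the loop gets another turn.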

Approach #4 Is Sync Python Client available?

In good sync database clients, IO is decoupled from the protocol parsers, so why not just rewrite the IO part?

  • Locate socket.recv()
  • Replace it with await reader.read()
  • Make the function a coroutine with async def
  • Call this function with await
  • Call parent functions with await
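
The steps above can be sketched on a toy line-based protocol. parse_line is a pure parser shared by both versions, so only the IO function changes. All names here are hypothetical and the '\r\n' framing is just an assumption for the example.

```python
import asyncio
import socket


def parse_line(buffer):
    """Pure protocol parser, no IO: returns (line or None, remaining bytes)."""
    line, sep, rest = buffer.partition(b'\r\n')
    return (line.decode(), rest) if sep else (None, buffer)


def read_line_sync(sock):
    buffer = b''
    while True:
        line, buffer = parse_line(buffer)
        if line is not None:
            return line
        chunk = sock.recv(4096)            # blocking IO
        if not chunk:
            raise ConnectionError('connection closed')
        buffer += chunk


async def read_line_async(reader):
    buffer = b''
    while True:
        line, buffer = parse_line(buffer)
        if line is not None:
            return line
        chunk = await reader.read(4096)    # the only line that changed
        if not chunk:
            raise ConnectionError('connection closed')
        buffer += chunk


def demo():
    a, b = socket.socketpair()
    b.sendall(b'+PONG\r\n')
    sync_result = read_line_sync(a)

    async def run_async():
        reader, writer = await asyncio.open_connection(sock=a)
        b.sendall(b'+PONG\r\n')
        result = await read_line_async(reader)
        writer.close()
        await writer.wait_closed()
        return result

    async_result = asyncio.run(run_async())
    b.close()
    return sync_result, async_result
```

The more of the client that looks like parse_line, the smaller the async port becomes.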

aiomysql vs PyMySQL

def read(self):
    try:
        first_packet = self.connection._read_packet()
        if first_packet.is_ok_packet():
            self._read_ok_packet(first_packet)
        elif first_packet.is_load_local_packet():
            self._read_load_local_packet(first_packet)
        else:
            self._read_result_packet(first_packet)
    finally:
        self.connection = None
                    
@asyncio.coroutine
def read(self):
    try:
        first_packet = yield from self.connection._read_packet()
        if first_packet.is_ok_packet():
            self._read_ok_packet(first_packet)
        elif first_packet.is_load_local_packet():
            yield from self._read_load_local_packet(first_packet)
        else:
            yield from self._read_result_packet(first_packet)
    finally:
        self.connection = None
                    

Approach #5 Is there a universal solution to all problems?

Yes. Run every blocking call in a separate thread.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)

async def test_example():
    dsn = 'Driver=SQLite;Database=sqlite.db'
    # every blocking pyodbc call is pushed to the thread pool
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    await loop.run_in_executor(executor, cursor.execute, 'SELECT 42;')

loop.run_until_complete(test_example())
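
The same pattern can be tried without an ODBC driver by swapping in the standard library's sqlite3 module (a substitution made for this sketch, not what the talk uses). sqlite3 connections must stay on the thread that created them by default, so the pool is limited to a single worker here.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


async def select_42():
    loop = asyncio.get_running_loop()
    # One worker: every call lands on the same thread, as sqlite3 requires.
    with ThreadPoolExecutor(max_workers=1) as executor:
        conn = await loop.run_in_executor(executor, sqlite3.connect, ':memory:')
        cursor = await loop.run_in_executor(executor, conn.execute, 'SELECT 42;')
        row = await loop.run_in_executor(executor, cursor.fetchone)
        await loop.run_in_executor(executor, conn.close)
    return row
```

asyncio.run(select_42()) returns the fetched row while the event loop stays free during each blocking call.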
					

Wait a second, did someone say threads?

  • The GIL is not an issue for IO-bound tasks
  • Thread context switching is not slow
  • Thread-per-connection does scale in our case, since the db is a limited resource
  • Threads do not take up too much memory if you have only a few of them
http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases/

aioodbc example

Nice public API in the style of PEP 249 (DB-API 2.0)
pool = await aioodbc.create_pool(dsn=dsn, loop=loop)

async with pool.acquire() as conn:
    cur = await conn.cursor()
    await cur.execute("SELECT 42;")
    r = await cur.fetchall()
    print(r)
					
Ugly internals :)
    def _execute(self, func, *args, **kwargs):
        func = partial(func, *args, **kwargs)
        future = self._loop.run_in_executor(self._executor, func)
        return future

    async def _connect(self):
        f = self._execute(pyodbc.connect, self._dsn,
                          autocommit=self._autocommit, ansi=self._ansi,
                          timeout=self._timeout,
                          **self._kwargs)
        self._conn = await f
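
The partial() call in _execute is there because run_in_executor forwards only positional arguments. A minimal sketch of just that detail, with a made-up blocking_connect standing in for a driver call such as pyodbc.connect:

```python
import asyncio
from functools import partial


def blocking_connect(dsn, *, autocommit=False, timeout=0):
    # Hypothetical stand-in for a blocking driver call.
    return {'dsn': dsn, 'autocommit': autocommit, 'timeout': timeout}


async def connect_in_thread(dsn, **kwargs):
    loop = asyncio.get_running_loop()
    # Bind keyword arguments first, then hand the callable to the pool.
    func = partial(blocking_connect, dsn, **kwargs)
    return await loop.run_in_executor(None, func)
```

Passing None as the executor uses the loop's default thread pool; a library would normally keep its own executor, as the internals above do.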
					

But how do I know which method to call in a thread?

For Python code
requests.get()
					
For Cython
with nogil:
    [code to be executed with the GIL released]
					

For C extension

Py_BEGIN_ALLOW_THREADS
ret = SQLDriverConnect(hdbc, 0, szConnect, SQL_NTS,
                       0, 0, 0, SQL_DRIVER_NOPROMPT);
Py_END_ALLOW_THREADS

					

Keep an eye on StopIteration in Python 3.4

@asyncio.coroutine
def coro():
    raise StopIteration('batman')

@asyncio.coroutine
def coro2():
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raise StopIteration
    return 'finish'

@asyncio.coroutine
def go():
    data = yield from coro()  # batman
    data = yield from coro2() # None
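
On current Python versions (PEP 479 semantics, the default since 3.7) the same mistake no longer ends the coroutine silently: a StopIteration escaping a coroutine is turned into a RuntimeError. A small sketch with async def, where leaky, safe and outcome are hypothetical names:

```python
import asyncio


async def leaky():
    i = iter(range(1))
    next(i)
    next(i)  # raises StopIteration inside the coroutine
    return 'finish'


async def safe():
    i = iter(range(1))
    next(i)
    # A default value means next() never raises StopIteration.
    return next(i, 'finish')


def outcome(coro_func):
    try:
        return asyncio.run(coro_func())
    except RuntimeError as exc:
        return 'RuntimeError: ' + str(exc)
```

Either way, a bare next() inside a coroutine deserves a default value or an explicit try/except.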
					

Recap

  • asyncio is fun
  • Implementing database clients is easy
  • Writing code with asyncio is harder, but the result is more correct
  • Finalizers are more important than in sync code
  • Check out github.com/aio-libs

THE END

http://jettify.github.io/pyconby2016
