– Rules of Async Club






kyivpy16


On GitHub: jettify/kyivpy16

How to deal with blocking code within asyncio event loop

Nikolay Novik

http://github.com/jettify

I am ...

  • Software Engineer at DataRobot Ukraine
  • GitHub: http://github.com/jettify
  • My Projects:
    • database clients:
      • aiomysql, aioodbc, aiogibson
    • web and etc:
      • aiohttp_debugtoolbar, aiobotocore, aiohttp_mako, aiohttp_sse, aiogearman, aiomysql_replication

Poll

You and asyncio:

  • I am using asyncio extensively
  • I am using Twisted, Tornado, gevent etc. extensively
  • I think async programming is kinda cool

Asyncio

  • The asyncio project was officially launched with the release of Python 3.4 in March 2014.
  • Bare: almost no libraries
  • One year later, asyncio has a strong community writing libraries on top of it.

But what can you do when the available libraries work synchronously and potentially block the event loop?

Rules of Async Club

Rule #1

You do not block the event loop

Rule #2

You never block the event loop
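To see why these rules matter, here is a minimal sketch (Python 3.9+, since it uses asyncio.to_thread). A hypothetical heartbeat task records a tick roughly every 10 ms; we count how many ticks happen while a 200 ms time.sleep() runs either directly inside a coroutine or in a worker thread.

```python
import asyncio
import time


async def heartbeat(ticks):
    # Hypothetical helper: appends a timestamp roughly every 10 ms
    # for as long as the event loop is free to run it.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(offload):
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    if offload:
        # The blocking call runs in the default thread pool; the loop keeps going.
        await asyncio.to_thread(time.sleep, 0.2)
    else:
        # The blocking call runs on the loop itself; nothing else can run.
        time.sleep(0.2)
    hb.cancel()
    return len(ticks) - before


blocked = asyncio.run(count_ticks(offload=False))
offloaded = asyncio.run(count_ticks(offload=True))
print('ticks while blocking the loop:', blocked)
print('ticks while using a thread:', offloaded)
```

When the sleep runs on the loop, the heartbeat records no ticks at all: every other task, timer and connection served by that loop is frozen for the full 200 ms.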

Blocking calls in third party libraries

  • Network IO
    • API wrappers
    • Database clients
    • Message queues
  • FileSystem IO
  • CPU

Debugging blocking calls tip

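Set the environment variable PYTHONASYNCIODEBUG=1 (or call loop.set_debug(True)); in debug mode asyncio logs every callback or task step that runs longer than loop.slow_callback_duration.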
import asyncio
import time

loop = asyncio.get_event_loop()
loop.slow_callback_duration = 0.01  # report any step longer than 10 ms

async def sleeper():
    time.sleep(0.1)  # we block here

loop.run_until_complete(sleeper())
					

Executing <Task finished coro=<sleeper() done, defined at
code/debug_example.py:9> result=None created at
/usr/local/lib/python3.5/asyncio/base_events.py:323>
took 0.102 seconds
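On Python 3.7+ the same check can be sketched with asyncio.run(..., debug=True), which turns on debug mode for that loop just like PYTHONASYNCIODEBUG=1. The ListHandler class below is an illustrative helper, not part of asyncio: it keeps the "asyncio" logger's warnings in memory so they can be inspected (or asserted on in a test) instead of only printed.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Illustrative helper: keeps log records in memory for inspection."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
logging.getLogger('asyncio').addHandler(handler)


async def sleeper():
    # Report any single task step or callback that runs longer than 10 ms.
    asyncio.get_running_loop().slow_callback_duration = 0.01
    time.sleep(0.1)  # we block here


# debug=True enables asyncio debug mode for this loop only.
asyncio.run(sleeper(), debug=True)

slow = [r.getMessage() for r in handler.records if 'took' in r.getMessage()]
for message in slow:
    print(message)
```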
					

Approach #1: Is there a suitable library?

Search for an asyncio-compatible library on:

  • Google: ~98k results
  • PyPI: ~200 packages
  • asyncio wiki page: https://github.com/python/asyncio/wiki/ThirdParty
  • aio-libs: https://github.com/aio-libs
Third-Party Libraries Pro Tip
Read the (f*g) source code of your libraries! Example of Python code from the OneDrive SDK:
@asyncio.coroutine
def get_async(self):
    """Sends the GET request using an asyncio coroutine
    .... 
    """
    future = self._client._loop.run_in_executor(None,
                                                self.get)
    collection_response = yield from future
    return collection_response
					
This "async" method just runs the synchronous get() in a thread pool. Most of the time you want HTTP requests to run on the event loop itself, not in a thread pool.
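One concrete cost of the thread-pool approach is that the pool size caps how many requests can be in flight at once. The sketch below is illustrative only: the network is simulated with sleeps (time.sleep for a sync client, asyncio.sleep for an event-loop-native one), and the request count, latency and pool size are arbitrary assumptions.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

N_REQUESTS = 20
LATENCY = 0.1  # pretend every request spends 100 ms waiting on the network


def blocking_request():
    # Stand-in for a synchronous HTTP call such as requests.get().
    time.sleep(LATENCY)
    return 'ok'


async def native_request():
    # Stand-in for an event-loop-native HTTP call: the wait is an await.
    await asyncio.sleep(LATENCY)
    return 'ok'


async def via_thread_pool(max_workers):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        start = time.perf_counter()
        await asyncio.gather(*(loop.run_in_executor(executor, blocking_request)
                               for _ in range(N_REQUESTS)))
        return time.perf_counter() - start


async def via_event_loop():
    start = time.perf_counter()
    await asyncio.gather(*(native_request() for _ in range(N_REQUESTS)))
    return time.perf_counter() - start


threaded = asyncio.run(via_thread_pool(max_workers=4))
native = asyncio.run(via_event_loop())
print(f'thread pool (4 workers): {threaded:.2f}s, event loop: {native:.2f}s')
```

With 4 workers the 20 simulated requests run in 5 sequential batches (at least 0.5 s), while the event-loop version waits for all of them concurrently (about 0.1 s).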

Approach #2: Is a REST API available?

Many hipster databases expose a REST API, often as their primary access method:

  • DynamoDB
  • Neo4j
  • Elasticsearch
  • HBase
  • HDFS
  • CouchDB
  • Riak
  • VoltDB
  • InfluxDB
  • ArangoDB

It is easy to implement just the subset of the API you actually need.
REST Client Tip
aiohttp.ClientSession is your friend
import asyncio
import aiohttp

# carry the loop Luke!
loop = asyncio.get_event_loop()

async def go():
    session = aiohttp.ClientSession(loop=loop)
    async with session.get('http://python.org') as resp:
        data = await resp.text()
        print(data)
    session.close()

loop.run_until_complete(go())
					
Reusing one session gives you connection pooling, which saves on expensive connection setup. (PS: check out the new aiohttp 0.18.x release.)

Approach #3: Is there a simple text or binary protocol?

Examples of databases and message queues with simple text or binary protocols:

  • redis
  • memcached
  • couchbase
  • gearman
  • beanstalkd
  • disque

Don't be afraid to get your hands dirty.

Example: Simple binary protocol implementation
import asyncio, struct
from aiogibson import encode_command

async def read_from_connection(host, port, *, loop=None):
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)

    cmd = encode_command(b'GET', 'key')
    writer.write(cmd)
    await writer.drain()

    # response header: code (2 bytes), encoding (1 byte), payload size (4 bytes)
    header = await reader.readexactly(4 + 2 + 1)
    unpacked = struct.unpack(b'<HBI', header)
    code, gb_encoding, resp_size = unpacked
    payload = await reader.readexactly(resp_size)
    print(payload)
    writer.close()
					
Simple, but without protocol pipelining: the client has to wait for each reply before it can send another request.
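The snippet above needs aiogibson and a Gibson server. To try the same readexactly() pattern offline, here is a self-contained sketch that swaps in a made-up toy protocol: a request is a 4-byte little-endian length followed by the key (hypothetical encode_request), and a reply reuses the slide's 7-byte '<HBI' header (code, encoding, payload size) followed by the payload. fake_server is an in-process stand-in, not a real database.

```python
import asyncio
import struct

HEADER = struct.Struct('<HBI')  # code (2 bytes), encoding (1 byte), payload size (4 bytes)


def encode_request(key):
    # Hypothetical toy request format: 4-byte little-endian length + key bytes.
    return struct.pack('<I', len(key)) + key


async def fake_server(reader, writer):
    # Toy server: answers every request with b'value-of-' + key.
    while True:
        try:
            size, = struct.unpack('<I', await reader.readexactly(4))
        except asyncio.IncompleteReadError:
            break  # client closed the connection
        payload = b'value-of-' + await reader.readexactly(size)
        writer.write(HEADER.pack(0, 0, len(payload)) + payload)
        await writer.drain()
    writer.close()


async def get(host, port, key):
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(encode_request(key))
    await writer.drain()
    # Read the fixed-size header first, then exactly as many payload bytes as it announces.
    code, encoding, size = HEADER.unpack(await reader.readexactly(HEADER.size))
    payload = await reader.readexactly(size)
    writer.close()
    await writer.wait_closed()
    return payload


async def main():
    server = await asyncio.start_server(fake_server, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        return await get('127.0.0.1', port, b'key')


result = asyncio.run(main())
print(result)  # b'value-of-key'
```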

Protocol Pipelining

Most binary protocols support pipelining: the client can send several requests without waiting for each reply. More info: http://tailhook.github.io/request-pipelining-presentation/presentation/index.html
Example: Simple pipelined binary protocol implementation
def execute(self):
    cmd = encode_command(b'GET', 'key')
    self.writer.write(cmd)
    fut = asyncio.Future(loop=self._loop)
    # self._queue is a collections.deque of futures awaiting replies
    self._queue.append(fut)
    return fut

async def reader_task(self):
    while True:
        header = await self.reader.readexactly(4 + 2 + 1)
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked
        # wait and read payload
        payload = await self.reader.readexactly(resp_size)
        # replies arrive in request order: resolve the oldest future (FIFO)
        future = self._queue.popleft()
        future.set_result(payload)
					
See aioredis for a reference implementation.
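A self-contained sketch of the pipelining idea that runs without aiogibson or aioredis. It assumes a made-up toy protocol (request: 4-byte little-endian length plus key; reply: the slide's '<HBI' header plus payload) and an in-process fake server; PipelinedClient, encode_request and fake_server are hypothetical names. Pending futures live in a collections.deque and are resolved with popleft(), because the server answers strictly in request order.

```python
import asyncio
import collections
import struct

HEADER = struct.Struct('<HBI')  # code, encoding, payload size


def encode_request(key):
    # Hypothetical toy request format: 4-byte little-endian length + key bytes.
    return struct.pack('<I', len(key)) + key


async def fake_server(reader, writer):
    # Toy server: replies to requests strictly in the order they arrive.
    try:
        while True:
            size, = struct.unpack('<I', await reader.readexactly(4))
            payload = b'value-of-' + await reader.readexactly(size)
            writer.write(HEADER.pack(0, 0, len(payload)) + payload)
            await writer.drain()
    except asyncio.IncompleteReadError:
        pass  # client closed the connection
    finally:
        writer.close()


class PipelinedClient:
    def __init__(self, reader, writer):
        self.reader, self.writer = reader, writer
        self._queue = collections.deque()  # futures awaiting a reply, oldest first
        self._reader_task = asyncio.create_task(self._read_replies())

    def execute(self, key):
        # Send right away, without waiting for earlier replies (pipelining).
        self.writer.write(encode_request(key))
        fut = asyncio.get_running_loop().create_future()
        self._queue.append(fut)
        return fut

    async def _read_replies(self):
        while True:
            code, encoding, size = HEADER.unpack(
                await self.reader.readexactly(HEADER.size))
            payload = await self.reader.readexactly(size)
            # Replies come back in request order, so resolve the oldest future.
            self._queue.popleft().set_result(payload)

    async def close(self):
        self._reader_task.cancel()
        self.writer.close()
        await self.writer.wait_closed()


async def main():
    server = await asyncio.start_server(fake_server, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        client = PipelinedClient(reader, writer)
        # Three requests share one connection and are all in flight at once.
        replies = await asyncio.gather(*(client.execute(k) for k in (b'a', b'b', b'c')))
        await client.close()
        return replies


replies = asyncio.run(main())
print(replies)  # [b'value-of-a', b'value-of-b', b'value-of-c']
```

Taking futures from the right end with pop() instead would hand each reply to the most recent request, swapping results whenever more than one request is in flight.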

Approach #4: Is a sync Python client available?

In good sync database clients, IO is decoupled from the protocol parser, so why not just rewrite the IO part?

  • Locate socket.recv()
  • Replace it with await reader.read()
  • Make the function a coroutine with async def
  • Call this function with await
  • Call the parent functions with await (they become coroutines too)
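A minimal sketch of those steps, assuming a made-up newline-terminated protocol; LineParser, read_reply_sync and read_reply_async are hypothetical names, not taken from any real client. The parser never touches a socket, so only the small IO loop changes when socket.recv() is replaced with await reader.read().

```python
import asyncio
import socket


class LineParser:
    """Pure protocol parser: bytes in, replies out, no IO (hypothetical protocol)."""

    def __init__(self):
        self._buffer = b''

    def feed(self, data):
        self._buffer += data

    def get_reply(self):
        # Return one complete newline-terminated reply, or None if more data is needed.
        line, sep, rest = self._buffer.partition(b'\n')
        if not sep:
            return None
        self._buffer = rest
        return line


def read_reply_sync(sock, parser):
    # Original sync IO layer.
    reply = parser.get_reply()
    while reply is None:
        data = sock.recv(4096)  # <- the blocking call to locate
        if not data:
            raise ConnectionError('connection closed before a full reply')
        parser.feed(data)
        reply = parser.get_reply()
    return reply


async def read_reply_async(reader, parser):
    # Rewritten IO layer: same parser, socket.recv() replaced with await reader.read().
    reply = parser.get_reply()
    while reply is None:
        data = await reader.read(4096)
        if not data:
            raise ConnectionError('connection closed before a full reply')
        parser.feed(data)
        reply = parser.get_reply()
    return reply


def demo_sync():
    a, b = socket.socketpair()
    with a, b:
        b.sendall(b'PONG\n')
        return read_reply_sync(a, LineParser())


async def demo_async():
    async def pong(reader, writer):
        # Toy server: sends a single reply line, then hangs up.
        writer.write(b'PONG\n')
        await writer.drain()
        writer.close()

    server = await asyncio.start_server(pong, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        reply = await read_reply_async(reader, LineParser())
        writer.close()
        await writer.wait_closed()
    return reply


sync_reply = demo_sync()
async_reply = asyncio.run(demo_async())
print(sync_reply, async_reply)
```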

Approach #5: Is there a universal solution to all problems?

Yes. Run every blocking call in a separate thread:
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)

async def test_example():
    dsn = 'Driver=SQLite;Database=sqlite.db'
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    await loop.run_in_executor(executor, cursor.execute, 'SELECT 42;')
    # fetching results is blocking IO too
    row = await loop.run_in_executor(executor, cursor.fetchone)
    print(row)
    await loop.run_in_executor(executor, conn.close)

loop.run_until_complete(test_example())
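The same pattern can be tried with the standard library's sqlite3 in place of pyodbc (swapped in here only so the sketch runs without an ODBC driver). It also shows one design choice worth considering: DB-API drivers often do not allow a connection to be shared between threads (check the driver's threadsafety attribute), and sqlite3 by default rejects a connection used from a thread other than the one that created it. This sketch therefore pins every call for one connection to a single-worker executor.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


async def query_answer():
    loop = asyncio.get_running_loop()
    # One worker thread, so every call on this connection runs in the thread
    # that created it (sqlite3 checks this by default).
    with ThreadPoolExecutor(max_workers=1) as executor:
        conn = await loop.run_in_executor(executor, sqlite3.connect, ':memory:')
        try:
            cursor = await loop.run_in_executor(executor, conn.cursor)
            await loop.run_in_executor(executor, cursor.execute, 'SELECT 42;')
            row = await loop.run_in_executor(executor, cursor.fetchone)
        finally:
            await loop.run_in_executor(executor, conn.close)
    return row[0]


answer = asyncio.run(query_answer())
print(answer)  # 42
```

With a multi-worker pool, consecutive calls on the same connection could land on different threads and fail; one executor per connection avoids that at the cost of serializing that connection's calls.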
					

But how do I know which methods to call in a thread?

For Python code, look for blocking IO calls such as:
requests.get()
					
For Cython, look for blocks that release the GIL:
with nogil:
    [code to be executed with the GIL released]
					

For C extensions, look for calls wrapped in the GIL-releasing macros:

Py_BEGIN_ALLOW_THREADS
ret = SQLDriverConnect(hdbc, 0, szConnect, SQL_NTS,
                       0, 0, 0, SQL_DRIVER_NOPROMPT);
Py_END_ALLOW_THREADS

					

What about FileSystem IO?

asyncio does not support asynchronous operations on the filesystem due to OS limitations.

The only good way to use files asynchronously is through thread pools.

aiofiles library workaround

import asyncio
import aiofiles

loop = asyncio.get_event_loop()

async def go():
    f = await aiofiles.open('filename', mode='r')
    try:
        data = await f.read()
    finally:
        await f.close()
    print(data)

loop.run_until_complete(go())
					
Under the hood, aiofiles delegates the blocking calls to a ThreadPoolExecutor.
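For a one-off read or write, the same idea can be sketched with only the standard library. read_text and write_text are hypothetical helpers (not part of aiofiles); each runs the whole blocking open/read/close sequence as a single job in the loop's default thread pool executor.

```python
import asyncio
import os
import tempfile


async def read_text(path):
    # Hypothetical helper: the blocking open/read/close runs in the default executor.
    def _read():
        with open(path, encoding='utf-8') as f:
            return f.read()
    return await asyncio.get_running_loop().run_in_executor(None, _read)


async def write_text(path, data):
    # Hypothetical helper: same approach for writing.
    def _write():
        with open(path, 'w', encoding='utf-8') as f:
            f.write(data)
    await asyncio.get_running_loop().run_in_executor(None, _write)


async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'filename')
        await write_text(path, 'hello from a thread\n')
        return await read_text(path)


data = asyncio.run(main())
print(data, end='')
```

Bundling the whole sequence into one job costs a single round trip to the thread pool, whereas a file-object API like aiofiles' needs one round trip per awaited method call (open, read, close). The file-object API is more flexible for streaming large files.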

What about CPU-intensive tasks?

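import asyncio
import math
from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    if n < 2: return False
    if n % 2 == 0: return n == 2
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0: return False
    return True

async def go(loop, executor):
    n = 112272535095293
    # is_prime runs in a worker process, so the event loop stays free
    result = await loop.run_in_executor(executor, is_prime, n)
    print(n, 'is prime:', result)

if __name__ == '__main__':
    # the guard is needed where worker processes re-import this module (spawn)
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor(max_workers=3)
    loop.run_until_complete(go(loop, executor))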

					

Thanks!
