Nikolay Novik
But what to do when no suitable library is available?
import asyncio


async def go(loop):
    """Await an already-resolved future after a 3 second pause, then print.

    The event loop is passed in explicitly and forwarded to every asyncio
    call instead of relying on the global default loop.
    """
    # NOTE(review): the explicit ``loop=`` on asyncio.sleep() matches the
    # Python 3.5-era API this example targets; newer Python versions dropped
    # that parameter -- confirm the target interpreter before running.
    resolved = asyncio.Future(loop=loop)
    resolved.set_result(None)
    await asyncio.sleep(3.0, loop=loop)
    await resolved  # already done, so this returns immediately
    print("foo")


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()
It would really be a pity if Tulip repeated our billion-dollar mistake [global reactor] ;-)
# --Glyph Lefkowitz / author of Twisted https://groups.google.com/forum/#!msg/python-tulip/hr1kPZfMX8U/9uqdlbRuRsoJ
import asyncio
import time

loop = asyncio.get_event_loop()
# Any callback or task step that runs longer than this many seconds is
# reported as slow by the loop (the report is only emitted in debug mode).
loop.slow_callback_duration = 0.01


async def sleeper():
    """Deliberately block the event loop with a synchronous sleep."""
    time.sleep(0.1)  # we block here


loop.run_until_complete(sleeper())
Executing <Task finished coro=<sleeper() done, defined at code/debug_example.py:9> result=None created at /usr/local/lib/python3.5/asyncio/base_events.py:323> took 0.102 seconds
Use this trick only for debugging and testing purposes; debug mode has a huge performance impact.
Save development time, but you have no idea what is going on in db client. Your PM will be happy.
async def get_async(self):
    """Send the GET request without blocking the event loop.

    The blocking ``self.get`` call is handed to the client's event loop,
    which runs it in its default executor (``None`` selects the default).

    Returns:
        Whatever ``self.get()`` returns.
    """
    # Native coroutine instead of ``@asyncio.coroutine`` + ``yield from``:
    # the decorator no longer exists on current Python versions, and the
    # rest of these examples already use async/await.
    future = self._client._loop.run_in_executor(None, self.get)
    collection_response = await future
    return collection_response
# Most of the time you want to do HTTP requests using the event loop, not a thread pool.
import asyncio

import aiohttp


async def go(loop):
    """Fetch http://by.pycon.org and print the response body.

    Args:
        loop: event loop the client session is bound to.
    """
    # Entering the session with ``async with`` guarantees it is closed even
    # when the request or the body read raises; the original only closed it
    # on the success path.
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get('http://by.pycon.org') as resp:
            data = await resp.text()
            print(data)


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
# Connection pooling helps to save on expensive connection creation.
Example of databases and message queues with binary protocol:
Do not be afraid to get your hands dirty.
class EchoClientProtocol(asyncio.Protocol):
    """Callback-based client: sends one message and prints what comes back."""

    def __init__(self, message, loop):
        self.loop = loop
        self.message = message

    def connection_made(self, transport):
        # Send the message as soon as the connection is established.
        payload = self.message.encode()
        transport.write(payload)

    def data_received(self, data):
        text = data.decode()
        print('Data received: {!r}'.format(text))

    def connection_lost(self, exc):
        print('The server closed the connection')


loop = asyncio.get_event_loop()


def factory():
    # NOTE(review): ``message`` is not defined in this excerpt; it must exist
    # at module level by the time the connection is actually made.
    return EchoClientProtocol(message, loop)


# Only builds the coroutine; it still has to be run by the loop.
coro = loop.create_connection(factory, '127.0.0.1', 8888)
# Too low level, usually you should not use it. Notice that none of the methods are coroutines.
import asyncio


async def tcp_client(message, loop):
    """Send ``message`` to 127.0.0.1:8888 and print up to 100 bytes of reply."""
    streams = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
    reader, writer = streams

    writer.write(message.encode())

    reply = await reader.read(100)
    print('Received: %r' % reply.decode())

    writer.close()


message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client(message, loop))
loop.close()
async def create_connection(address, *, encoding=None, loop=None):
    """Open a connection over TCP or a unix socket and wrap it.

    Args:
        address: ``(host, port)`` as a list or tuple for TCP; anything else
            is passed to ``asyncio.open_unix_connection`` as the socket path.
        encoding: forwarded to ``RedisConnection`` (was an undefined name in
            the original snippet).
        loop: event loop forwarded to asyncio and to ``RedisConnection``.

    Returns:
        The ``RedisConnection`` built around the stream reader/writer pair.
    """
    # ``yield from`` is a syntax error inside ``async def``; use ``await``.
    if isinstance(address, (list, tuple)):
        host, port = address
        reader, writer = await asyncio.open_connection(
            host, port, loop=loop)
    else:
        reader, writer = await asyncio.open_unix_connection(
            address, loop=loop)
    conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
    # Hand the connection back; the original built it and dropped it.
    return conn
# Skeleton of a protocol-based (callback-style) client: it subclasses
# asyncio.Protocol and overrides its four callbacks. Bodies are elided.
class RedisProtocol(asyncio.Protocol, metaclass=_RedisProtocolMeta):
    # asyncio.Protocol callback: the connection is up; receives the transport.
    def connection_made(self, transport):
        ...

    # asyncio.Protocol callback: a chunk of bytes arrived from the peer.
    def data_received(self, data):
        ...

    # asyncio.Protocol callback: the peer signalled end-of-file.
    def eof_received(self):
        ...

    # asyncio.Protocol callback: the connection is gone; ``exc`` is the
    # error that caused it, if any.
    def connection_lost(self, exc):
        ...
import asyncio
import struct

from aiogibson import encode_command


async def read_from_connection(host, port, *, loop=None):
    """Send a single ``GET key`` command and print the reply payload.

    Args:
        host: server host name or address.
        port: server TCP port.
        loop: event loop forwarded to ``asyncio.open_connection``.
    """
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)
    try:
        cmd = encode_command(b'GET', 'key')
        writer.write(cmd)

        # Fixed 7-byte header, unpacked little-endian as
        # (uint16, uint8, uint32); the last field is the payload length.
        header = await reader.readexactly(4 + 2 + 1)
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked

        payload = await reader.readexactly(resp_size)
        print(payload)
    finally:
        # Always release the socket, including when a read fails part-way.
        writer.close()
# Simple but no protocol pipelining.
def execute(self):
    """Write a ``GET key`` command and return a future for its reply.

    The future is queued; ``reader_task`` resolves it when the matching
    response has been read.
    """
    cmd = encode_command(b'GET', 'key')
    self._writer.write(cmd)
    fut = asyncio.Future(loop=self._loop)
    self._queue.append(fut)
    return fut


async def reader_task(self):
    """Read responses until EOF and resolve the queued futures in order."""
    while not self._reader.at_eof():
        # Fixed 7-byte header, unpacked little-endian as
        # (uint16, uint8, uint32); the last field is the payload length.
        header = await self._reader.readexactly(4 + 2 + 1)
        unpacked = struct.unpack(b'<HBI', header)
        code, gb_encoding, resp_size = unpacked
        # wait and read payload (the original used a bare, undefined
        # ``reader`` here)
        payload = await self._reader.readexactly(resp_size)
        # Pipelined replies come back in request order, so the oldest
        # pending future is the one to resolve (FIFO); ``pop()`` would take
        # the newest. Index + del works for both list and deque queues.
        future = self._queue[0]
        del self._queue[0]
        future.set_result(payload)
# See aioredis for reference implementation.
class Connection:
    """Connection skeleton: one background task reads all responses."""

    def __init__(self, reader, writer, host, port, loop=None):
        self._reader, self._writer = reader, writer
        # Must be stored before the task is created: the original read
        # ``self._loop`` without ever assigning it.
        self._loop = loop
        # Start the background reader. The original called a non-existent
        # ``_read_data``; the reader coroutine here is ``reader_task``.
        self._reader_task = asyncio.Task(self.reader_task(), loop=self._loop)

    def execute(self, command, *args, data=None, cb=None):
        """Return a future for the reply to ``command`` (sending is elided)."""
        # The original returned ``fut`` without creating it.
        fut = asyncio.Future(loop=self._loop)
        ...
        return fut

    async def reader_task(self):
        """Loop until the reader reports EOF (response handling is elided)."""
        while not self._reader.at_eof():
            ...


async def create_connection(host, port, queue=None, loop=None):
    """Open a TCP stream to ``host:port`` and wrap it in a Connection.

    ``queue`` is accepted but not used by this skeleton.
    """
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)
    conn = Connection(reader, writer, host, port, loop=loop)
    return conn
self._reader_task = asyncio.create_task( self._read_data(), loop=self._loop) self._reader_task.cancel() try: await self._reader_task except asyncio.CanceledError: passRemember clean up after background tasks.
def read(self):
    """Read the first packet and dispatch on its kind (blocking version).

    ``self.connection`` is reset to ``None`` afterwards, whether or not
    reading succeeded.
    """
    try:
        packet = self.connection._read_packet()
        # Pick the handler first, then make a single call.
        if packet.is_ok_packet():
            handle = self._read_ok_packet
        elif packet.is_load_local_packet():
            handle = self._read_load_local_packet
        else:
            handle = self._read_result_packet
        handle(packet)
    finally:
        self.connection = None
async def read(self):
    """Read the first packet and dispatch on its kind (asyncio version).

    Same flow as the blocking version, with every I/O call awaited.
    ``self.connection`` is reset to ``None`` afterwards, whether or not
    reading succeeded.
    """
    # Native coroutine instead of ``@asyncio.coroutine`` + ``yield from``:
    # the decorator no longer exists on current Python versions, and the
    # rest of these examples already use async/await.
    try:
        first_packet = await self.connection._read_packet()
        if first_packet.is_ok_packet():
            # Not awaited: the original calls this handler synchronously.
            self._read_ok_packet(first_packet)
        elif first_packet.is_load_local_packet():
            await self._read_load_local_packet(first_packet)
        else:
            await self._read_result_packet(first_packet)
    finally:
        self.connection = None
import asyncio
from concurrent.futures import ThreadPoolExecutor  # was used but never imported

from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)


async def test_example():
    """Run ``SELECT 42;`` through pyodbc, pushing every blocking call to a thread."""
    dsn = 'Driver=SQLite;Database=sqlite.db'
    conn = await loop.run_in_executor(executor, connect, dsn)
    try:
        cursor = await loop.run_in_executor(executor, conn.cursor)
        # The original rebound ``conn`` to the result of execute(), losing
        # the only reference to the connection.
        await loop.run_in_executor(executor, cursor.execute, 'SELECT 42;')
    finally:
        # Closing is blocking too, so it also goes through the executor.
        await loop.run_in_executor(executor, conn.close)


loop.run_until_complete(test_example())
# Fragment: runs inside a coroutine; ``dsn`` and ``loop`` come from the
# surrounding code. Creates a connection pool, borrows one connection for
# the duration of the ``async with`` block, runs a query and prints the rows.
pool = await aioodbc.create_pool(dsn=dsn, loop=loop)
async with pool.acquire() as conn:
    cur = await conn.cursor()
    await cur.execute("SELECT 42;")
    r = await cur.fetchall()
    print(r)
# Ugly internals :)
def _execute(self, func, *args, **kwargs): func = partial(func, *args, **kwargs) future = self._loop.run_in_executor(self._executor, func) return future async def _connect(self): f = self._execute(pyodbc.connect, self._dsn, autocommit=self._autocommit, ansi=self._ansi, timeout=self._timeout, **self._kwargs) self._conn = await f
requests.get()
For Cython
with nogil: [code to be executed with the GIL released]
For C extension
Py_BEGIN_ALLOW_THREADS ret = SQLDriverConnect(hdbc, 0, szConnect, SQL_NTS, 0, 0, 0, SQL_DRIVER_NOPROMPT); Py_END_ALLOW_THREADS
# Pitfall demo for generator-based coroutines: a StopIteration that escapes
# the coroutine body looks like a normal return to ``yield from``, so it is
# swallowed instead of propagating as an error.
# NOTE(review): the results in the trailing comments are the slide's own
# claims for the Python 3.5-era behaviour; newer Python versions treat a
# StopIteration raised inside a generator differently -- confirm the target
# interpreter.
@asyncio.coroutine
def coro():
    # The exception's value is what the caller receives as the result.
    raise StopIteration('batman')


@asyncio.coroutine
def coro2():
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raise StopIteration: the iterator is exhausted after two items
    return 'finish'  # never reached


@asyncio.coroutine
def go():
    data = yield from coro()  # batman
    data = yield from coro2()  # None