Nikolay Novik
PyConUA 2016
SOA implies a lot of network communications.
from django.http import HttpResponse


def my_view(request):
    """Synchronous Django view that performs a blocking HTTP request."""
    # blocks thread
    # NOTE(review): `requests` is not imported in this snippet and `{uid}`
    # is never substituted -- presumably elided on the slide; confirm.
    r = requests.get('http://graph.facebook.com/v2.5/{uid}')
    data = r.json()
    # ...
    return HttpResponse(status=200)
from aiohttp import web


async def my_view(request):
    """Asynchronous aiohttp view: awaits the HTTP request instead of blocking."""
    session = request.app['session']
    # context switch here
    r = await session.get('http://graph.facebook.com/v2.5/{uid}')
    data = await r.json()
    return web.Response(status=200)


# Amazon S3 API could return a response in over 9000s! In the async case only
# one response is blocked; in the sync case the entire thread is.
Most popular and convenient asyncio mode in the wild.
import asyncio
from concurrent.futures import ThreadPoolExecutor

from pyodbc import connect

loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=4)


async def test_example():
    """Run blocking pyodbc calls on the thread pool from a coroutine."""
    dsn = 'Driver=SQLite;Database=sqlite.db'
    conn = await loop.run_in_executor(executor, connect, dsn)
    cursor = await loop.run_in_executor(executor, conn.cursor)
    # The result of execute() is not needed here; the original rebound it
    # to `conn`, which hid the real connection object.
    await loop.run_in_executor(executor, cursor.execute, 'SELECT 42;')


loop.run_until_complete(test_example())

# Easy to use but a bit strange interface; default executor has 4 worker
# threads.
import asyncio
import math
from concurrent.futures import ProcessPoolExecutor


def is_prime(n):
    """Return True if *n* is a prime number (trial division by odd numbers)."""
    if n < 2:
        return False
    if n == 2:
        # 2 is the only even prime; the generic even check below rejects it.
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


async def go(loop, executor):
    """Run the CPU-bound ``is_prime`` check in *executor* and return its result."""
    result = await loop.run_in_executor(
        executor, is_prime, 112272535095293)
    return result


# Worker processes re-import this module, so the pool must only be started
# from the guarded entry point.
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor(max_workers=3)
    loop.run_until_complete(go(loop, executor))

# ProcessPoolExecutor has the same interface as ThreadPoolExecutor.
The application may spin the event loop at will to perform IO-heavy computations.
import asyncio
import aiohttp


async def fetch(session, url, loop):
    """Download *url* through *session* and return the response body as text."""
    async with session.get(url) as resp:
        data = await resp.text()
    return data


def collect_data(url_list):
    """Fetch every URL in *url_list* concurrently and return the list of bodies.

    Spins the event loop from synchronous code; the session and the loop are
    closed even when one of the downloads fails.
    """
    loop = asyncio.get_event_loop()
    session = aiohttp.ClientSession(loop=loop)
    try:
        coros = [fetch(session, u, loop) for u in url_list]
        data = loop.run_until_complete(asyncio.gather(*coros, loop=loop))
    finally:
        loop.run_until_complete(session.close())
        loop.close()
    return data


def main():
    """Load the URL list, download everything and hand the data to processing."""
    # NOTE(review): `db` and `process` are not defined in this snippet --
    # presumably provided by the surrounding application.
    url_list = db.fetch_urls()
    data = collect_data(url_list)
    process(data)


# Scraping or concurrent upload to an external server are the most popular
# use cases.
import asyncio

import aioredis
from flask import Flask

app = Flask(__name__)
loop = asyncio.get_event_loop()
redis = loop.run_until_complete(aioredis.create_redis(
    ('localhost', 6379), loop=loop))


@app.route("/")
def hello():
    """Synchronous Flask view that drives the event loop to read from redis."""
    value = loop.run_until_complete(redis.get('my-key'))
    return "Hello {}!".format(value)


if __name__ == "__main__":
    app.run()

# Using coroutines inside sync code is not always a good idea. In this
# particular case it slows down database access.
The application may delegate IO-heavy tasks to a dedicated loop in a separate thread.
import asyncio
import functools
from threading import Thread, Event


class AioThread(Thread):
    """Thread that owns and runs a dedicated asyncio event loop."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Created in run(), i.e. inside the new thread.
        self.loop = None
        # Set once the loop is actually running.
        self.event = Event()

    def run(self):
        """Create the loop, signal readiness via ``self.event`` and run forever."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        # Scheduled on the loop so the event fires only after it has started.
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def add_task(self, coro):
        """Schedule *coro* on the thread's loop; return a concurrent future."""
        fut = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
        return fut

    def finalize(self, timeout=None):
        """Stop the loop from another thread and join, waiting up to *timeout*."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join(timeout=timeout)


# Make sure you have means to signal that the loop has been started, and that
# you can finalize the thread properly.
def main(timeout=None):
    """Run a coroutine on an ``AioThread`` loop and wait for its result.

    *timeout* is the number of seconds to wait for the result; ``None``
    waits indefinitely. Returns the coroutine's result, or ``None`` when it
    timed out or raised.
    """
    # future.result() on a concurrent future raises this TimeoutError, which
    # is not the same class as asyncio.TimeoutError on every Python version.
    import concurrent.futures

    aiothread = AioThread()
    aiothread.start()
    # Block until the loop inside the thread is running.
    aiothread.event.wait()
    loop = aiothread.loop
    coro = asyncio.sleep(1, loop=loop)
    future = aiothread.add_task(coro)
    try:
        return future.result(timeout)
    except concurrent.futures.TimeoutError:
        print('The coroutine took too long, cancelling the task')
        future.cancel()
    except Exception as exc:
        print('The coroutine raised an exception: {!r}'.format(exc))
    finally:
        # Without this the non-daemon loop thread keeps the process alive.
        aiothread.finalize()
class TwistedConnection(Connection):
    """Connection whose IO is driven by twisted's reactor (slide excerpt)."""

    @classmethod
    def initialize_reactor(cls):
        cls._loop = TwistedLoop()

    def add_connection(self):
        # ...
        pass

    def client_connection_made(self):
        # ...
        pass

    def handle_read(self):
        self.process_io_buffer()

    def push(self, data):
        # Writes are handed over to the reactor thread.
        reactor.callFromThread(self.connector.transport.write, data)


# Cassandra's python driver is sync, but connection objects spin an event
# loop, in this case twisted's reactor.
queue = janus.Queue(loop=loop) await queue.async_q.get() queue.sync_q.put(i)
import asyncio
import janus

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)


def threaded(sync_q):
    """Producer running in a worker thread: put 100 items, then wait for them."""
    for i in range(100):
        sync_q.put(i)
    sync_q.join()


async def async_coro(async_q):
    """Consumer running on the event loop: take 100 items and acknowledge each."""
    for i in range(100):
        val = await async_q.get()
        async_q.task_done()


fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)

# janus has two APIs for the same queue: sync like queue.Queue and async like
# asyncio.Queue.
import asyncio


async def go(loop):
    """Demonstrate passing the loop explicitly to asyncio primitives."""
    future = asyncio.Future(loop=loop)
    future.set_result(None)
    await asyncio.sleep(3.0, loop=loop)
    await future
    print("foo")


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
loop.close()

# loop argument is required for most asyncio APIs.
It would really be a pity if Tulip repeated our billion-dollar mistake [global reactor] ;-)
# --Glyph Lefkowitz / author of Twisted
# https://groups.google.com/forum/#!msg/python-tulip/hr1kPZfMX8U/9uqdlbRuRsoJ
import asyncio
import signal

# Module-level flag polled by do_work(); cleared by signal_handler().
is_working = True


async def do_work(loop):
    """Keep working (sleeping one second per step) until ``is_working`` is cleared."""
    while is_working:
        await asyncio.sleep(1, loop=loop)


def signal_handler(loop):
    """SIGTERM handler: unregister itself and ask ``do_work`` to finish."""
    # Without `global` the assignment below creates a local variable and the
    # worker loop never stops.
    global is_working
    loop.remove_signal_handler(signal.SIGTERM)
    is_working = False


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGTERM, signal_handler, loop)
    loop.run_until_complete(do_work(loop))

# asyncio will warn you with a bunch of tracebacks if you do not do a proper
# shutdown.
loop = asyncio.get_event_loop()
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8080)
srv = loop.run_until_complete(f)


def shutdown(loop):
    """SIGTERM handler: unregister itself and stop the loop."""
    loop.remove_signal_handler(signal.SIGTERM)
    loop.stop()


loop.add_signal_handler(signal.SIGTERM, shutdown, loop)
loop.run_forever()

# Reached once shutdown() has stopped the loop.
srv.close()  # finish socket listening
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.shutdown())  # close websockets
loop.run_until_complete(handler.finish_connections(60.0))
loop.run_until_complete(app.cleanup())  # run registered cleanups
loop.close()

# Now you can be sure that all requests are safely served and new requests
# are not accepted.
import asyncio
import aiohttp


async def go(loop):
    """Fetch one page through a client session and print its body."""
    session = aiohttp.ClientSession(loop=loop)
    try:
        async with session.get('http://ua.pycon.org') as resp:
            data = await resp.text()
        print(data)
    finally:
        # Release the session even when the request fails.
        # NOTE(review): newer aiohttp releases make close() a coroutine that
        # must be awaited -- confirm against the installed version.
        session.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))

# Connection pooling helps to save on expensive connection creation.
async def init(loop):
    """Build the web application and register database cleanup (excerpt)."""
    # setup application and extensions
    app = web.Application(loop=loop)

    # create connection to the database
    pg = await init_postgres(conf['postgres'], loop)

    async def close_pg(app):
        # Close the database connection and wait until it is fully closed.
        pg.close()
        await pg.wait_closed()

    app.on_cleanup.append(close_pg)
    # ...


# aiohttp has a handy on_cleanup signal for database connections, as well as
# on_shutdown for websockets.
class Foo:
    """Object that starts a background task on construction."""

    def __init__(self, loop=None):
        # loop=None lets asyncio pick the current event loop.
        self._loop = loop
        self._task = asyncio.ensure_future(self._do_task(), loop=self._loop)

    async def _do_task(self):
        # NOTE(review): `set` is not defined in this excerpt -- presumably
        # provided elsewhere in the real class.
        await self.set('foo', 'bar')
        await self.set('baz', 'zap')

    async def _stop_do_task(self):
        """Wait for the background task to finish."""
        await self._task


# May be very tricky! The same approach as thread finalization should be
# employed, or task.cancel().
# Pitfall demo: the StopIteration raised below is intentional.
@asyncio.coroutine
def coro():
    raise StopIteration('batman')


@asyncio.coroutine
def coro2():
    i = iter(range(2))
    next(i)
    next(i)
    next(i)  # raise StopIteration
    return 'finish'


@asyncio.coroutine
def go():
    data = yield from coro()  # batman
    data = yield from coro2()  # None


# Fixed in python 3.5
# Pitfall demo: `bar` intentionally lacks the @asyncio.coroutine decorator.
@asyncio.coroutine
def foo(loop):
    yield from asyncio.sleep(1, loop=loop)


def bar(loop):
    yield from asyncio.sleep(1, loop=loop)


loop.run_until_complete(foo(loop))
loop.run_until_complete(bar(loop))

# Fixed in python 3.5 with await syntax
async def foo(loop):
    """Run two updates in sequence."""
    await update1()
    await update2()


# ensure_future is a module-level asyncio function, not an event loop method.
task = asyncio.ensure_future(foo(loop), loop=loop)
task.cancel()

# Fixed in python 3.5 with await syntax