Wes Mason
"gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libevent event loop."
from gevent.wsgi import WSGIServer
from gevent.socket import tcp_listener
from multiprocessing import Process, cpu_count
from serverdensity.proxy.app import app


def run(app, listener):
    """Serve the WSGI application ``app`` on ``listener``.

    Builds a gevent ``WSGIServer`` around the given listener and then
    blocks inside ``serve_forever()``.
    """
    WSGIServer(listener, app).serve_forever()
# Create the listener once, up front; the same object is handed to every
# worker process started below.
listener = tcp_listener(('127.0.0.1', 8000))

# Start one child process for every CPU except one ...
worker_count = cpu_count() - 1
for _ in xrange(worker_count):
    worker = Process(target=run, args=(app, listener))
    worker.start()

# ... and run the server in the current process as well.
run(app, listener)
from gevent import spawn, joinall, sleep


def g():
    """Print 'foo', give other greenlets a turn, then print 'bar'."""
    print('foo')
    # sleep(0) yields control so that other ready greenlets can run
    # before this one continues.
    sleep(0)
    print('bar')


# Start two greenlets running g() and wait for both of them to finish.
first = spawn(g)
second = spawn(g)
joinall([first, second])
foo foo bar bar
# -> HTTP request
mongo.payloads.insert(payloads)
# Celery's apply_async() takes the task's positional arguments as a
# sequence (its first parameter is ``args``), so the id is wrapped in a
# one-element tuple instead of being passed bare.
tasks.process_payload.apply_async((payload_id,))


# -> celery task (inside a greenlet)
@task
def process_payload(payload_id):
    """Load a stored payload, process it, then queue alert processing.

    Returns whatever ``process_alerts.apply_async`` returns for the same
    ``payload_id``.
    """
    # NOTE(review): PyMongo spells this find_one(); confirm that `mongo`
    # is a wrapper which really provides findOne().
    payload = mongo.payloads.findOne({...})
    process = PayloadProcessor(payload)
    ...
    # Same calling convention as above: positional args go in a tuple.
    return process_alerts.apply_async((payload_id,))
# inside PayloadProcessor
# (still within a greenlet, controlled by Celery)
def send_metrics(self):
    """Hand a copy of ``self.payload`` to ``_handle_metrics_request``.

    The handler runs in a newly spawned greenlet; the greenlet object is
    discarded, so this method does not wait for the handler's result.
    """
    outgoing = copy(self.payload)
    # remove some fields we don't want to send,
    # turn some to strings
    # NOTE(review): gevent.spawn(function, *args) forwards extra
    # arguments unchanged, so the handler receives a one-element list
    # here rather than the payload itself -- confirm that is intended.
    gevent.spawn(self._handle_metrics_request, [outgoing])
def _handle_metrics_request(self, payload):
    """POST ``payload``, serialized as JSON, to ``METRICS_URL``.

    Returns True when the response status code is 200 and False for any
    other status code; the outcome is logged in both cases.
    """
    body = json.dumps(payload)
    # NOTE(review): no timeout is passed to requests.post(); confirm
    # that waiting indefinitely on a stalled service is acceptable.
    reply = requests.post(METRICS_URL, data=body)
    if reply.status_code != 200:
        logging.error(...)
        return False
    logging.info(...)
    return True
def update_check(self, response):
    """Update collections in Mongo from ``response`` (body elided here)."""
    # update collections in Mongo from response
    ...

def update_metrics(self):
    """Send a request to the metrics service (body elided here)."""
    # send a request to the metrics service
    ...

def update_inventory(self):
    """Send a request to the inventory service (body elided here)."""
    # send a request to the inventory service
    ...
# run from a celery task
def health_check(self):
    """Run ``update_check`` in a greenlet, then start the follow-up updates.

    ``update_metrics`` and ``update_inventory`` are each started in their
    own greenlet once the ``update_check`` greenlet has finished. The
    links fire on completion whether or not ``update_check`` raised.
    """
    # get HTTP response from actors
    ...
    # NOTE(review): spawn() forwards extra arguments unchanged, so
    # update_check receives a one-element list rather than the response
    # itself -- confirm that is intended.
    update_greenlet = spawn(self.update_check, [response])
    # link() expects a callable that is invoked with the finished
    # greenlet. Passing spawn(...) directly would start the follow-up
    # immediately and hand link() a Greenlet instead of a callback, so
    # each follow-up is wrapped in a callback that spawns it afterwards.
    update_greenlet.link(lambda _finished: spawn(self.update_metrics))
    update_greenlet.link(lambda _finished: spawn(self.update_inventory))