Wes Mason
"gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libevent event loop."
from gevent.wsgi import WSGIServer
from gevent.socket import tcp_listener
from multiprocessing import Process, cpu_count
from serverdensity.proxy.app import app
def run(app, listener):
    """Serve ``app`` on ``listener`` with a gevent ``WSGIServer``.

    The call blocks inside ``serve_forever()``, so it does not return
    while the server is running.
    """
    WSGIServer(listener, app).serve_forever()
# Create the listening socket once, up front, and hand the same object to
# every worker below.
listener = tcp_listener(('127.0.0.1', 8000))

# One child process per CPU core except one: the current process takes
# the remaining slot by calling run() itself after the loop.
extra_workers = cpu_count() - 1
for _ in xrange(extra_workers):
    worker = Process(target=run, args=(app, listener))
    worker.start()

run(app, listener)
from gevent import spawn, joinall, sleep
def g():
    """Print 'foo', call ``sleep(0)``, then print 'bar'.

    The ``sleep(0)`` between the two prints is the point where another
    greenlet gets a chance to run.
    """
    before, after = 'foo', 'bar'
    print(before)
    sleep(0)
    print(after)
# Start two greenlets that both run g, then wait for the pair to finish.
first = spawn(g)
second = spawn(g)
joinall([first, second])
# Output (one word per line): foo foo bar bar
# -> HTTP request
mongo.payloads.insert(payloads)
# Celery's apply_async() takes the task's positional arguments as a single
# sequence (its first parameter is ``args``), so the id must be wrapped;
# passing it bare would make Celery treat the id itself as the argument list.
tasks.process_payload.apply_async(args=[payload_id])
# -> celery task (inside a greenlet)
@task
def process_payload(payload_id):
    """Load a stored payload, process it, then queue alert processing.

    ``payload_id`` identifies the payload to process. The return value is
    whatever ``process_alerts.apply_async`` returns for the queued
    follow-up task.
    """
    # NOTE(review): assumes ``mongo`` is a pymongo database, where the method
    # is spelled find_one (findOne is the mongo-shell name) -- confirm.
    # The ``{...}`` query is a placeholder for the real lookup.
    payload = mongo.payloads.find_one({...})
    process = PayloadProcessor(payload)
    ...
    # apply_async() expects the positional arguments as a sequence, not the
    # bare value.
    return process_alerts.apply_async(args=[payload_id])
# inside PayloadProcessor
# (still within a greenlet, controlled by Celery)
def send_metrics(self):
    """Send a copy of ``self.payload`` to the metrics service in a greenlet.

    The HTTP request runs in a newly spawned greenlet via
    ``_handle_metrics_request``; this method does not wait for it and
    returns ``None``.
    """
    payload = copy(self.payload)
    # remove some fields we don't want to send,
    # turn some to strings
    # gevent.spawn(func, *args) forwards positional arguments unchanged
    # (unlike Thread(args=[...])), so pass the payload itself. Wrapping it
    # in a list would make the handler serialise a one-element list.
    gevent.spawn(self._handle_metrics_request, payload)
def _handle_metrics_request(self, payload):
    """POST ``payload`` as JSON to ``METRICS_URL`` and report success.

    Returns ``True`` when the response status code is 200 and ``False``
    otherwise. The outcome is logged at info level on success and at
    error level on failure.
    """
    body = json.dumps(payload)
    resp = requests.post(METRICS_URL, data=body)
    succeeded = resp.status_code == 200
    # The ``...`` arguments are placeholders for the real log messages.
    if not succeeded:
        logging.error(...)
    else:
        logging.info(...)
    return succeeded
def update_check(self, response):
    """Update collections in Mongo from ``response`` (body elided)."""
    ...
def update_metrics(self):
    """Send a request to the metrics service (body elided)."""
    ...
def update_inventory(self):
    """Send a request to the inventory service (body elided)."""
    ...
# run from a celery task
def health_check(self):
    """Run ``update_check`` in a greenlet, then the metrics/inventory updates.

    The code that obtains ``response`` is elided here. Once the
    ``update_check`` greenlet finishes, ``update_metrics`` and
    ``update_inventory`` are each spawned in their own greenlet.
    """
    # get HTTP response from actors
    ...
    # spawn(func, *args) passes positional arguments through unchanged, so
    # hand over the response itself rather than a one-element list.
    update_greenlet = spawn(self.update_check, response)
    # link() takes a callable that is invoked with the finished greenlet.
    # Calling spawn(...) directly in the argument would start the follow-up
    # immediately and pass a Greenlet object instead of a callable, so each
    # spawn is deferred until update_check has completed.
    # NOTE(review): assumes the follow-ups are meant to run after
    # update_check -- confirm.
    update_greenlet.link(lambda _finished: spawn(self.update_metrics))
    update_greenlet.link(lambda _finished: spawn(self.update_inventory))