On GitHub: peterpeerdeman/queueing-jobs-with-rabbitmq-and-symfony2
---
# Ansible tasks: install RabbitMQ from the rabbitmq.com apt repository, enable
# the management plugin, create an "admin" user and delete the "guest" user.

# Register the apt source; `creates=` makes the task a no-op once the list
# file is present.
- name: rabbitmq | add repository
  shell: echo 'deb http://www.rabbitmq.com/debian/ testing main' > /etc/apt/sources.list.d/rabbitmq.list creates=/etc/apt/sources.list.d/rabbitmq.list

# Fetch the repository signing key and register it with apt.
- name: rabbitmq | download key
  get_url:
    url: http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
    dest: /tmp/rabbitmq-signing-key-public.asc

- name: rabbitmq | add key
  sudo: yes
  command: apt-key add /tmp/rabbitmq-signing-key-public.asc

# Refresh the package index so the new repository is visible, then install.
- name: rabbitmq | update apt
  sudo: yes
  apt:
    update_cache: yes

- name: rabbitmq | install
  sudo: yes
  apt:
    package: rabbitmq-server
    state: latest

- name: rabbitmq | enable plugins
  shell: rabbitmq-plugins enable rabbitmq_management

# The rabbitmqctl tasks below all set ignore_errors, so a failing command does
# not abort the play (presumably so re-runs survive "user already exists" /
# "no such user" errors -- confirm).
- name: rabbitmq | add admin user
  shell: rabbitmqctl add_user admin password
  ignore_errors: true

- name: rabbitmq | set admin tags
  shell: rabbitmqctl set_user_tags admin administrator
  ignore_errors: true

- name: rabbitmq | set admin permissions
  shell: rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
  ignore_errors: true

# Deleting the guest user triggers the "rabbitmq | restart" handler, which is
# expected to be defined elsewhere.
- name: rabbitmq | delete guest user
  shell: rabbitmqctl delete_user guest
  notify: rabbitmq | restart
  ignore_errors: true
/**
 * Updates the Facebook shares of every enabled user, one user after another.
 *
 * The value returned by findLastUpdateTimestampForType('shares') is handed to
 * the share update service together with each user, and the service is called
 * directly inside the loop.
 */
public function updateFacebookShares()
{
    $since = $this->findLastUpdateTimestampForType('shares');
    $userRepository = $this->em->getRepository('LifelyBundle:User');

    foreach ($userRepository->findAllEnabledUsers() as $enabledUser) {
        // fetch facebook posts since last update
        $this->shareupdateservice->updateShares($enabledUser, $since);
    }
}
/**
 * Queues a Facebook shares update job for every enabled user.
 *
 * The value returned by findLastUpdateTimestampForType('shares') is passed to
 * the job producer together with each user; the update itself is not
 * performed in this method.
 */
public function updateFacebookShares()
{
    $since = $this->findLastUpdateTimestampForType('shares');
    $userRepository = $this->em->getRepository('LifelyBundle:User');

    foreach ($userRepository->findAllEnabledUsers() as $enabledUser) {
        // queue retrieval of facebook posts since last update
        $this->facebookjobproducer->queueFacebookShares($enabledUser, $since);
    }
}
/**
 * Publishes Facebook update jobs through the injected producer.
 */
class FacebookJobProducer
{
    /** Producer that receives the serialized "shares" job messages. */
    private $sharesproducer;

    /**
     * @param Producer $sharesproducer producer the share jobs are published to
     */
    public function __construct(Producer $sharesproducer)
    {
        $this->sharesproducer = $sharesproducer;
    }

    /**
     * Publishes one job describing a shares update for the given user.
     *
     * The message body is a PHP-serialized array with the keys 'user_id'
     * and 'lastupdate'.
     *
     * @param object $user       object exposing getId()
     * @param mixed  $lastupdate value forwarded unchanged in the message
     */
    public function queueFacebookShares($user, $lastupdate)
    {
        $payload = serialize(array(
            'user_id' => $user->getId(),
            'lastupdate' => $lastupdate,
        ));

        $this->sharesproducer->publish($payload);
    }
}
/**
 * Consumes Facebook update jobs and runs them through the share update
 * service.
 */
class FacebookJobConsumer
{
    /** Entity manager used to load the user referenced by a job. */
    private $em;

    /** Service exposing updateShares($user, $lastupdate). */
    private $shareupdateservice;

    /**
     * @param EntityManager $entityManager      used to look up users
     * @param object        $shareupdateservice service exposing updateShares()
     */
    public function __construct(EntityManager $entityManager, $shareupdateservice)
    {
        $this->em = $entityManager;
        $this->shareupdateservice = $shareupdateservice;
    }

    /**
     * Handles one message from the queue.
     *
     * The body is expected to be a PHP-serialized array; the exchange the
     * message was delivered through selects the handler. Bodies that do not
     * unserialize to an array are skipped instead of being passed on.
     *
     * NOTE(review): skipped messages return without a value; presumably the
     * consumer framework acknowledges anything that is not `false` -- confirm
     * against the installed RabbitMqBundle version.
     *
     * @param AMQPMessage $msg message delivered by the broker
     */
    public function execute(AMQPMessage $msg)
    {
        // NOTE(review): unserialize() can instantiate arbitrary classes. This
        // is only safe while every publisher on this broker is trusted;
        // consider a JSON payload (requires changing the producer as well).
        $message = unserialize($msg->body);

        if (!is_array($message)) {
            // Corrupt or foreign payload: nothing to process.
            return;
        }

        switch ($msg->delivery_info['exchange']) {
            case 'shares':
                $this->processShares($message);
                break;
        }
    }

    /**
     * Runs a shares update for the user referenced by the message.
     *
     * Does nothing when the message has no 'user_id' or when no user with
     * that id is found (for example, the user was removed after the job was
     * queued).
     *
     * @param array $message unserialized job with 'user_id' and 'lastupdate'
     */
    private function processShares($message)
    {
        if (!isset($message['user_id'])) {
            return;
        }

        $user = $this->em
            ->getRepository('LifelyTFSSiteBundle:User')
            ->findOneById($message['user_id']);

        if (null === $user) {
            return;
        }

        // A missing 'lastupdate' is forwarded as null, matching what the
        // array access produced before (minus the notice).
        $lastupdate = isset($message['lastupdate']) ? $message['lastupdate'] : null;

        $this->shareupdateservice->updateShares($user, $lastupdate);
    }
}
# RabbitMQ bundle configuration: one connection, plus a producer and a
# consumer that both use the direct exchange named 'shares'.
old_sound_rabbit_mq:
    connections:
        default:
            # Connection settings are read from container parameters.
            host: "%rabbitmq_host%"
            port: "%rabbitmq_port%"
            user: "%rabbitmq_user%"
            password: "%rabbitmq_password%"
            vhost: "%rabbitmq_vhost%"
            lazy: true
    producers:
        shares:
            connection: default
            exchange_options: { name: 'shares', type: direct }
    consumers:
        shares:
            connection: default
            exchange_options:
                name: 'shares'
                type: direct
            queue_options:
                name: 'shares'
            # Presumably the service id of the consumer class -- confirm
            # against the service definitions.
            callback: tfs.consumer.facebookjob
            # Presumably seconds of inactivity before the consumer stops --
            # confirm against the bundle documentation.
            idle_timeout: 5
[{ "memory": 1038768, "message_stats": { "ack": 4816716, "ack_details": { "rate": 0 }, "deliver": 9460951, "deliver_details": { "rate": 0 }, "deliver_get": 9460951, "deliver_get_details": { "rate": 0 }, "publish": 4817957, "publish_details": { "rate": 0 }, "redeliver": 4644385, "redeliver_details": { "rate": 0 } }, "messages": 1391, "messages_details": { "rate": 0 }, "messages_ready": 1391, "messages_ready_details": { "rate": 0 }, "messages_unacknowledged": 0, "messages_unacknowledged_details": { "rate": 0 }, "idle_since": "2014-10-14 23:46:05", "consumer_utilisation": "", "policy": "", "exclusive_consumer_tag": "", "consumers": 0, "backing_queue_status": { "q1": 0, "q2": 0, "delta": ["delta", "undefined", 0, "undefined"], "q3": 0, "q4": 1391, "len": 1391, "pending_acks": 0, "target_ram_count": "infinity", "ram_msg_count": 1391, "ram_ack_count": 0, "next_seq_id": 15680549, "persistent_count": 1391, "avg_ingress_rate": 162.49305731486965, "avg_egress_rate": 8.60980992660947e-21, "avg_ack_ingress_rate": 8.60980992660947e-21, "avg_ack_egress_rate": 1.427091082672728e-13 }, "state": "running", "name": "sharelikes", "vhost": "tfs", "durable": true, "auto_delete": false, "arguments": {}, "node": "rabbit@lfy-production" }, { ... }]