On Github eburdaev / eburdaev.github.io
Created by Evgeniy Burdaev
RabbitMQ — это брокер сообщений, через который сервисы отправляют и принимают сообщения.
Producer
Exchange
Queue
Consumer
# Publishes JSON-encoded messages to RabbitMQ fanout exchanges through Bunny.
#
# The connection and the channel are created on first use and memoized in
# class-level instance variables, so later calls reuse them.
class Publisher
  class << self
    # Publishes +message+, serialized with +to_json+, to the fanout exchange
    # named "blog.<exchange>".
    #
    # @param exchange [#to_s] suffix of the exchange name, e.g. "posts"
    # @param message [#to_json] payload to send; defaults to an empty hash
    def publish(exchange, message = {})
      channel.fanout("blog.#{exchange}").publish(message.to_json)
    end

    # Returns the memoized channel, opening it on the first call.
    def channel
      @channel ||= connection.create_channel
    end

    # Returns the memoized connection; on the first call a new Bunny
    # instance is created and started.
    def connection
      @connection ||= begin
        session = Bunny.new
        session.start
        session
      end
    end

    # Alternate syntax: memoize the client separately from the connection.
    #
    #   def client
    #     @client ||= Bunny.new
    #   end
    #
    #   def connection
    #     @connection ||= client.start
    #   end
  end
end
<BLOG>
# blog/app/controllers/posts_controller.rb
# Controller for blog posts; only the +create+ action is shown here.
class PostsController < ApplicationController
  # ...

  # Builds a post from +post_params+ and tries to save it.
  # On failure the +new+ template is rendered again; on success the post
  # attributes are handed to Publisher under the "posts" name and the
  # request is redirected to the post with a notice.
  def create
    @post = Post.new(post_params)

    unless @post.save
      render :new
      return
    end

    # Publish post data
    Publisher.publish("posts", @post.attributes)
    redirect_to @post, notice: 'Post was successfully created.'
  end

  # ...
end
</>
# Redis-backed list of the most recently pushed posts.
#
# Each post is stored as a JSON string in a Redis list; new posts are pushed
# to the head, so the list reads newest first.
class RecentPosts
  KEY = "recent_posts" # redis key
  STORE_LIMIT = 5 # how many posts should be kept

  # Returns up to +limit+ posts from the head of the list.
  #
  # Each stored item is a JSON string, so it is parsed and converted with
  # +with_indifferent_access+ before being returned.
  #
  # @param limit [Integer] maximum number of posts to return
  # @return [Array] parsed posts, or an empty array for a non-positive limit
  def self.list(limit = STORE_LIMIT)
    # LRANGE counts a negative stop index from the end of the list, so
    # `limit - 1` for a limit of 0 would select the whole list instead of
    # nothing. Short-circuit non-positive limits.
    return [] if limit <= 0

    $redis.lrange(KEY, 0, limit - 1).map do |raw_post|
      JSON.parse(raw_post).with_indifferent_access
    end
  end

  # Pushes a new post to the head of the list and trims the list to
  # STORE_LIMIT items to limit the required storage space.
  #
  # @param raw_post [String] post already encoded as a JSON string, so it is
  #   stored without further encoding
  def self.push(raw_post)
    $redis.lpush(KEY, raw_post)
    $redis.ltrim(KEY, 0, STORE_LIMIT - 1)
  end
end
# dashboard/app/controllers/home_controller.rb
# Controller for the dashboard home page.
class HomeController < ApplicationController
  # Loads the recent posts (with the default limit) into +@posts+.
  def index
    @posts = RecentPosts.list
  end
end
# dashboard/Gemfile
gem 'redis-rails'
gem 'redis-namespace'
gem 'sneakers'
# Sneakers worker that forwards incoming post messages to RecentPosts.
class PostsWorker
  include Sneakers::Worker

  # Consume from the "dashboard.posts" queue. `env` is set to nil because
  # by default the actual queue name would be "dashboard.posts_development".
  from_queue("dashboard.posts", env: nil)

  # Handles one message.
  #
  # @param raw_post [String] message payload in raw format; in our case a
  #   JSON-encoded string that is passed to RecentPosts without changes
  def work(raw_post)
    RecentPosts.push(raw_post)
    ack! # let the queue know that the message was received
  end
end
Запуск воркеров
WORKERS=PostsWorker rake sneakers:run
#rake task
namespace :rabbitmq do
  # Declares the "blog.posts" fanout exchange and the durable
  # "dashboard.posts" queue, then binds the queue to the exchange.
  desc "Setup routing"
  task :setup do
    require "bunny"

    puts 'rabbit mq subscription'
    conn = Bunny.new
    conn.start

    begin
      ch = conn.create_channel
      # get or create exchange
      x = ch.fanout("blog.posts")
      # get or create queue (note the durable setting)
      queue = ch.queue("dashboard.posts", durable: true)
      # bind queue to the exchange declared above
      queue.bind(x)
    ensure
      # close the connection even if a declaration or the binding raises
      conn.close
    end
  end
end
- Основная статья с блога CODETUNES
- Документация к гему bunny, а также гайды