Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion action_subscriber.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "bundler", ">= 1.6"
spec.add_development_dependency "pry-coolline"
spec.add_development_dependency "pry-nav"
spec.add_development_dependency "rabbitmq_http_api_client", "~> 1.2.0"
spec.add_development_dependency "rabbitmq_http_api_client", "~> 1.9.0"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "rake"
end
64 changes: 39 additions & 25 deletions lib/action_subscriber/bunny/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ def cancel_consumers!
end
end

def set_bunny_consumers(consumers)
@bunny_consumers = consumers
end

def setup_subscriptions!
fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty?
routes.each do |route|
Expand All @@ -26,32 +30,42 @@ def setup_subscriptions!

def start_subscribers!
subscriptions.each do |subscription|
route = subscription[:route]
queue = subscription[:queue]
channel = queue.channel
threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
channel.prefetch(route.prefetch) if route.acknowledgements?
consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)
consumer.on_delivery do |delivery_info, properties, encoded_payload|
::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
properties = {
:action => route.action,
:channel => queue.channel,
:content_type => properties.content_type,
:delivery_tag => delivery_info.delivery_tag,
:exchange => delivery_info.exchange,
:headers => properties.headers,
:message_id => properties.message_id,
:routing_key => delivery_info.routing_key,
:queue => queue.name,
:uses_acknowledgements => route.acknowledgements?,
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
run_env(env, threadpool)
end
bunny_consumers << consumer
queue.subscribe_with(consumer)
start_subscription!(subscription)
end
end

def start_subscription!(subscription)
route = subscription[:route]
queue = subscription[:queue]
channel = queue.channel
threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
channel.prefetch(route.prefetch) if route.acknowledgements?
consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?)
consumer.on_cancellation do |_basic_cancel|
set_bunny_consumers(bunny_consumers.reject { |bunny_consumer| bunny_consumer == consumer })
subscription[:queue] = setup_queue(route)
start_subscription!(subscription)
end

consumer.on_delivery do |delivery_info, properties, encoded_payload|
::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
properties = {
:action => route.action,
:channel => queue.channel,
:content_type => properties.content_type,
:delivery_tag => delivery_info.delivery_tag,
:exchange => delivery_info.exchange,
:headers => properties.headers,
:message_id => properties.message_id,
:routing_key => delivery_info.routing_key,
:queue => queue.name,
:uses_acknowledgements => route.acknowledgements?,
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
run_env(env, threadpool)
end
bunny_consumers << consumer
queue.subscribe_with(consumer)
end

private
Expand Down
63 changes: 41 additions & 22 deletions lib/action_subscriber/march_hare/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ def cancel_consumers!
def march_hare_consumers
@march_hare_consumers ||= []
end

def set_march_hare_consumers(consumers)
@march_hare_consumers = consumers
end

def setup_subscriptions!
fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty?
Expand All @@ -25,31 +29,46 @@ def setup_subscriptions!
end

def start_subscribers!
route_set = self
subscriptions.each do |subscription|
route = subscription[:route]
queue = subscription[:queue]
queue.channel.prefetch = route.prefetch if route.acknowledgements?
threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)
consumer = queue.subscribe(route.queue_subscription_options) do |metadata, encoded_payload|
::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
properties = {
:action => route.action,
:channel => queue.channel,
:content_type => metadata.content_type,
:delivery_tag => metadata.delivery_tag,
:exchange => metadata.exchange,
:headers => _normalized_headers(metadata),
:message_id => metadata.message_id,
:routing_key => metadata.routing_key,
:queue => queue.name,
:uses_acknowledgements => route.acknowledgements?,
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
run_env(env, threadpool)
end
start_subscription! subscription
end
end

def start_subscription!(subscription)
route = subscription[:route]
queue = subscription[:queue]
queue.channel.prefetch = route.prefetch if route.acknowledgements?
threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name)

cancel = {
:on_cancellation => Proc.new { |channel, consumer|
channel.close
set_march_hare_consumers(march_hare_consumers.reject { |march_hare_consumer| march_hare_consumer == consumer })

march_hare_consumers << consumer
subscription[:queue] = setup_queue(route)
start_subscription!(subscription)
}
}
consumer = queue.subscribe(cancel.merge(route.queue_subscription_options)) do |metadata, encoded_payload|
::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name
properties = {
:action => route.action,
:channel => queue.channel,
:content_type => metadata.content_type,
:delivery_tag => metadata.delivery_tag,
:exchange => metadata.exchange,
:headers => _normalized_headers(metadata),
:message_id => metadata.message_id,
:routing_key => metadata.routing_key,
:queue => queue.name,
:uses_acknowledgements => route.acknowledgements?,
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
run_env(env, threadpool)
end

march_hare_consumers << consumer
end

private
Expand Down
34 changes: 34 additions & 0 deletions spec/integration/consumer_cancel_notify_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require "rabbitmq/http/client"

class ZombieSubscriber < ActionSubscriber::Base
def groan
$messages << payload
end
end

describe "Rebuilds subscription after receiving consumer_cancel_notify", :integration => true, :slow => true do
let(:draw_routes) do
::ActionSubscriber.draw_routes do
default_routes_for ZombieSubscriber
end
end
let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") }
let(:subscriber) { ZombieSubscriber }

it "continues to receive messages following consumer_cancel_notify message" do
::ActionSubscriber::start_subscribers!
::ActivePublisher.publish("zombie.groan", "uuunngg", "events")

delete_queue!
sleep 5.0

::ActivePublisher.publish("zombie.groan", "meeuuhhh", "events")
verify_expectation_within(5.0) do
expect($messages).to eq(Set.new(["uuunngg", "meeuuhhh"]))
end
end

def delete_queue!
http_client.delete_queue("/","alice.zombie.groan")
end
end