diff --git a/action_subscriber.gemspec b/action_subscriber.gemspec index 8740bcb..c534f9d 100644 --- a/action_subscriber.gemspec +++ b/action_subscriber.gemspec @@ -35,7 +35,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", ">= 1.6" spec.add_development_dependency "pry-coolline" spec.add_development_dependency "pry-nav" - spec.add_development_dependency "rabbitmq_http_api_client", "~> 1.2.0" + spec.add_development_dependency "rabbitmq_http_api_client", "~> 1.9.0" spec.add_development_dependency "rspec", "~> 3.0" spec.add_development_dependency "rake" end diff --git a/lib/action_subscriber/bunny/subscriber.rb b/lib/action_subscriber/bunny/subscriber.rb index 70bfc81..493d9a1 100644 --- a/lib/action_subscriber/bunny/subscriber.rb +++ b/lib/action_subscriber/bunny/subscriber.rb @@ -14,6 +14,10 @@ def cancel_consumers! end end + def set_bunny_consumers(consumers) + @bunny_consumers = consumers + end + def setup_subscriptions! fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty? routes.each do |route| @@ -26,32 +30,42 @@ def setup_subscriptions! def start_subscribers! subscriptions.each do |subscription| - route = subscription[:route] - queue = subscription[:queue] - channel = queue.channel - threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) - channel.prefetch(route.prefetch) if route.acknowledgements? - consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?) - consumer.on_delivery do |delivery_info, properties, encoded_payload| - ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name - properties = { - :action => route.action, - :channel => queue.channel, - :content_type => properties.content_type, - :delivery_tag => delivery_info.delivery_tag, - :exchange => delivery_info.exchange, - :headers => properties.headers, - :message_id => properties.message_id, - :routing_key => delivery_info.routing_key, - :queue => queue.name, - :uses_acknowledgements => route.acknowledgements?, - } - env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) - run_env(env, threadpool) - end - bunny_consumers << consumer - queue.subscribe_with(consumer) + start_subscription!(subscription) + end + end + + def start_subscription!(subscription) + route = subscription[:route] + queue = subscription[:queue] + channel = queue.channel + threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) + channel.prefetch(route.prefetch) if route.acknowledgements? + consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?) + consumer.on_cancellation do |_basic_cancel| + set_bunny_consumers(bunny_consumers.reject { |bunny_consumer| bunny_consumer == consumer }) + subscription[:queue] = setup_queue(route) + start_subscription!(subscription) + end + + consumer.on_delivery do |delivery_info, properties, encoded_payload| + ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name + properties = { + :action => route.action, + :channel => queue.channel, + :content_type => properties.content_type, + :delivery_tag => delivery_info.delivery_tag, + :exchange => delivery_info.exchange, + :headers => properties.headers, + :message_id => properties.message_id, + :routing_key => delivery_info.routing_key, + :queue => queue.name, + :uses_acknowledgements => route.acknowledgements?, + } + env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) + run_env(env, threadpool) end + bunny_consumers << consumer + queue.subscribe_with(consumer) end private diff --git a/lib/action_subscriber/march_hare/subscriber.rb b/lib/action_subscriber/march_hare/subscriber.rb index 6f80099..2de772d 100644 --- a/lib/action_subscriber/march_hare/subscriber.rb +++ b/lib/action_subscriber/march_hare/subscriber.rb @@ -13,6 +13,10 @@ def cancel_consumers! def march_hare_consumers @march_hare_consumers ||= [] end + + def set_march_hare_consumers(consumers) + @march_hare_consumers = consumers + end def setup_subscriptions! fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty? @@ -25,31 +29,46 @@ def setup_subscriptions! end def start_subscribers! + route_set = self subscriptions.each do |subscription| - route = subscription[:route] - queue = subscription[:queue] - queue.channel.prefetch = route.prefetch if route.acknowledgements? - threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) - consumer = queue.subscribe(route.queue_subscription_options) do |metadata, encoded_payload| - ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name - properties = { - :action => route.action, - :channel => queue.channel, - :content_type => metadata.content_type, - :delivery_tag => metadata.delivery_tag, - :exchange => metadata.exchange, - :headers => _normalized_headers(metadata), - :message_id => metadata.message_id, - :routing_key => metadata.routing_key, - :queue => queue.name, - :uses_acknowledgements => route.acknowledgements?, - } - env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) - run_env(env, threadpool) - end + start_subscription! subscription + end + end + + def start_subscription!(subscription) + route = subscription[:route] + queue = subscription[:queue] + queue.channel.prefetch = route.prefetch if route.acknowledgements? + threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) + + cancel = { + :on_cancellation => Proc.new { |channel, consumer| + channel.close + set_march_hare_consumers(march_hare_consumers.reject { |march_hare_consumer| march_hare_consumer == consumer }) - march_hare_consumers << consumer + subscription[:queue] = setup_queue(route) + start_subscription!(subscription) + } + } + consumer = queue.subscribe(cancel.merge(route.queue_subscription_options)) do |metadata, encoded_payload| + ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name + properties = { + :action => route.action, + :channel => queue.channel, + :content_type => metadata.content_type, + :delivery_tag => metadata.delivery_tag, + :exchange => metadata.exchange, + :headers => _normalized_headers(metadata), + :message_id => metadata.message_id, + :routing_key => metadata.routing_key, + :queue => queue.name, + :uses_acknowledgements => route.acknowledgements?, + } + env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) + run_env(env, threadpool) end + + march_hare_consumers << consumer end private diff --git a/spec/integration/consumer_cancel_notify_spec.rb b/spec/integration/consumer_cancel_notify_spec.rb new file mode 100644 index 0000000..3f11536 --- /dev/null +++ b/spec/integration/consumer_cancel_notify_spec.rb @@ -0,0 +1,34 @@ +require "rabbitmq/http/client" + +class ZombieSubscriber < ActionSubscriber::Base + def groan + $messages << payload + end +end + +describe "Rebuilds subscription after receiving consumer_cancel_notify", :integration => true, :slow => true do + let(:draw_routes) do + ::ActionSubscriber.draw_routes do + default_routes_for ZombieSubscriber + end + end + let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") } + let(:subscriber) { ZombieSubscriber } + + it "continues to receive messages following consumer_cancel_notify message" do + ::ActionSubscriber::start_subscribers! + ::ActivePublisher.publish("zombie.groan", "uuunngg", "events") + + delete_queue! + sleep 5.0 + + ::ActivePublisher.publish("zombie.groan", "meeuuhhh", "events") + verify_expectation_within(5.0) do + expect($messages).to eq(Set.new(["uuunngg", "meeuuhhh"])) + end + end + + def delete_queue! + http_client.delete_queue("/","alice.zombie.groan") + end +end