From 947205bd31f592d101ad8a48d2cd16c3cda582e0 Mon Sep 17 00:00:00 2001 From: hugh-j Date: Tue, 6 Nov 2018 13:19:30 -0700 Subject: [PATCH 1/4] enables jruby integration tests to pass on rabbitmq >= 3.7.8 --- action_subscriber.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/action_subscriber.gemspec b/action_subscriber.gemspec index 8740bcb..c534f9d 100644 --- a/action_subscriber.gemspec +++ b/action_subscriber.gemspec @@ -35,7 +35,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "bundler", ">= 1.6" spec.add_development_dependency "pry-coolline" spec.add_development_dependency "pry-nav" - spec.add_development_dependency "rabbitmq_http_api_client", "~> 1.2.0" + spec.add_development_dependency "rabbitmq_http_api_client", "~> 1.9.0" spec.add_development_dependency "rspec", "~> 3.0" spec.add_development_dependency "rake" end From 97acc7579983ec2f30bbc9816089cea4f8e71533 Mon Sep 17 00:00:00 2001 From: hugh-j Date: Tue, 6 Nov 2018 13:20:19 -0700 Subject: [PATCH 2/4] tests subscription rebuilds on consumer_cancel_notify --- .../consumer_cancel_notify_spec.rb | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 spec/integration/consumer_cancel_notify_spec.rb diff --git a/spec/integration/consumer_cancel_notify_spec.rb b/spec/integration/consumer_cancel_notify_spec.rb new file mode 100644 index 0000000..3f11536 --- /dev/null +++ b/spec/integration/consumer_cancel_notify_spec.rb @@ -0,0 +1,34 @@ +require "rabbitmq/http/client" + +class ZombieSubscriber < ActionSubscriber::Base + def groan + $messages << payload + end +end + +describe "Rebuilds subscription after receiving consumer_cancel_notify", :integration => true, :slow => true do + let(:draw_routes) do + ::ActionSubscriber.draw_routes do + default_routes_for ZombieSubscriber + end + end + let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") } + let(:subscriber) { ZombieSubscriber } + + it "continues to receive messages following consumer_cancel_notify message" do + ::ActionSubscriber::start_subscribers! + ::ActivePublisher.publish("zombie.groan", "uuunngg", "events") + + delete_queue! + sleep 5.0 + + ::ActivePublisher.publish("zombie.groan", "meeuuhhh", "events") + verify_expectation_within(5.0) do + expect($messages).to eq(Set.new(["uuunngg", "meeuuhhh"])) + end + end + + def delete_queue! + http_client.delete_queue("/","alice.zombie.groan") + end +end From 99dd9b45105310569c9a84468d71487c1f4bc8e7 Mon Sep 17 00:00:00 2001 From: hugh-j Date: Tue, 6 Nov 2018 13:21:00 -0700 Subject: [PATCH 3/4] rebuilds subscriptions on consumer_cancel_notify --- lib/action_subscriber/bunny/subscriber.rb | 20 +++++++++++--- .../march_hare/subscriber.rb | 27 ++++++++++++++++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/lib/action_subscriber/bunny/subscriber.rb b/lib/action_subscriber/bunny/subscriber.rb index 70bfc81..9e1f3a4 100644 --- a/lib/action_subscriber/bunny/subscriber.rb +++ b/lib/action_subscriber/bunny/subscriber.rb @@ -7,6 +7,10 @@ def bunny_consumers @bunny_consumers ||= [] end + def set_bunny_consumers(consumers) + @bunny_consumers = consumers + end + def cancel_consumers! bunny_consumers.each(&:cancel) ::ActionSubscriber::ThreadPools.threadpools.each do |name, threadpool| @@ -24,14 +28,19 @@ def setup_subscriptions! end end - def start_subscribers! - subscriptions.each do |subscription| - route = subscription[:route] + def start_subscription!(subscription) + route = subscription[:route] queue = subscription[:queue] channel = queue.channel threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) channel.prefetch(route.prefetch) if route.acknowledgements? consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?) + consumer.on_cancellation do |_basic_cancel| + set_bunny_consumers(bunny_consumers.reject { |bunny_consumer| bunny_consumer == consumer }) + subscription[:queue] = setup_queue(route) + start_subscription!(subscription) + end + consumer.on_delivery do |delivery_info, properties, encoded_payload| ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name properties = { @@ -51,6 +60,11 @@ def start_subscribers! end bunny_consumers << consumer queue.subscribe_with(consumer) + end + + def start_subscribers! + subscriptions.each do |subscription| + start_subscription!(subscription) end end diff --git a/lib/action_subscriber/march_hare/subscriber.rb b/lib/action_subscriber/march_hare/subscriber.rb index 6f80099..793c4e0 100644 --- a/lib/action_subscriber/march_hare/subscriber.rb +++ b/lib/action_subscriber/march_hare/subscriber.rb @@ -13,6 +13,10 @@ def cancel_consumers! def march_hare_consumers @march_hare_consumers ||= [] end + + def set_march_hare_consumers(consumers) + @march_hare_consumers = consumers + end def setup_subscriptions! fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty? @@ -23,14 +27,23 @@ def setup_subscriptions! } end end - - def start_subscribers! - subscriptions.each do |subscription| + + def start_subscription!(subscription) route = subscription[:route] queue = subscription[:queue] queue.channel.prefetch = route.prefetch if route.acknowledgements? threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) - consumer = queue.subscribe(route.queue_subscription_options) do |metadata, encoded_payload| + + cancel = { + :on_cancellation => Proc.new { |channel, consumer| + channel.close + set_march_hare_consumers(march_hare_consumers.reject { |march_hare_consumer| march_hare_consumer == consumer }) + + subscription[:queue] = setup_queue(route) + start_subscription!(subscription) + } + } + consumer = queue.subscribe(cancel.merge(route.queue_subscription_options)) do |metadata, encoded_payload| ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name properties = { :action => route.action, @@ -49,6 +62,12 @@ def start_subscribers! end march_hare_consumers << consumer + end + + def start_subscribers! + route_set = self + subscriptions.each do |subscription| + start_subscription! subscription end end From b33b0b508edc0db67559013e0b011e692c6f4d1a Mon Sep 17 00:00:00 2001 From: hugh-j Date: Tue, 6 Nov 2018 15:16:16 -0700 Subject: [PATCH 4/4] formats code --- lib/action_subscriber/bunny/subscriber.rb | 76 +++++++++---------- .../march_hare/subscriber.rb | 72 +++++++++--------- 2 files changed, 74 insertions(+), 74 deletions(-) diff --git a/lib/action_subscriber/bunny/subscriber.rb b/lib/action_subscriber/bunny/subscriber.rb index 9e1f3a4..493d9a1 100644 --- a/lib/action_subscriber/bunny/subscriber.rb +++ b/lib/action_subscriber/bunny/subscriber.rb @@ -7,10 +7,6 @@ def bunny_consumers @bunny_consumers ||= [] end - def set_bunny_consumers(consumers) - @bunny_consumers = consumers - end - def cancel_consumers! bunny_consumers.each(&:cancel) ::ActionSubscriber::ThreadPools.threadpools.each do |name, threadpool| @@ -18,6 +14,10 @@ def cancel_consumers! end end + def set_bunny_consumers(consumers) + @bunny_consumers = consumers + end + def setup_subscriptions! fail ::RuntimeError, "you cannot setup queues multiple times, this should only happen once at startup" unless subscriptions.empty? routes.each do |route| @@ -28,46 +28,46 @@ def setup_subscriptions! end end - def start_subscription!(subscription) - route = subscription[:route] - queue = subscription[:queue] - channel = queue.channel - threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) - channel.prefetch(route.prefetch) if route.acknowledgements? - consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?) - consumer.on_cancellation do |_basic_cancel| - set_bunny_consumers(bunny_consumers.reject { |bunny_consumer| bunny_consumer == consumer }) - subscription[:queue] = setup_queue(route) - start_subscription!(subscription) - end - - consumer.on_delivery do |delivery_info, properties, encoded_payload| - ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name - properties = { - :action => route.action, - :channel => queue.channel, - :content_type => properties.content_type, - :delivery_tag => delivery_info.delivery_tag, - :exchange => delivery_info.exchange, - :headers => properties.headers, - :message_id => properties.message_id, - :routing_key => delivery_info.routing_key, - :queue => queue.name, - :uses_acknowledgements => route.acknowledgements?, - } - env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) - run_env(env, threadpool) - end - bunny_consumers << consumer - queue.subscribe_with(consumer) - end - def start_subscribers! subscriptions.each do |subscription| start_subscription!(subscription) end end + def start_subscription!(subscription) + route = subscription[:route] + queue = subscription[:queue] + channel = queue.channel + threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) + channel.prefetch(route.prefetch) if route.acknowledgements? + consumer = ::Bunny::Consumer.new(channel, queue, channel.generate_consumer_tag, !route.acknowledgements?) + consumer.on_cancellation do |_basic_cancel| + set_bunny_consumers(bunny_consumers.reject { |bunny_consumer| bunny_consumer == consumer }) + subscription[:queue] = setup_queue(route) + start_subscription!(subscription) + end + + consumer.on_delivery do |delivery_info, properties, encoded_payload| + ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name + properties = { + :action => route.action, + :channel => queue.channel, + :content_type => properties.content_type, + :delivery_tag => delivery_info.delivery_tag, + :exchange => delivery_info.exchange, + :headers => properties.headers, + :message_id => properties.message_id, + :routing_key => delivery_info.routing_key, + :queue => queue.name, + :uses_acknowledgements => route.acknowledgements?, + } + env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) + run_env(env, threadpool) + end + bunny_consumers << consumer + queue.subscribe_with(consumer) + end + private def setup_queue(route) diff --git a/lib/action_subscriber/march_hare/subscriber.rb b/lib/action_subscriber/march_hare/subscriber.rb index 793c4e0..2de772d 100644 --- a/lib/action_subscriber/march_hare/subscriber.rb +++ b/lib/action_subscriber/march_hare/subscriber.rb @@ -27,42 +27,6 @@ def setup_subscriptions! } end end - - def start_subscription!(subscription) - route = subscription[:route] - queue = subscription[:queue] - queue.channel.prefetch = route.prefetch if route.acknowledgements? - threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) - - cancel = { - :on_cancellation => Proc.new { |channel, consumer| - channel.close - set_march_hare_consumers(march_hare_consumers.reject { |march_hare_consumer| march_hare_consumer == consumer }) - - subscription[:queue] = setup_queue(route) - start_subscription!(subscription) - } - } - consumer = queue.subscribe(cancel.merge(route.queue_subscription_options)) do |metadata, encoded_payload| - ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name - properties = { - :action => route.action, - :channel => queue.channel, - :content_type => metadata.content_type, - :delivery_tag => metadata.delivery_tag, - :exchange => metadata.exchange, - :headers => _normalized_headers(metadata), - :message_id => metadata.message_id, - :routing_key => metadata.routing_key, - :queue => queue.name, - :uses_acknowledgements => route.acknowledgements?, - } - env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) - run_env(env, threadpool) - end - - march_hare_consumers << consumer - end def start_subscribers! route_set = self @@ -70,6 +34,42 @@ def start_subscribers! start_subscription! subscription end end + + def start_subscription!(subscription) + route = subscription[:route] + queue = subscription[:queue] + queue.channel.prefetch = route.prefetch if route.acknowledgements? + threadpool = ::ActionSubscriber::ThreadPools.threadpools.fetch(route.threadpool_name) + + cancel = { + :on_cancellation => Proc.new { |channel, consumer| + channel.close + set_march_hare_consumers(march_hare_consumers.reject { |march_hare_consumer| march_hare_consumer == consumer }) + + subscription[:queue] = setup_queue(route) + start_subscription!(subscription) + } + } + consumer = queue.subscribe(cancel.merge(route.queue_subscription_options)) do |metadata, encoded_payload| + ::ActiveSupport::Notifications.instrument "received_event.action_subscriber", :payload_size => encoded_payload.bytesize, :queue => queue.name + properties = { + :action => route.action, + :channel => queue.channel, + :content_type => metadata.content_type, + :delivery_tag => metadata.delivery_tag, + :exchange => metadata.exchange, + :headers => _normalized_headers(metadata), + :message_id => metadata.message_id, + :routing_key => metadata.routing_key, + :queue => queue.name, + :uses_acknowledgements => route.acknowledgements?, + } + env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties) + run_env(env, threadpool) + end + + march_hare_consumers << consumer + end private