diff --git a/lib/active_publisher/async/redis_adapter.rb b/lib/active_publisher/async/redis_adapter.rb index c418e60..d940b6d 100644 --- a/lib/active_publisher/async/redis_adapter.rb +++ b/lib/active_publisher/async/redis_adapter.rb @@ -13,6 +13,7 @@ def self.new(*args) end class Adapter + QUEUE_FLUSH_SIZE = 100 SUPERVISOR_INTERVAL = { :execution_interval => 1.5, # seconds :timeout_interval => 1, # seconds @@ -41,7 +42,7 @@ def initialize(new_redis_pool) def publish(route, payload, exchange_name, options = {}) message = ::ActivePublisher::Message.new(route, payload, exchange_name, options) queue << ::Marshal.dump(message) - flush_queue! if queue.size >= 20 || options[:flush_queue] + flush_queue! if queue.size >= QUEUE_FLUSH_SIZE || options[:flush_queue] nil end diff --git a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb index 80ecbf9..154b2b3 100644 --- a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb +++ b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb @@ -2,6 +2,8 @@ module ActivePublisher module Async module RedisAdapter class RedisMultiPopQueue + RABBITMQ_PAUSED_KEY = "ACTIVE_PUBLISHER_RABBITMQ_PAUSED".freeze + attr_reader :list_key, :redis_pool def initialize(redis_connection_pool, new_list_key) @@ -36,6 +38,12 @@ def empty? size <= 0 end + def pause_publishing? + redis_pool.with do |redis| + redis.exists?(RABBITMQ_PAUSED_KEY) + end + end + def pop_up_to(num_to_pop, opts = {}) case opts when TrueClass, FalseClass @@ -73,6 +81,7 @@ def pop_up_to(num_to_pop, opts = {}) def shift(number) number = [number, size].min return [] if number <= 0 + return [] if pause_publishing? messages = [] multi_response = []