Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/active_publisher/async/redis_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
module ActivePublisher
module Async
module RedisAdapter
REDIS_LIST_KEY = "ACTIVE_PUBLISHER_LIST".freeze
REDIS_LIST_KEY = "ACTIVE_PUBLISHER_LIST.V2".freeze

def self.new(*args)
::ActivePublisher::Async::RedisAdapter::Adapter.new(*args)
Expand Down Expand Up @@ -42,7 +42,7 @@ def initialize(new_redis_pool)

def publish(route, payload, exchange_name, options = {})
message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
queue << ::Marshal.dump(message)
queue << message.to_json
flush_queue! if queue.size >= flush_min || options[:flush_queue]

nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "json"

module ActivePublisher
module Async
module RedisAdapter
Expand All @@ -10,7 +12,7 @@ def initialize(redis_connection_pool, new_list_key)
end

def <<(message)
encoded_message = ::Marshal.dump(message)
encoded_message = message.to_json

redis_pool.with do |redis|
redis.rpush(list_key, encoded_message)
Expand All @@ -24,7 +26,7 @@ def concat(*messages)

encoded_messages = []
messages.each do |message|
encoded_messages << ::Marshal.dump(message)
encoded_messages << message.to_json
end

redis_pool.with do |redis|
Expand Down Expand Up @@ -92,10 +94,10 @@ def shift(number)
messages = [messages] unless messages.respond_to?(:each)

shifted_messages = []

messages.each do |message|
next if message.nil?

shifted_messages << ::Marshal.load(message)
shifted_messages << ::ActivePublisher::Message.from_json(message)
end

shifted_messages
Expand Down
27 changes: 26 additions & 1 deletion lib/active_publisher/message.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
require "base64"
require "json"
require "active_support/core_ext/hash/keys"

module ActivePublisher
class Message < Struct.new(:route, :payload, :exchange_name, :options); end
class Message < Struct.new(:route, :payload, :exchange_name, :options)
class << self
def from_json(payload)
parsed = JSON.load(payload)
self.new(
parsed["route"],
Base64.decode64(parsed["payload"]),
parsed["exchange_name"],
parsed["options"].symbolize_keys,
)
end
end

def to_json
{
route: self.route,
payload: Base64.encode64(self.payload),
exchange_name: self.exchange_name,
options: self.options,
}.to_json
end
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
describe ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue do
let(:list_key) { ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY }
let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } }
let(:message) { ::ActivePublisher::Message.new('rtg.key', 'payload', 'some.exchange', {})}
let(:ten_messages) { 10.times.map { message } }
subject { described_class.new(redis_pool, list_key) }

describe "initialize with a redis_pool and list_key" do
Expand All @@ -13,29 +15,18 @@

describe "#<<" do
it "pushes 1 item on the list" do
subject << "derp"
subject << message
expect(subject.size).to be 1
expect(subject.pop_up_to(100)).to eq(["derp"])
expect(subject.pop_up_to(100)).to eq([message])
end

it "pushes 10 items on the list" do
10.times do
subject << "derp"
subject << message
end

expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end
end

Expand All @@ -45,86 +36,40 @@
end

it "pushes 1 item on the list" do
subject.concat("derp")
subject.concat(message)
expect(subject.size).to be 1
expect(subject.pop_up_to(100)).to eq(["derp"])
expect(subject.pop_up_to(100)).to eq([message])
end

it "pushes 10 items on the list" do
10.times do
subject.concat("derp")
subject.concat(message)
end

expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end

it "pushes 10 items on the list in single concat" do
subject.concat("derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp")
subject.concat(message,
message,
message,
message,
message,
message,
message,
message,
message,
message)

expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end

it "pushes 10 items on the list in single concat (with array)" do
array = [
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp"
]

subject.concat(array)
subject.concat(ten_messages)
expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end
end

Expand All @@ -135,7 +80,7 @@

it "is false when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, "derp")
redis.rpush(list_key, message.to_json)
end

expect(subject.empty?).to be false
Expand All @@ -144,7 +89,7 @@
it "is false when ten items are inserted to the list_key List" do
redis_pool.with do |redis|
10.times do
redis.rpush(list_key, "derp")
redis.rpush(list_key, message.to_json)
end
end

Expand All @@ -159,31 +104,20 @@

it "returns 1 item when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end

expect(subject.pop_up_to(100)).to eq(["derp"])
expect(subject.pop_up_to(100)).to eq([message])
end

it "is 10 when ten items are inserted to the list_key List" do
redis_pool.with do |redis|
10.times do
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end
end

expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end
end

Expand All @@ -194,31 +128,20 @@

it "returns 1 item when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end

expect(subject.shift(100)).to eq(["derp"])
expect(subject.shift(100)).to eq([message])
end

it "is 10 when ten items are inserted to the list_key List" do
redis_pool.with do |redis|
10.times do
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end
end

expect(subject.shift(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.shift(100)).to eq(ten_messages)
end
end

Expand All @@ -229,7 +152,7 @@

it "is 1 when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, "derp")
redis.rpush(list_key, message.to_json)
end

expect(subject.size).to be 1
Expand Down