From 8335bc7af208056444b8e28c4701f71220e58867 Mon Sep 17 00:00:00 2001 From: Calvert Date: Wed, 6 Jun 2018 12:23:35 -0400 Subject: [PATCH] Modify lease keep alive call to be compatible with grpc proxy --- lib/etcdv3/lease.rb | 43 ++++++++++++++++++++++++++++++++++++--- spec/etcdv3/lease_spec.rb | 5 +++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/lib/etcdv3/lease.rb b/lib/etcdv3/lease.rb index 37c64ff..ec948ae 100644 --- a/lib/etcdv3/lease.rb +++ b/lib/etcdv3/lease.rb @@ -1,5 +1,32 @@ class Etcdv3 class Lease + + # this is used for gRPC proxy compatibility so that we do not + # mark as finished writing until we've received a response + class BlockingRequest + def initialize(request_op) + @blocked = false + @request_op = request_op + end + + def read_done! + @proceed = true + end + + def blocked? + @blocked + end + + def each + @blocked = true + + yield @request_op + + sleep 0.001 until @proceed == true + @blocked = false + end + end + def initialize(hostname, credentials, timeout, metadata={}) @stub = Etcdserverpb::Lease::Stub.new(hostname, credentials) @timeout = timeout @@ -22,10 +49,20 @@ def lease_ttl(id, timeout: nil) end def lease_keep_alive_once(id, timeout: nil) - request = Etcdserverpb::LeaseKeepAliveRequest.new(ID: id) - @stub.lease_keep_alive([request], metadata: @metadata, deadline: deadline(timeout)).each do |resp| - return resp + request = BlockingRequest.new Etcdserverpb::LeaseKeepAliveRequest.new(ID: id) + response = nil + begin + @stub.lease_keep_alive(request, metadata: @metadata, deadline: deadline(timeout)).each do |resp| + response = resp + break; + end + ensure + request.read_done! + while request.blocked? + sleep 0.001 + end end + return response end private diff --git a/spec/etcdv3/lease_spec.rb b/spec/etcdv3/lease_spec.rb index 01db3d7..9ff113f 100644 --- a/spec/etcdv3/lease_spec.rb +++ b/spec/etcdv3/lease_spec.rb @@ -32,6 +32,11 @@ stub = local_stub(Etcdv3::Lease, 0) expect { stub.lease_keep_alive_once(id) }.to raise_error(GRPC::DeadlineExceeded) end + it "doesn't orphan threads if there is a server error" do + expect_any_instance_of(GRPC::BidiCall).to receive(:read_loop).and_raise(GRPC::DeadlineExceeded) + stub = local_stub(Etcdv3::Lease, 2) + expect { stub.lease_keep_alive_once(314159) rescue nil; sleep 0.5}.to_not change { Thread.list.size } + end end describe '#lease_ttl' do