Skip to content

Commit dc64d31

Browse files
committed
[ICRDMA] Event based cq implementation. EXT-1228 (#29463)
Add blocking mode (ibv_get_cq_event) to reduce CPU consumption. https://www.rdmamojo.com/2013/03/09/ibv_get_cq_event/ Conflicts: ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h ydb/library/actors/interconnect/ut/lib/node.h
1 parent 60632eb commit dc64d31

File tree

13 files changed

+366
-82
lines changed

13 files changed

+366
-82
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,23 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
633633
TIntrusivePtr<TInterconnectProxyCommon> icCommon;
634634
icCommon.Reset(new TInterconnectProxyCommon);
635635

636+
NMonitoring::TDynamicCounterPtr interconectCounters = GetServiceCounters(counters, "interconnect");
637+
636638
if (icConfig.GetUseRdma()) {
639+
NInterconnect::NRdma::ECqMode rdmaCqMode = NInterconnect::NRdma::ECqMode::EVENT;
640+
if (icConfig.HasRdmaCqMode()) {
641+
switch (icConfig.GetRdmaCqMode()) {
642+
case NKikimrConfig::TInterconnectConfig::CQ_EVENT:
643+
rdmaCqMode = NInterconnect::NRdma::ECqMode::EVENT;
644+
break;
645+
case NKikimrConfig::TInterconnectConfig::CQ_POLLING:
646+
rdmaCqMode = NInterconnect::NRdma::ECqMode::POLLING;
647+
break;
648+
}
649+
}
637650
setup->LocalServices.emplace_back(NInterconnect::NRdma::MakeCqActorId(),
638-
TActorSetupCmd(NInterconnect::NRdma::CreateCqActor(-1, icConfig.GetRdmaMaxWr()), TMailboxType::ReadAsFilled, interconnectPoolId));
651+
TActorSetupCmd(NInterconnect::NRdma::CreateCqActor(-1, icConfig.GetRdmaMaxWr(), rdmaCqMode, interconectCounters.Get()),
652+
TMailboxType::ReadAsFilled, interconnectPoolId));
639653

640654
// Interconnect uses rdma mem pool directly
641655
const auto counters = GetServiceCounters(appData->Counters, "utils");
@@ -644,7 +658,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
644658
setup->RcBufAllocator = std::make_shared<TRdmaAllocatorWithFallback>(icCommon->RdmaMemPool);
645659
}
646660
icCommon->NameserviceId = nameserviceId;
647-
icCommon->MonCounters = GetServiceCounters(counters, "interconnect");
661+
icCommon->MonCounters = interconectCounters;
648662
icCommon->ChannelsConfig = channels;
649663
icCommon->Settings = settings;
650664
icCommon->DestructorId = GetDestructActorID();

ydb/core/protos/config.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,11 @@ message TInterconnectConfig {
442442
IC_SO_MSG_ZEROCOPY = 1;
443443
};
444444

445+
enum ERdmaCqMode {
446+
CQ_POLLING = 0;
447+
CQ_EVENT = 1;
448+
};
449+
445450
repeated TChannel Channel = 1;
446451
optional bool FirstTryBeforePoll = 2; // DEPRECATED
447452
optional bool StartTcp = 3 [default = false];
@@ -479,6 +484,7 @@ message TInterconnectConfig {
479484
optional bool UseRdma = 52;
480485
optional uint32 RdmaMaxWr = 53 [default = 4096];
481486
optional bool RdmaChecksum = 54 [default = true];
487+
optional ERdmaCqMode RdmaCqMode = 55;
482488

483489
// ballast is added to IC handshake frames to ensure correctness of jumbo frames transmission over network
484490
optional uint32 HandshakeBallastSize = 14;

ydb/library/actors/interconnect/rdma/cq_actor/cq_actor.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,18 @@ class TCqActor: public TInterconnectLoggingBase, public TActorBootstrapped<TCqAc
226226
TCqFactory CqFactory;
227227
};
228228

229-
NActors::IActor* CreateCqActor(int maxCqe, int maxWr, NMonitoring::TDynamicCounters* counters) {
230-
return new TCqActor([maxCqe, maxWr, counters](const TRdmaCtx* ctx) {
231-
return CreateSimpleCq(ctx, TlsActivationContext->AsActorContext().ActorSystem(), maxCqe, maxWr, counters);
232-
});
229+
// Creates the CQ actor for the requested completion-queue mode.
//   maxCqe   - forwarded to the CQ factory function
//   maxWr    - forwarded to the CQ factory function
//   mode     - POLLING selects CreateSimpleCq, EVENT selects CreateSimpleEventDrivenCq
//   counters - forwarded to the CQ factory function
// Every control path returns an actor: a value outside the enum (e.g. produced by a cast)
// is treated like EVENT instead of running off the end of a non-void function.
NActors::IActor* CreateCqActor(int maxCqe, int maxWr, ECqMode mode, NMonitoring::TDynamicCounters* counters) {
    // The switch is kept (without a default label) so the compiler can still warn
    // when a new ECqMode enumerator is added but not handled here.
    switch (mode) {
        case NInterconnect::NRdma::ECqMode::POLLING:
            return new TCqActor([maxCqe, maxWr, counters](const TRdmaCtx* ctx) {
                return CreateSimpleCq(ctx, TlsActivationContext->AsActorContext().ActorSystem(), maxCqe, maxWr, counters);
            });

        case NInterconnect::NRdma::ECqMode::EVENT:
            break;
    }

    // EVENT mode, and the fallback for any unexpected value.
    return new TCqActor([maxCqe, maxWr, counters](const TRdmaCtx* ctx) {
        return CreateSimpleEventDrivenCq(ctx, TlsActivationContext->AsActorContext().ActorSystem(), maxCqe, maxWr, counters);
    });
}
234242

235243
NActors::TActorId MakeCqActorId() {

ydb/library/actors/interconnect/rdma/cq_actor/cq_actor.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,17 @@ namespace NMonitoring {
1212

1313
namespace NInterconnect::NRdma {
1414

15+
enum class ECqMode : ui8 {
16+
POLLING = 0,
17+
EVENT = 1
18+
};
19+
1520
/*
1621
* Creates CQ actor - abstraction to communicate with CQ from actor system.
1722
* creates at least one CQ per rdma context
1823
* maxCqe - max capacity of single queue under CQ actor abstraction. -1 - use limit from rdma context
1924
*/
20-
NActors::IActor* CreateCqActor(int maxCqe, int maxWr, NMonitoring::TDynamicCounters* counters = nullptr);
25+
NActors::IActor* CreateCqActor(int maxCqe, int maxWr, ECqMode mode, NMonitoring::TDynamicCounters* counters);
2126
NActors::TActorId MakeCqActorId();
2227

2328
}

ydb/library/actors/interconnect/rdma/rdma.cpp

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <library/cpp/monlib/dynamic_counters/counters.h>
1212

1313
#include <util/system/thread.h>
14+
#include <util/system/yield.h>
1415

1516
namespace NInterconnect::NRdma {
1617

@@ -22,25 +23,135 @@ NMonitoring::TDynamicCounterPtr MakeCounters(NMonitoring::TDynamicCounters* coun
2223
return counters;
2324
}
2425

25-
TCqCommon::~TCqCommon() {
26-
if (Cq) {
27-
ibv_destroy_cq(Cq);
28-
}
26+
// Intentionally empty SIGUSR1 handler. Its only purpose is to replace the default
// disposition so that delivering the signal interrupts a blocking read syscall
// instead of applying the default action.
static void SigHandler(int) noexcept {
}

// Installs SigHandler as the SIGUSR1 handler.
// SA_RESTART is deliberately NOT set: an interrupted syscall must fail with EINTR
// rather than being transparently restarted.
void SetSigHandler() noexcept {
    // Value-initialize so that no field of the structure is passed to the kernel uninitialized.
    struct sigaction sigActionData = {};
    sigemptyset(&sigActionData.sa_mask);
    sigActionData.sa_handler = &SigHandler;
    // No flags: SA_INTERRUPT is a non-POSIX historical no-op; "interrupt, don't restart"
    // is simply the behaviour obtained when SA_RESTART is absent.
    sigActionData.sa_flags = 0;
    sigaction(SIGUSR1, &sigActionData, nullptr);
}
3037

3138
class TSimpleCq: public TSimpleCqBase {
3239
public:
33-
using TSimpleCqBase::TSimpleCqBase;
40+
TSimpleCq(NActors::TActorSystem* as, size_t sz, NMonitoring::TDynamicCounters* c) noexcept
41+
: TSimpleCqBase(as, sz, c, true)
42+
{}
43+
44+
int Init(const TRdmaCtx* ctx, int maxCqe) noexcept {
45+
return TSimpleCqBase::Init(ctx, maxCqe, nullptr);
46+
}
3447

3548
virtual ~TSimpleCq() {
36-
Stop();
49+
// For a simple polling-mode CQ, we can destroy the ibv CQ without any issues right after joining the thread
50+
Cont.store(false, std::memory_order_relaxed);
51+
if (Thread.Running())
52+
Thread.Join();
53+
54+
DestroyCq();
55+
}
56+
};
57+
58+
class TSimpleEventDrivenCq: public TSimpleCqBase {
59+
public:
60+
TSimpleEventDrivenCq(NActors::TActorSystem* as, size_t sz, NMonitoring::TDynamicCounters* c) noexcept
61+
: TSimpleCqBase(as, sz, c, false)
62+
{}
63+
64+
int Init(const TRdmaCtx* ctx, int maxCqe) noexcept {
65+
CompChannel = ibv_create_comp_channel(ctx->GetContext());
66+
if (!CompChannel) {
67+
return errno;
68+
}
69+
70+
int err = TSimpleCqBase::Init(ctx, maxCqe, CompChannel);
71+
if (err) {
72+
return err;
73+
}
74+
75+
err = ibv_req_notify_cq(Cq, 0);
76+
if (err) {
77+
return errno;
78+
}
79+
80+
return 0;
81+
}
82+
83+
void Idle() noexcept override final {
84+
struct ibv_cq *evCq = nullptr;
85+
void *evCtx = nullptr;
86+
87+
int err = ibv_get_cq_event(CompChannel, &evCq, &evCtx);
88+
if (err) {
89+
if (errno != EINTR) {
90+
NotifyErr();
91+
return;
92+
}
93+
}
94+
95+
if (!evCq) {
96+
return;
97+
}
98+
// TODO: batch ack
99+
ibv_ack_cq_events(evCq, 1);
100+
err = ibv_req_notify_cq(evCq, 0);
101+
if (err) {
102+
Cerr << "Couldn't request CQ notification\n" << Endl;
103+
NotifyErr();
104+
Y_DEBUG_ABORT_UNLESS(false);
105+
}
37106
}
107+
108+
virtual ~TSimpleEventDrivenCq() {
109+
// For event driven CQ stopping is a bit complicated
110+
111+
// 1. Lock the verbs builder. This prevents possibility to add new WR. Not nessesearly but just to be sure
112+
// No deadlock here - the builder routine is protected by TryLock semantic.
113+
VerbsBuildingState.Lock.Acquire();
114+
115+
// 2. Set flag to exit from loop
116+
Cont.store(false, std::memory_order_relaxed);
117+
118+
// 3. Send signal to the thread to interrupt waiting on the read syscall ()
119+
// NOTE: There is a tiny chanse the signal was send before thread blocked on the read syscall
120+
// so in this case repeat send signal until cq thread finished
121+
while (!Finished.load(std::memory_order_relaxed)) {
122+
Awake();
123+
if (Finished.load(std::memory_order_relaxed)) {
124+
break;
125+
}
126+
ThreadYield();
127+
}
128+
129+
// 4. As usual, join and destroy CQ
130+
if (Thread.Running())
131+
Thread.Join();
132+
133+
DestroyCq();
134+
135+
// 5. Destroy completion event channel
136+
if (ibv_destroy_comp_channel(CompChannel)) {
137+
// https://www.rdmamojo.com/2012/10/26/ibv_destroy_comp_channel
138+
Cerr << "Unable to destroy completion event channel, errno: " << errno << Endl;
139+
// it should not happen, but if it happens it is not a fatal error for production
140+
Y_DEBUG_ABORT_UNLESS(false);
141+
}
142+
}
143+
private:
144+
ibv_comp_channel* CompChannel;
38145
};
39146

40147
ICq::TPtr CreateSimpleCq(const TRdmaCtx* ctx, NActors::TActorSystem* as, int maxCqe, int maxWr, NMonitoring::TDynamicCounters* counter) noexcept {
41148
return CreateCq<TSimpleCq>(ctx, as, maxCqe, maxWr, counter);
42149
}
43150

151+
ICq::TPtr CreateSimpleEventDrivenCq(const TRdmaCtx* ctx, NActors::TActorSystem* as, int maxCqe, int maxWr, NMonitoring::TDynamicCounters* counter) noexcept {
152+
return CreateCq<TSimpleEventDrivenCq>(ctx, as, maxCqe, maxWr, counter);
153+
}
154+
44155
const int TQueuePair::UnknownQpState = IBV_QPS_UNKNOWN;
45156

46157
TQueuePair::~TQueuePair() {

ydb/library/actors/interconnect/rdma/rdma.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class ICq {
7777
};
7878

7979
ICq::TPtr CreateSimpleCq(const TRdmaCtx* ctx, NActors::TActorSystem* as, int maxCqe, int maxWr, NMonitoring::TDynamicCounters* counter) noexcept;
80+
ICq::TPtr CreateSimpleEventDrivenCq(const TRdmaCtx* ctx, NActors::TActorSystem* as, int maxCqe, int maxWr, NMonitoring::TDynamicCounters* counter) noexcept;
8081

8182
struct THandshakeData {
8283
ui32 QpNum;

0 commit comments

Comments
 (0)