Skip to content

Commit 1f773c9

Browse files
committed
Fixed an error that caused the pipe break to not be handled. (#30096)
1 parent 0a4b372 commit 1f773c9

File tree

3 files changed

+94
-1
lines changed

3 files changed

+94
-1
lines changed

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,62 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvInitResult::TPtr request,
430430
KAFKA_LOG_D("Produce actor: Init " << request->Get()->ToString());
431431
}
432432

433+
// Handles TEvPartitionWriter::TEvDisconnected sent by a partition writer.
//
// The sender is resolved to its topic/partition via WriterDied(), which also
// drops the writer from the writer tables. Every cookie registered for that
// topic/partition is then marked as failed with NOT_LEADER_OR_FOLLOWER,
// detached from its request's wait sets and removed from Cookies. As soon as a
// request has nothing left to wait for, SendResults() is invoked.
//
// Events from a sender that is not a known writer are logged and ignored.
void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& ctx) {
    auto deadWriter = request->Sender;

    // WriterDied() reports an empty topic path when the sender is not one of our writers.
    auto [topicPath, partitionId] = WriterDied(deadWriter);
    if (topicPath.empty()) {
        KAFKA_LOG_D("Produce actor: Received TEvPartitionWriter::TEvDisconnected with unexpected writer " << deadWriter);
        return;
    }

    KAFKA_LOG_D("Produce actor: Received TEvPartitionWriter::TEvDisconnected for " << topicPath << ":" << partitionId);

    auto it = Cookies.begin();
    while (it != Cookies.end()) {
        auto& cookieInfo = it->second;

        const bool servedByDeadWriter = cookieInfo.TopicPath == topicPath && cookieInfo.PartitionId == partitionId;
        if (!servedByDeadWriter) {
            ++it;
            continue;
        }

        auto cookie = it->first;
        auto& pendingRequest = cookieInfo.Request;

        // Record the failure in the slot of the response reserved for this cookie.
        auto& partitionResult = pendingRequest->Results[cookieInfo.Position];
        partitionResult.ErrorCode = EKafkaErrors::NOT_LEADER_OR_FOLLOWER;
        partitionResult.ErrorMessage = TStringBuilder() << "Partition writer " << deadWriter << " disconnected";

        // The cookie will never be answered by the writer, so stop waiting for it.
        pendingRequest->WaitAcceptingCookies.erase(cookie);
        pendingRequest->WaitResultCookies.erase(cookie);

        const bool nothingLeftToWait = pendingRequest->WaitAcceptingCookies.empty()
            && pendingRequest->WaitResultCookies.empty();
        if (nothingLeftToWait) {
            // NOTE(review): SendResults() runs while Cookies is being iterated; this is
            // only safe as long as it never erases the entry `it` points at - confirm.
            SendResults(ctx);
        }

        // erase() yields the iterator that follows the removed entry; cookieInfo and
        // pendingRequest must not be touched past this point.
        it = Cookies.erase(it);
    }
}
464+
465+
std::pair<TString, ui32> TKafkaProduceActor::WriterDied(const TActorId& writerId) {
466+
for (auto it = TransactionalWriters.begin(); it != TransactionalWriters.end(); ++it) {
467+
if (it->second.ActorId == writerId) {
468+
auto id = it->first;
469+
CleanWriter(id, writerId);
470+
TransactionalWriters.erase(it);
471+
return {id.TopicPath, id.PartitionId};
472+
}
473+
}
474+
475+
for (auto& [topicPath, partitionWriters] : NonTransactionalWriters) {
476+
for (auto it = partitionWriters.begin(); it != partitionWriters.end(); ++it) {
477+
if (it->second.ActorId == writerId) {
478+
auto id = it->first;
479+
CleanWriter({topicPath, static_cast<ui32>(id)}, writerId);
480+
partitionWriters.erase(it);
481+
return {topicPath, static_cast<ui32>(id)};
482+
}
483+
}
484+
}
485+
486+
return {"", 0};
487+
}
488+
433489
void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) {
434490
auto r = request->Get();
435491
auto cookie = r->Record.GetPartitionResponse().GetCookie();
@@ -532,7 +588,8 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
532588
size_t recordsCount = partitionData.Records.has_value() ? partitionData.Records->Records.size() : 0;
533589
partitionResponse.Index = partitionData.Index;
534590
if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
535-
KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage << ", #01");
591+
KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode)
592+
<< ", ErrorMessage=" << result.ErrorMessage << ", #01");
536593
partitionResponse.ErrorCode = result.ErrorCode;
537594
metricsErrorCode = result.ErrorCode;
538595
partitionResponse.ErrorMessage = result.ErrorMessage;

ydb/core/kafka_proxy/actors/kafka_produce_actor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
5454
void Handle(TEvKafka::TEvWakeup::TPtr request, const TActorContext& ctx);
5555
void Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx);
5656
void Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& ctx);
57+
void Handle(TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& ctx);
5758
void EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx);
5859
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev, const TActorContext& ctx);
5960
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
@@ -69,6 +70,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
6970
HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest);
7071
HFunc(TEvPartitionWriter::TEvInitResult, Handle);
7172
HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
73+
HFunc(TEvPartitionWriter::TEvDisconnected, Handle);
7274

7375
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
7476
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
@@ -87,6 +89,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
8789
HFunc(TEvKafka::TEvProduceRequest, Handle);
8890
HFunc(TEvPartitionWriter::TEvInitResult, Handle);
8991
HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
92+
HFunc(TEvPartitionWriter::TEvDisconnected, Handle);
9093

9194
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
9295
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
@@ -108,6 +111,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
108111
HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest);
109112
HFunc(TEvPartitionWriter::TEvInitResult, Handle);
110113
HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
114+
HFunc(TEvPartitionWriter::TEvDisconnected, Handle);
111115

112116
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
113117
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
@@ -129,6 +133,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
129133
void CleanTopics(const TActorContext& ctx);
130134
void CleanWriters(const TActorContext& ctx);
131135
std::pair<ETopicStatus, TActorId> PartitionWriter(const TTopicPartition& topicPartition, const TProducerInstanceId& producerInstanceId, const TMaybe<TString>& transactionalId, const TActorContext& ctx);
136+
std::pair<TString, ui32> WriterDied(const TActorId& writerId);
132137

133138
TString LogPrefix();
134139
void LogEvent(IEventHandle& ev);

ydb/core/kafka_proxy/ut/ut_produce_actor.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,5 +301,36 @@ namespace {
301301
UNIT_ASSERT(response != nullptr);
302302
UNIT_ASSERT_VALUES_EQUAL(response->ErrorCode, NKafka::EKafkaErrors::REQUEST_TIMED_OUT);
303303
}
304+
305+
// Verifies that a produce request is answered with NOT_LEADER_OR_FOLLOWER when
// the partition writer reports a disconnect instead of handling the write.
//
// The observer drops the first TEvWriteRequest and replies to its sender with
// TEvPartitionWriter::TEvDisconnected; every other event is processed normally.
Y_UNIT_TEST(OnProduce_andPipeDisconnected) {
    i64 producerId = 1;
    i32 producerEpoch = 2;

    int writeRequestsCounter = 0;
    int poisonPillCounter = 0;

    auto observer = [&](TAutoPtr<IEventHandle>& input) {
        if (input->CastAsLocal<TEvPartitionWriter::TEvWriteRequest>()) {
            const bool isFirstWrite = (writeRequestsCounter == 0);
            ++writeRequestsCounter;

            if (isFirstWrite) {
                // Answer on behalf of the writer: the handle is built from the request's
                // Sender and Recipient in swapped roles (presumably recipient first,
                // then sender - confirm against the IEventHandle constructor).
                auto* disconnected = new TEvPartitionWriter::TEvDisconnected(
                    TEvPartitionWriter::TEvWriteResponse::EErrorCode::InternalError);
                Ctx->Runtime->Send(new IEventHandle(input->Sender, input->Recipient, disconnected));
                return TTestActorRuntimeBase::EEventAction::DROP;
            }
        } else if (input->CastAsLocal<TEvents::TEvPoison>()) {
            ++poisonPillCounter;
        }

        return TTestActorRuntimeBase::EEventAction::PROCESS;
    };

    Ctx->Runtime->SetObserverFunc(observer);

    SendProduce({}, producerId, producerEpoch);

    auto response = Ctx->Runtime->GrabEdgeEvent<NKafka::TEvKafka::TEvResponse>();
    UNIT_ASSERT(response);

    // The error must be visible on the response itself ...
    UNIT_ASSERT_VALUES_EQUAL(response->ErrorCode, NKafka::EKafkaErrors::NOT_LEADER_OR_FOLLOWER);

    // ... and on the first partition of the first topic in the produce response payload.
    auto produceResponse = std::dynamic_pointer_cast<NKafka::TProduceResponseData>(response->Response);
    UNIT_ASSERT_VALUES_EQUAL(produceResponse->Responses[0].PartitionResponses[0].ErrorCode, NKafka::EKafkaErrors::NOT_LEADER_OR_FOLLOWER);
}
304335
}
305336
} // anonymous namespace

0 commit comments

Comments
 (0)