@@ -30,8 +30,6 @@ TString TKafkaProduceActor::LogPrefix() {
3030 sb << " Init " ;
3131 } else if (stateFunc == &TKafkaProduceActor::StateWork) {
3232 sb << " Work " ;
33- } else if (stateFunc == &TKafkaProduceActor::StateAccepting) {
34- sb << " Accepting " ;
3533 } else {
3634 sb << " Unknown " ;
3735 }
@@ -229,22 +227,22 @@ void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const
229227
230228void TKafkaProduceActor::ProcessRequests (const TActorContext& ctx) {
231229 if (&TKafkaProduceActor::StateWork != CurrentStateFunc ()) {
232- KAFKA_LOG_ERROR (" Produce actor: Unexpected state" );
233230 return ;
234231 }
235232
236233 if (Requests.empty ()) {
237234 return ;
238235 }
239236
240- if (EnqueueInitialization ()) {
237+ auto canProcess = EnqueueInitialization ();
238+ while (canProcess--) {
241239 PendingRequests.push_back (std::make_shared<TPendingRequest>(Requests.front ()));
242240 Requests.pop_front ();
243241
244242 ProcessRequest (PendingRequests.back (), ctx);
245- } else {
246- ProcessInitializationRequests (ctx);
247243 }
244+
245+ ProcessInitializationRequests (ctx);
248246}
249247
250248size_t TKafkaProduceActor::EnqueueInitialization () {
@@ -399,12 +397,10 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
399397 if (pendingRequest->WaitResultCookies .empty ()) {
400398 // All request for unknown topic or empty request
401399 SendResults (ctx);
402- } else {
403- Become (&TKafkaProduceActor::StateAccepting);
404400 }
405401}
406402
407- void TKafkaProduceActor::HandleAccepting (TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
403+ void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
408404 auto r = request->Get ();
409405 auto cookie = r->Cookie ;
410406
@@ -422,68 +418,87 @@ void TKafkaProduceActor::HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::T
422418 Become (&TKafkaProduceActor::StateWork);
423419 ProcessRequests (ctx);
424420 } else {
425- KAFKA_LOG_W (" Still in Accepting state after TEvPartitionWriter::TEvWriteAccepted cause cookies are expected: " << JoinSeq (" , " , expectedCookies));
421+ KAFKA_LOG_W (" Still in accepting after receive TEvPartitionWriter::TEvWriteAccepted cause cookies are expected: " << JoinSeq (" , " , expectedCookies));
426422 }
427423}
428424
429425void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& /* ctx*/ ) {
430426 KAFKA_LOG_D (" Produce actor: Init " << request->Get ()->ToString ());
427+
428+ if (!request->Get ()->IsSuccess ()) {
429+ auto sender = request->Sender ;
430+
431+ if (WriterDied (sender, EKafkaErrors::UNKNOWN_SERVER_ERROR, request->Get ()->GetError ().Reason )) {
432+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvInitResult for " << sender << " with error: " << request->Get ()->GetError ().Reason );
433+ return ;
434+ }
435+
436+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvInitResult with unexpected writer " << sender);
437+ }
431438}
432439
433- void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& ctx) {
440+ void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& /* ctx*/ ) {
434441 auto sender = request->Sender ;
435442
436- auto [topicPath, partitionId] = WriterDied (sender);
437- if (topicPath.empty ()) {
438- KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvDisconnected with unexpected writer " << sender);
443+ if (WriterDied (sender, EKafkaErrors::NOT_LEADER_OR_FOLLOWER, TStringBuilder () << " Partition writer " << sender << " disconnected" )) {
444+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvDisconnected for " << sender);
439445 return ;
440446 }
441447
442- KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvDisconnected for " << topicPath << " :" << partitionId);
448+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvDisconnected with unexpected writer " << sender);
449+ }
450+
451+ bool TKafkaProduceActor::WriterDied (const TActorId& writerId, EKafkaErrors errorCode, TStringBuf errorMessage) {
452+ auto findAndCleanWriter = [&]() -> std::pair<TString, ui32> {
453+ for (auto it = TransactionalWriters.begin (); it != TransactionalWriters.end (); ++it) {
454+ if (it->second .ActorId == writerId) {
455+ auto id = it->first ;
456+ CleanWriter (id, writerId);
457+ TransactionalWriters.erase (it);
458+ return {id.TopicPath , id.PartitionId };
459+ }
460+ }
461+
462+ for (auto & [topicPath, partitionWriters] : NonTransactionalWriters) {
463+ for (auto it = partitionWriters.begin (); it != partitionWriters.end (); ++it) {
464+ if (it->second .ActorId == writerId) {
465+ auto id = it->first ;
466+ CleanWriter ({topicPath, static_cast <ui32>(id)}, writerId);
467+ partitionWriters.erase (it);
468+ return {topicPath, static_cast <ui32>(id)};
469+ }
470+ }
471+ }
472+
473+ return {" " , 0 };
474+ };
475+
476+ auto [topicPath, partitionId] = findAndCleanWriter ();
477+ if (topicPath.empty ()) {
478+ return false ;
479+ }
443480
444481 for (auto it = Cookies.begin (); it != Cookies.end ();) {
445482 auto cookie = it->first ;
446483 auto & info = it->second ;
447484
448485 if (info.TopicPath == topicPath && info.PartitionId == partitionId) {
449- info.Request ->Results [info.Position ].ErrorCode = EKafkaErrors::NOT_LEADER_OR_FOLLOWER ;
450- info.Request ->Results [info.Position ].ErrorMessage = TStringBuilder () << " Partition writer " << sender << " disconnected " ;
486+ info.Request ->Results [info.Position ].ErrorCode = errorCode ;
487+ info.Request ->Results [info.Position ].ErrorMessage = errorMessage ;
451488 info.Request ->WaitAcceptingCookies .erase (cookie);
452489 info.Request ->WaitResultCookies .erase (cookie);
453490
454491 if (info.Request ->WaitAcceptingCookies .empty () && info.Request ->WaitResultCookies .empty ()) {
455- SendResults (ctx );
492+ SendResults (ActorContext () );
456493 }
457494
458495 it = Cookies.erase (it);
459496 } else {
460497 ++it;
461498 }
462499 }
463- }
464-
465- std::pair<TString, ui32> TKafkaProduceActor::WriterDied (const TActorId& writerId) {
466- for (auto it = TransactionalWriters.begin (); it != TransactionalWriters.end (); ++it) {
467- if (it->second .ActorId == writerId) {
468- auto id = it->first ;
469- CleanWriter (id, writerId);
470- TransactionalWriters.erase (it);
471- return {id.TopicPath , id.PartitionId };
472- }
473- }
474-
475- for (auto & [topicPath, partitionWriters] : NonTransactionalWriters) {
476- for (auto it = partitionWriters.begin (); it != partitionWriters.end (); ++it) {
477- if (it->second .ActorId == writerId) {
478- auto id = it->first ;
479- CleanWriter ({topicPath, static_cast <ui32>(id)}, writerId);
480- partitionWriters.erase (it);
481- return {topicPath, static_cast <ui32>(id)};
482- }
483- }
484- }
485500
486- return { " " , 0 } ;
501+ return true ;
487502}
488503
489504void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) {
@@ -641,20 +656,6 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
641656
642657 Send (Context->ConnectionId , new TEvKafka::TEvResponse (correlationId, response, metricsErrorCode));
643658
644- if (!pendingRequest->WaitAcceptingCookies .empty ()) {
645- if (!expired) {
646- TStringBuilder sb;
647- sb << " Produce actor: All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:" ;
648- for (auto cookie : pendingRequest->WaitAcceptingCookies ) {
649- sb << " " << cookie;
650- }
651- KAFKA_LOG_W (sb);
652- }
653- if (&TKafkaProduceActor::StateAccepting == CurrentStateFunc ()) {
654- Become (&TKafkaProduceActor::StateWork);
655- }
656- }
657-
658659 for (auto cookie : pendingRequest->WaitAcceptingCookies ) {
659660 Cookies.erase (cookie);
660661 }
@@ -664,6 +665,8 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
664665
665666 PendingRequests.pop_front ();
666667 }
668+
669+ ProcessRequests (ctx);
667670}
668671
669672void TKafkaProduceActor::ProcessInitializationRequests (const TActorContext& ctx) {
@@ -738,18 +741,19 @@ void TKafkaProduceActor::SendWriteRequest(const TProduceRequestData::TTopicProdu
738741 auto & result = pendingRequest->Results [position];
739742 if (OK == writer.first ) {
740743 auto ownCookie = ++Cookie;
741- auto & cookieInfo = Cookies[ownCookie];
742- cookieInfo.TopicPath = topicPath;
743- cookieInfo.PartitionId = partitionId;
744- cookieInfo.Position = position;
745- cookieInfo.RuPerRequest = ruPerRequest;
746- cookieInfo.Request = pendingRequest;
747-
748- pendingRequest->WaitAcceptingCookies .insert (ownCookie);
749- pendingRequest->WaitResultCookies .insert (ownCookie);
750744
751745 auto [error, ev] = Convert (transactionalId.GetOrElse (" " ), partitionData, topicPath, ownCookie, ClientDC, ruPerRequest);
752746 if (error == EKafkaErrors::NONE_ERROR) {
747+ auto & cookieInfo = Cookies[ownCookie];
748+ cookieInfo.TopicPath = topicPath;
749+ cookieInfo.PartitionId = partitionId;
750+ cookieInfo.Position = position;
751+ cookieInfo.RuPerRequest = ruPerRequest;
752+ cookieInfo.Request = pendingRequest;
753+
754+ pendingRequest->WaitAcceptingCookies .insert (ownCookie);
755+ pendingRequest->WaitResultCookies .insert (ownCookie);
756+
753757 ruPerRequest = false ;
754758 KAFKA_LOG_T (" Sending TEvPartitionWriter::TEvWriteRequest to " << writer.second << " with cookie " << ownCookie);
755759 Send (writer.second , std::move (ev));
0 commit comments