diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index a904c6be0aed..a49e1606962a 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -110,12 +110,16 @@ namespace NKikimr::NBlobDepot { if (!rows.IsReady()) { return false; } + const ui32 generation = Self->Executor()->Generation(); while (rows.IsValid()) { TS3Locator item{ .Len = rows.GetValue(), .Generation = rows.GetValue(), .KeyId = rows.GetValue(), }; + if (item.Generation == generation) { + return true; // we don't want to read newly added items by this tablet's generation + } if (item != from) { Self->S3Manager->AddTrashToCollect(item); from = item; diff --git a/ydb/core/blob_depot/s3.h b/ydb/core/blob_depot/s3.h index f6e10b10f542..ed0720da343e 100644 --- a/ydb/core/blob_depot/s3.h +++ b/ydb/core/blob_depot/s3.h @@ -69,7 +69,9 @@ namespace NKikimr::NBlobDepot { static constexpr ui32 MaxDeletesInFlight = 3; static constexpr size_t MaxObjectsToDeleteAtOnce = 10; - std::deque DeleteQueue; // items we are definitely going to delete (must be present in TrashS3) + // items we are definitely going to delete (must be present in TrashS3) + std::deque DeleteQueueInPrevGenerations; + std::deque DeleteQueueInCurrentGeneration; THashSet ActiveDeleters; ui32 NumDeleteTxInFlight = 0; ui64 TotalS3TrashObjects = 0; diff --git a/ydb/core/blob_depot/s3_delete.cpp b/ydb/core/blob_depot/s3_delete.cpp index 913c598729e2..bd12f5ac3648 100644 --- a/ydb/core/blob_depot/s3_delete.cpp +++ b/ydb/core/blob_depot/s3_delete.cpp @@ -156,18 +156,28 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_INFO, BLOB_DEPOT, BDTS06, "AddTrashToCollect", (Id, Self->GetLogId()), (Locator, locator)); Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = ++TotalS3TrashObjects; Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize += locator.Len; - DeleteQueue.push_back(locator); + if (const ui32 generation = Self->Executor()->Generation(); locator.Generation < generation) { + DeleteQueueInPrevGenerations.push_back(locator); + } else { + Y_DEBUG_ABORT_UNLESS(locator.Generation == generation); + DeleteQueueInCurrentGeneration.push_back(locator); + } RunDeletersIfNeeded(); } void TS3Manager::RunDeletersIfNeeded() { - while (!DeleteQueue.empty() && NumDeleteTxInFlight + ActiveDeleters.size() < MaxDeletesInFlight) { + while (NumDeleteTxInFlight + ActiveDeleters.size() < MaxDeletesInFlight) { + // create list of locators we are going to delete during this operation THashMap locators; - - while (!DeleteQueue.empty() && locators.size() < MaxObjectsToDeleteAtOnce) { - const TS3Locator& locator = DeleteQueue.front(); - locators.emplace(locator.MakeObjectName(BasePath), locator); - DeleteQueue.pop_front(); + for (auto *queue : {&DeleteQueueInPrevGenerations, &DeleteQueueInCurrentGeneration}) { + while (!queue->empty() && locators.size() < MaxObjectsToDeleteAtOnce) { + const TS3Locator& locator = queue->front(); + locators.emplace(locator.MakeObjectName(BasePath), locator); + queue->pop_front(); + } + } + if (!locators) { + break; } const TActorId actorId = Self->Register(new TDeleterActor(Self->SelfId(), locators, Self->GetLogId())); @@ -218,7 +228,15 @@ namespace NKikimr::NBlobDepot { } if (!msg.LocatorsError.empty()) { - DeleteQueue.insert(DeleteQueue.end(), msg.LocatorsError.begin(), msg.LocatorsError.end()); + const ui32 generation = Self->Executor()->Generation(); + for (auto& locator : msg.LocatorsError) { + if (locator.Generation < generation) { + DeleteQueueInPrevGenerations.push_back(locator); + } else { + Y_DEBUG_ABORT_UNLESS(locator.Generation == generation); + DeleteQueueInCurrentGeneration.push_back(locator); + } + } RunDeletersIfNeeded(); } }) diff --git a/ydb/core/blob_depot/s3_write.cpp b/ydb/core/blob_depot/s3_write.cpp index a74ebaa39956..b439b1c67fb4 100644 --- a/ydb/core/blob_depot/s3_write.cpp +++ b/ydb/core/blob_depot/s3_write.cpp @@ -98,9 +98,9 @@ namespace NKikimr::NBlobDepot { }; TS3Locator TS3Manager::AllocateS3Locator(ui32 len) { - if (!DeleteQueue.empty()) { - TS3Locator res = DeleteQueue.front(); - DeleteQueue.pop_front(); + if (!DeleteQueueInCurrentGeneration.empty()) { + TS3Locator res = DeleteQueueInCurrentGeneration.front(); + DeleteQueueInCurrentGeneration.pop_front(); Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = --TotalS3TrashObjects; Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize -= res.Len; res.Len = len;