Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/blob_depot/data_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,16 @@ namespace NKikimr::NBlobDepot {
if (!rows.IsReady()) {
return false;
}
const ui32 generation = Self->Executor()->Generation();
while (rows.IsValid()) {
TS3Locator item{
.Len = rows.GetValue<Schema::TrashS3::Len>(),
.Generation = rows.GetValue<Schema::TrashS3::Generation>(),
.KeyId = rows.GetValue<Schema::TrashS3::KeyId>(),
};
if (item.Generation == generation) {
return true; // we don't want to read newly added items by this tablet's generation
}
if (item != from) {
Self->S3Manager->AddTrashToCollect(item);
from = item;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/blob_depot/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ namespace NKikimr::NBlobDepot {
static constexpr ui32 MaxDeletesInFlight = 3;
static constexpr size_t MaxObjectsToDeleteAtOnce = 10;

std::deque<TS3Locator> DeleteQueue; // items we are definitely going to delete (must be present in TrashS3)
// items we are definitely going to delete (must be present in TrashS3)
std::deque<TS3Locator> DeleteQueueInPrevGenerations;
std::deque<TS3Locator> DeleteQueueInCurrentGeneration;
THashSet<TActorId> ActiveDeleters;
ui32 NumDeleteTxInFlight = 0;
ui64 TotalS3TrashObjects = 0;
Expand Down
34 changes: 26 additions & 8 deletions ydb/core/blob_depot/s3_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,28 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_INFO, BLOB_DEPOT, BDTS06, "AddTrashToCollect", (Id, Self->GetLogId()), (Locator, locator));
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = ++TotalS3TrashObjects;
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize += locator.Len;
DeleteQueue.push_back(locator);
if (const ui32 generation = Self->Executor()->Generation(); locator.Generation < generation) {
DeleteQueueInPrevGenerations.push_back(locator);
} else {
Y_DEBUG_ABORT_UNLESS(locator.Generation == generation);
DeleteQueueInCurrentGeneration.push_back(locator);
}
RunDeletersIfNeeded();
}

void TS3Manager::RunDeletersIfNeeded() {
while (!DeleteQueue.empty() && NumDeleteTxInFlight + ActiveDeleters.size() < MaxDeletesInFlight) {
while (NumDeleteTxInFlight + ActiveDeleters.size() < MaxDeletesInFlight) {
// create list of locators we are going to delete during this operation
THashMap<TString, TS3Locator> locators;

while (!DeleteQueue.empty() && locators.size() < MaxObjectsToDeleteAtOnce) {
const TS3Locator& locator = DeleteQueue.front();
locators.emplace(locator.MakeObjectName(BasePath), locator);
DeleteQueue.pop_front();
for (auto *queue : {&DeleteQueueInPrevGenerations, &DeleteQueueInCurrentGeneration}) {
while (!queue->empty() && locators.size() < MaxObjectsToDeleteAtOnce) {
const TS3Locator& locator = queue->front();
locators.emplace(locator.MakeObjectName(BasePath), locator);
queue->pop_front();
}
}
if (!locators) {
break;
}

const TActorId actorId = Self->Register(new TDeleterActor(Self->SelfId(), locators, Self->GetLogId()));
Expand Down Expand Up @@ -218,7 +228,15 @@ namespace NKikimr::NBlobDepot {
}

if (!msg.LocatorsError.empty()) {
DeleteQueue.insert(DeleteQueue.end(), msg.LocatorsError.begin(), msg.LocatorsError.end());
const ui32 generation = Self->Executor()->Generation();
for (auto& locator : msg.LocatorsError) {
if (locator.Generation < generation) {
DeleteQueueInPrevGenerations.push_back(locator);
} else {
Y_DEBUG_ABORT_UNLESS(locator.Generation == generation);
DeleteQueueInCurrentGeneration.push_back(locator);
}
}
RunDeletersIfNeeded();
}
})
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/blob_depot/s3_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ namespace NKikimr::NBlobDepot {
};

TS3Locator TS3Manager::AllocateS3Locator(ui32 len) {
if (!DeleteQueue.empty()) {
TS3Locator res = DeleteQueue.front();
DeleteQueue.pop_front();
if (!DeleteQueueInCurrentGeneration.empty()) {
TS3Locator res = DeleteQueueInCurrentGeneration.front();
DeleteQueueInCurrentGeneration.pop_front();
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_OBJECTS] = --TotalS3TrashObjects;
Self->TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_TOTAL_S3_TRASH_SIZE] = TotalS3TrashSize -= res.Len;
res.Len = len;
Expand Down
Loading