Skip to content

Commit 0b70c78

Browse files
authored
Parallel compaction (#30028)
2 parents 2cbf9bb + e4fa812 commit 0b70c78

29 files changed

+208
-124
lines changed

ydb/core/protos/config.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ message TSharedReadingConfig {
11861186
optional bool Enabled = 1;
11871187
optional uint64 TimeoutBeforeStartSessionSec = 2;
11881188
optional uint64 SendStatusPeriodSec = 3;
1189-
optional uint64 MaxSessionUsedMemory = 4;
1189+
optional uint64 MaxSessionUsedMemory = 4;
11901190
optional bool WithoutConsumer = 5;
11911191
optional TJsonParserConfig JsonParser = 7;
11921192
optional TCompileServiceConfig CompileService = 8;
@@ -2181,6 +2181,8 @@ message TColumnShardConfig {
21812181
optional uint64 BadPortionSizeLimit = 51 [default = 524288];
21822182
optional uint64 BadPortionsLimit = 52;
21832183
optional bool CombineChunksInResult = 54 [default = true];
2184+
optional bool EnableDiagnostics = 55 [default = true];
2185+
optional bool EnableParallelCompaction = 56 [default = false];
21842186
optional NKikimrSchemeOp.TS3Settings S3Client = 58;
21852187
}
21862188

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ message TCompactionLevelConstructorContainer {
538538
optional uint64 PortionsCountAvailable = 3;
539539
optional uint64 PortionsCountLimit = 4;
540540
optional uint64 PortionsSizeLimit = 5;
541+
optional uint64 Concurrency = 6;
541542
}
542543

543544
message TOneLayer {

ydb/core/tx/columnshard/background_controller.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
namespace NKikimr::NColumnShard {
55

66
bool TBackgroundController::StartCompaction(const TInternalPathId pathId, const TString& taskId) {
7-
auto [it, _] = ActiveCompactionInfo.emplace(pathId, NOlap::TPlanCompactionInfo{ pathId, taskId });
7+
auto [it, _] = ActiveCompactionInfo.emplace(std::make_pair(pathId, taskId), NOlap::TPlanCompactionInfo{ pathId, taskId });
88
it->second.Start();
99
return true;
1010
}
1111

12-
void TBackgroundController::FinishCompaction(const TInternalPathId pathId) {
13-
auto it = ActiveCompactionInfo.find(pathId);
12+
void TBackgroundController::FinishCompaction(const TInternalPathId pathId, const TString& taskId) {
13+
auto it = ActiveCompactionInfo.find(std::make_pair(pathId, taskId));
1414
AFL_VERIFY(it != ActiveCompactionInfo.end());
1515
if (it->second.Finish()) {
1616
ActiveCompactionInfo.erase(it);
@@ -21,7 +21,7 @@ void TBackgroundController::FinishCompaction(const TInternalPathId pathId) {
2121
void TBackgroundController::CheckDeadlines() {
2222
for (auto&& i : ActiveCompactionInfo) {
2323
if (TMonotonic::Now() - i.second.GetStartTime() > NOlap::TCompactionLimits::CompactionTimeout) {
24-
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("path_id", i.first)("task_id", i.second.GetTaskId());
24+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("path_id", i.first.first)("task_id", i.second.GetTaskId());
2525
// uncomment it for debug purpose
2626
// AFL_VERIFY_DEBUG(false);
2727
}

ydb/core/tx/columnshard/background_controller.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace NKikimr::NColumnShard {
1212

1313
class TBackgroundController {
1414
private:
15-
using TCurrentCompaction = THashMap<TInternalPathId, NOlap::TPlanCompactionInfo>;
15+
using TCurrentCompaction = THashMap<std::pair<TInternalPathId, TString>, NOlap::TPlanCompactionInfo>;
1616
TCurrentCompaction ActiveCompactionInfo;
1717
std::optional<ui64> WaitingCompactionPriority;
1818

@@ -60,7 +60,7 @@ class TBackgroundController {
6060
void CheckDeadlines();
6161

6262
bool StartCompaction(const TInternalPathId pathId, const TString& taskId);
63-
void FinishCompaction(const TInternalPathId pathId);
63+
void FinishCompaction(const TInternalPathId pathId, const TString& taskId);
6464

6565
ui32 GetCompactionsCount() const {
6666
return ActiveCompactionInfo.size();

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
3434
const auto* portion = changes->GetWritePortionInfo(i);
3535
LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << portion->DebugString() << ") blob cannot apply changes: " << TxSuffix());
3636
}
37-
NOlap::TChangesFinishContext context("cannot write index blobs: " + ::ToString(Ev->Get()->GetPutStatus()));
37+
NOlap::TChangesFinishContext context("cannot write index blobs: " + ::ToString(Ev->Get()->GetPutStatus()) + ", error: " + Ev->Get()->ErrorMessage);
3838
changes->Abort(*Self, context);
3939
LOG_S_ERROR(TxPrefix() << " (" << changes->TypeString() << ") cannot write index blobs" << TxSuffix());
4040
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -623,28 +623,31 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
623623
Counters.GetCSCounters().OnSetupCompaction();
624624
BackgroundController.ResetWaitingPriority();
625625

626-
auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(DataLocksManager);
627-
if (!indexChanges) {
626+
auto indexChangesList = TablesManager.MutablePrimaryIndex().StartCompaction(DataLocksManager);
627+
628+
if (indexChangesList.empty()) {
628629
LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
629630
return;
630631
}
631632

632-
auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get());
633-
compaction.SetActivityFlag(GetTabletActivity());
634-
compaction.SetQueueGuard(guard);
635-
compaction.Start(*this);
633+
for (const auto& indexChanges : indexChangesList) {
634+
auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get());
635+
compaction.SetActivityFlag(GetTabletActivity());
636+
compaction.SetQueueGuard(guard);
637+
compaction.Start(*this);
636638

637-
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
638-
static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
639-
NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("COMPACTION", NOlap::TGlobalLimits::GeneralCompactionMemoryLimit);
640-
auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
641-
NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo,
642-
NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard);
643-
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
644-
NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
645-
std::make_shared<TCompactionExecutor>(
646-
TabletID(), SelfId(), indexChanges, actualIndexInfo, Counters.GetIndexationCounters(), GetLastCompletedTx(), TabletActivityImpl),
647-
env, NConveyorComposite::ESpecialTaskCategory::Compaction);
639+
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
640+
static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
641+
NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("COMPACTION", NOlap::TGlobalLimits::GeneralCompactionMemoryLimit);
642+
auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
643+
NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo,
644+
NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard);
645+
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
646+
NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
647+
std::make_shared<TCompactionExecutor>(
648+
TabletID(), SelfId(), indexChanges, actualIndexInfo, Counters.GetIndexationCounters(), GetLastCompletedTx(), TabletActivityImpl),
649+
env, NConveyorComposite::ESpecialTaskCategory::Compaction);
650+
}
648651
}
649652

650653
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController,
130130
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
131131

132132
struct TSettings {
133-
static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 1;
134-
135133
static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16;
136134
static constexpr TDuration GuaranteeIndexationInterval = TDuration::Seconds(10);
137135
static constexpr TDuration DefaultStatsReportInterval = TDuration::Seconds(10);

ydb/core/tx/columnshard/engines/changes/compaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnSh
4747
}
4848

4949
void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
50-
self.BackgroundController.FinishCompaction(GranuleMeta->GetPathId());
50+
self.BackgroundController.FinishCompaction(GranuleMeta->GetPathId(), GetTaskIdentifier());
5151
Y_ABORT_UNLESS(NeedGranuleStatusProvide);
5252
if (context.FinishedSuccessfully) {
5353
GranuleMeta->OnCompactionFinished();

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class IColumnEngine {
146146
virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0;
147147
virtual std::vector<std::shared_ptr<TPortionInfo>> Select(
148148
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const = 0;
149-
virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
149+
virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
150150
virtual ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds,
151151
const std::optional<ui64> waitingPriority) const noexcept = 0;
152152
virtual std::shared_ptr<TCleanupPortionsColumnEngineChanges> StartCleanupPortions(const TSnapshot& snapshot,

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,17 +211,17 @@ ui64 TColumnEngineForLogs::GetCompactionPriority(const std::shared_ptr<NDataLock
211211
}
212212
}
213213

214-
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(
214+
std::vector<std::shared_ptr<TColumnEngineChanges>> TColumnEngineForLogs::StartCompaction(
215215
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept {
216216
AFL_VERIFY(dataLocksManager);
217217
auto granule = GranulesStorage->GetGranuleForCompaction(dataLocksManager);
218218
if (!granule) {
219219
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction");
220-
return nullptr;
220+
return {};
221221
}
222222
granule->OnStartCompaction();
223-
auto changes = granule->GetOptimizationTask(granule, dataLocksManager);
224-
if (!changes) {
223+
auto changes = granule->GetOptimizationTasks(granule, dataLocksManager);
224+
if (changes.empty()) {
225225
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")(
226226
"weight", granule->GetCompactionPriority().DebugString());
227227
}

0 commit comments

Comments
 (0)