Skip to content

Commit ece4040

Browse files
committed
Multiple encryption threads in PDisk
1 parent c642870 commit ece4040

28 files changed

+1025
-176
lines changed

ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,16 @@ struct TEventTypeField {
217217
PROBE(PDiskChunkReadPieceComplete, GROUPS("PDisk", "PDiskRequest"), \
218218
TYPES(TPDiskIdField, ui64, ui64, double), \
219219
NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \
220+
PROBE(PDiskChunkWritePieceComplete, GROUPS("PDisk", "PDiskRequest"), \
221+
TYPES(TPDiskIdField, ui64, ui64, double), \
222+
NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \
220223
PROBE(PDiskChunkWriteAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
221224
TYPES(TPDiskIdField, ui64, double, ui64, bool, ui64, ui64), \
222225
NAMES("pdisk", "reqId", "creationTimeSec", "owner", "isFast", "priorityClass", "size")) \
223-
PROBE(PDiskChunkWriteLastPieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
226+
PROBE(PDiskChunkWritePieceAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
227+
TYPES(TPDiskIdField, ui32, ui64, ui64), \
228+
NAMES("pdisk", "pieceIdx", "offset", "size")) \
229+
PROBE(PDiskChunkWritePieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
224230
TYPES(TPDiskIdField, ui64, ui64, ui64, ui64), \
225231
NAMES("pdisk", "owner", "chunkIdx", "pieceOffset", "pieceSize")) \
226232
PROBE(PDiskLogWriteComplete, GROUPS("PDisk", "PDiskRequest"), \

ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,7 @@ class TRealBlockDevice : public IBlockDevice {
889889
}
890890

891891
Y_VERIFY_S(PCtx->ActorSystem->AppData<TAppData>(), PCtx->PDiskLogPrefix);
892-
Y_VERIFY_S(PCtx->ActorSystem->AppData<TAppData>()->IoContextFactory, PCtx->PDiskLogPrefix);
892+
Y_VERIFY_S(PCtx->ActorSystem->AppData<TAppData>()->IoContextFactory, PCtx->PDiskLogPrefix);
893893
auto *factory = PCtx->ActorSystem->AppData<TAppData>()->IoContextFactory;
894894
IoContext = factory->CreateAsyncIoContext(Path, PCtx->PDiskId, Flags, SectorMap);
895895
if (Flags & TDeviceMode::UseSpdk) {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#include <util/system/thread.h>
2+
#include <util/thread/lfqueue.h>
3+
4+
namespace NKikimr {
5+
namespace NPDisk {
6+
7+
class TChunkWritePiece;
8+
9+
class TChunkWritePieceQueue {
10+
public:
11+
TChunkWritePieceQueue() = default;
12+
bool Empty() const {
13+
return SizeCounter.load(std::memory_order_relaxed) == 0;
14+
}
15+
size_t Size() const {
16+
return SizeCounter.load(std::memory_order_relaxed);
17+
}
18+
TChunkWritePiece* Dequeue() {
19+
TChunkWritePiece* item;
20+
QueueImpl.Dequeue(&item);
21+
SizeCounter.fetch_sub(1, std::memory_order_relaxed);
22+
return item;
23+
}
24+
void Enqueue(TChunkWritePiece* item) {
25+
QueueImpl.Enqueue(item);
26+
SizeCounter.fetch_add(1, std::memory_order_relaxed);
27+
}
28+
private:
29+
TLockFreeQueue<TChunkWritePiece*> QueueImpl;
30+
std::atomic<size_t> SizeCounter = 0;
31+
};
32+
33+
} // NPDisk
34+
} // NKikimr

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,55 @@ namespace NPDisk {
1111
// Completion actions
1212
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1313

14+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
15+
// Chunk write completion actions
16+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
17+
18+
19+
// Builds the completion action for one piece of a chunk write.
//   piece                - the piece being written; supplies the parent tracing span and the PDisk context.
//   cumulativeCompletion - completion of the whole chunk write; it is notified through
//                          CompletePart()/RemovePart() when this piece finishes.
TCompletionChunkWritePiece::TCompletionChunkWritePiece(TChunkWritePiece* piece, TCompletionChunkWrite* cumulativeCompletion)
20+
: TCompletionAction()
21+
, Piece(piece)
22+
, CumulativeCompletion(cumulativeCompletion)
23+
// The span of this completion is a child of the piece's own span.
, Span(piece->Span.CreateChild(TWilson::PDiskDetailed, "PDisk.ChunkWritePiece.CompletionPart"))
24+
// Reads the member Piece (declared before ActorSystem in the class, so already initialised).
// The actor system is cached because the destructor needs it for RemovePart().
, ActorSystem(Piece->PDisk->PCtx->ActorSystem)
25+
{
26+
}
27+
28+
// If the cumulative completion is still attached (Exec() did not detach it after a successful
// CompletePart()), report this piece as removed so the parent can finish once all pieces are gone.
TCompletionChunkWritePiece::~TCompletionChunkWritePiece() {
    if (!CumulativeCompletion) {
        return;
    }
    CumulativeCompletion->RemovePart(ActorSystem);
}
33+
34+
// Completion callback for one written piece.
// On an I/O error the work is delegated to Release(); on success the piece is reported to the
// cumulative completion through CompletePart(). In both cases this object deletes itself.
void TCompletionChunkWritePiece::Exec(TActorSystem *actorSystem) {
35+
Span.Event("PDisk.CompletionChunkWritePart.Exec");
36+
Y_VERIFY(actorSystem);
37+
Y_VERIFY(CumulativeCompletion);
38+
if (TCompletionAction::Result != EIoResult::Ok) {
39+
// Release() ends with `delete this`, so nothing may touch the object after this call.
Release(actorSystem);
40+
return;
41+
}
42+
43+
// Time between SubmitTime and GetTime, converted to milliseconds for the trace probe.
double deviceTimeMs = HPMilliSecondsFloat(GetTime - SubmitTime);
44+
//TODO: Fork and join this orbit from ChunkWrite orbit.
45+
LWTRACK(PDiskChunkWritePieceComplete, Orbit, Piece->PDisk->PCtx->PDiskId, Piece->PieceSize, Piece->PieceShift, deviceTimeMs);
46+
47+
CumulativeCompletion->CompletePart(actorSystem);
48+
// Detach from the parent so the destructor does not call RemovePart() a second time.
CumulativeCompletion = nullptr;
49+
50+
Span.Event("PDisk.CompletionChunkWritePart.ExecStop");
51+
delete this;
52+
}
53+
54+
// Error path for one piece: copies this piece's ErrorReason to the cumulative completion, ends the
// span with an error and deletes this object. The destructor then calls RemovePart() on the
// cumulative completion if it is still attached.
// NOTE(review): ErrorReason of the cumulative completion is written without visible
// synchronisation; confirm that pieces of one chunk write cannot be released concurrently.
void TCompletionChunkWritePiece::Release(TActorSystem *actorSystem) {
55+
// The actor system cached in the constructor is used by the destructor instead.
Y_UNUSED(actorSystem);
56+
if (CumulativeCompletion) {
57+
CumulativeCompletion->ErrorReason = ErrorReason;
58+
}
59+
Span.EndError("PDisk.CompletionChunkWritePart.Release");
60+
delete this;
61+
}
62+
1463
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1564
// Log write completion action
1665
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,14 @@ class TCompletionChunkWrite : public TCompletionAction {
7676
std::function<void()> OnDestroy;
7777
TReqId ReqId;
7878
NWilson::TSpan Span;
79-
79+
std::atomic<ui32> PartsStarted;
80+
std::atomic<ui32> PartsRemoved;
81+
std::atomic<ui32> PartsWritten;
8082
public:
83+
bool IsReplied = false;
84+
ui32 Pieces;
8185
TEvChunkWrite::TPartsPtr Parts;
8286
std::optional<TAlignedData> Buffer;
83-
8487
TCompletionChunkWrite(const TActorId &recipient, TEvChunkWriteResult *event,
8588
TPDiskMon *mon, ui32 pdiskId, NHPTimer::STime startTime, size_t sizeBytes,
8689
ui8 priorityClass, std::function<void()> onDestroy, TReqId reqId, NWilson::TSpan&& span)
@@ -164,16 +167,57 @@ class TCompletionChunkWrite : public TCompletionAction {
164167
if (Mon) {
165168
Mon->GetWriteCounter(PriorityClass)->CountResponse();
166169
}
170+
167171
delete this;
168172
}
169173

170174
void Release(TActorSystem *actorSystem) override {
171-
Event->Status = NKikimrProto::CORRUPTED;
172-
Event->ErrorReason = ErrorReason;
173-
actorSystem->Send(Recipient, Event.Release());
174-
Span.EndError(ErrorReason);
175+
if (!IsReplied) {
176+
Event->Status = NKikimrProto::CORRUPTED;
177+
Event->ErrorReason = ErrorReason;
178+
actorSystem->Send(Recipient, Event.Release());
179+
Span.EndError(ErrorReason);
180+
}
175181
delete this;
176182
}
183+
184+
// Registers one more piece of this chunk write as started.
void AddPart() {
    PartsStarted.fetch_add(1);
}
187+
188+
bool AllPartsStarted() {
189+
return PartsStarted == Pieces;
190+
}
191+
192+
// Accounts for one finished piece, whether it succeeded or failed. The call that removes the last
// piece finalises the whole chunk write: Exec() if every piece was written, Release() otherwise.
void RemovePart(TActorSystem *actorSystem) {
    const ui32 removedBefore = PartsRemoved.fetch_add(1, std::memory_order::seq_cst);
    if (removedBefore + 1 != Pieces) {
        // Other pieces are still outstanding; the last one will finalise.
        return;
    }
    const bool allWritten = PartsWritten.load(std::memory_order::seq_cst) == Pieces;
    if (allWritten) {
        Exec(actorSystem);
    } else {
        Release(actorSystem);
    }
}
202+
203+
// Marks one piece as successfully written and then accounts for its removal.
void CompletePart(TActorSystem *actorSystem) {
    PartsWritten.fetch_add(1);
    RemovePart(actorSystem);
}
207+
};
208+
209+
class TChunkWritePiece;
210+
211+
// Completion action for a single piece of a chunk write. It reports the outcome of the piece to
// the TCompletionChunkWrite of the whole request.
// The member order matters: the constructor initialises ActorSystem through the Piece member.
class TCompletionChunkWritePiece : public TCompletionAction {
212+
// The piece this completion belongs to.
TChunkWritePiece* Piece;
213+
// Completion of the whole chunk write; set to nullptr once CompletePart() has been called.
TCompletionChunkWrite* CumulativeCompletion;
214+
NWilson::TSpan Span;
215+
// Cached in the constructor so the destructor can call RemovePart().
TActorSystem* ActorSystem;
216+
public:
217+
TCompletionChunkWritePiece(NKikimr::NPDisk::TChunkWritePiece* piece, TCompletionChunkWrite* cumulativeCompletion);
218+
void Exec(TActorSystem *actorSystem) override;
219+
void Release(TActorSystem *actorSystem) override;
220+
virtual ~TCompletionChunkWritePiece();
177221
};
178222

179223
class TCompletionLogWrite : public TCompletionAction {

ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ struct TPDiskConfig : public TThrRefBase {
165165
NKikimrBlobStorage::TPDiskSpaceColor::E SpaceColorBorder = NKikimrBlobStorage::TPDiskSpaceColor::GREEN;
166166

167167
ui32 CompletionThreadsCount = 1;
168+
ui32 EncryptionThreadCount = 0;
168169
bool UseNoopScheduler = false;
169170

170171
bool PlainDataChunks = false;
@@ -419,6 +420,9 @@ struct TPDiskConfig : public TThrRefBase {
419420
if (cfg->HasCompletionThreadsCount()) {
420421
CompletionThreadsCount = cfg->GetCompletionThreadsCount();
421422
}
423+
if (cfg->HasEncryptionThreadCount()) {
424+
EncryptionThreadCount = cfg->GetEncryptionThreadCount();
425+
}
422426

423427
if (cfg->HasUseNoopScheduler()) {
424428
UseNoopScheduler = cfg->GetUseNoopScheduler();

ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,7 @@ struct TDiskFormat {
654654
str << " MagicFormatChunk: " << MagicFormatChunk << x;
655655
str << " ChunkSize: " << ChunkSize << " bytes (" << (ChunkSize / 1000000ull) << " MB)" << x;
656656
str << " SectorSize: " << SectorSize << x;
657+
str << " SectorPayloadSize: " << SectorPayloadSize() << x;
657658
str << " SysLogSectorCount: " << SysLogSectorCount << x;
658659
str << " SystemChunkCount: " << SystemChunkCount << x;
659660
str << " FormatText: \"" << FormatText << "\"" << x;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#include "blobstorage_pdisk_encryption_threads.h"
2+
#include "blobstorage_pdisk_requestimpl.h"
3+
4+
#include <ydb/core/util/hp_timer_helpers.h>
5+
#include <ydb/library/yverify_stream/yverify_stream.h>
6+
7+
namespace NKikimr {
8+
namespace NPDisk {
9+
10+
// Stores the thread name; ThreadProc() applies it to the running thread when it starts.
TEncryptionThread::TEncryptionThread(TString name) : Name(name) {}
11+
12+
// Returns the number of pieces currently waiting in this thread's queue.
// TEncryptionThreads::Schedule() uses it to pick the least loaded thread.
size_t TEncryptionThread::GetQueuedActions() {
13+
return Queue.GetWaitingSize();
14+
}
15+
16+
void* TEncryptionThread::ThreadProc() {
17+
SetCurrentThreadName(Name.data());
18+
bool isWorking = true;
19+
20+
while (isWorking) {
21+
TAtomicBase itemCount = Queue.GetWaitingSize();
22+
if (itemCount > 0) {
23+
for (TAtomicBase idx = 0; idx < itemCount; ++idx) {
24+
TChunkWritePiece *piece = Queue.Pop();
25+
if (piece == nullptr) {
26+
isWorking = false;
27+
} else {
28+
piece->Process();
29+
}
30+
}
31+
} else {
32+
Queue.ProducedWaitI();
33+
}
34+
}
35+
return nullptr;
36+
}
37+
38+
// Pushes a piece onto this thread's queue. A nullptr piece makes ThreadProc() leave its loop.
void TEncryptionThread::Schedule(TChunkWritePiece *piece) {
39+
Queue.Push(piece);
40+
}
41+
42+
// Starts `threadsCount` encryption worker threads (at most MAX_THREAD_COUNT).
//
// Every requested thread is really created here. Previously at most two threads were started
// while AvailableThreads was still set to `threadsCount`, so for 3 or 4 threads Schedule() walked
// past the end of Threads. AvailableThreads is now derived from the number of threads that exist.
TEncryptionThreads::TEncryptionThreads(size_t threadsCount) {
    Y_VERIFY(threadsCount <= MAX_THREAD_COUNT, "too many encryption threads");
    Threads.reserve(MAX_THREAD_COUNT);
    for (size_t i = 0; i < threadsCount; i++) {
        Threads.push_back(MakeHolder<TEncryptionThread>(TStringBuilder() << "PdEncrypt" << i));
        Threads.back()->Start();
    }
    AvailableThreads = Threads.size();
}
51+
52+
void TEncryptionThreads::SetThreadCount(size_t threadsCount) {
53+
Y_VERIFY(threadsCount <= MAX_THREAD_COUNT, "too many encryption threads");
54+
for (size_t i = Threads.size(); i < threadsCount; i++) {
55+
Threads.push_back(MakeHolder<TEncryptionThread>(TStringBuilder() << "PdEncrypt" << i));
56+
Threads.back()->Start();
57+
}
58+
AvailableThreads = threadsCount;
59+
}
60+
61+
62+
void TEncryptionThreads::StopWork() {
63+
for (auto& thread : Threads) {
64+
thread->Schedule(nullptr);
65+
}
66+
}
67+
68+
void TEncryptionThreads::Join() {
69+
for (auto& thread : Threads) {
70+
thread->Join();
71+
}
72+
Threads.clear();
73+
}
74+
75+
// Full shutdown: signals every worker to stop, then waits for all of them to exit.
void TEncryptionThreads::Stop() {
76+
StopWork();
77+
Join();
78+
}
79+
80+
// Dispatches a piece to the least loaded worker thread.
//   piece == nullptr        - stop request, forwarded to every worker through StopWork().
//   no usable worker thread - the piece is processed synchronously on the calling thread.
//
// The range of candidate threads is clamped to the threads that really exist. The function is
// noexcept and used to dereference Threads.begin() and iterate up to begin() + AvailableThreads
// without checking Threads.size(), which is undefined behaviour whenever AvailableThreads is
// larger than the vector (for example after Join() cleared it).
void TEncryptionThreads::Schedule(TChunkWritePiece* piece) noexcept {
    if (!piece) {
        StopWork();
        return;
    }

    const size_t usableThreads = AvailableThreads < Threads.size() ? AvailableThreads : Threads.size();
    if (usableThreads == 0) {
        piece->Process();
        return;
    }

    // Linear scan for the shortest queue; usableThreads >= 1, so begin() is valid.
    auto leastLoaded = Threads.begin();
    auto minQueueSize = (*leastLoaded)->GetQueuedActions();
    const auto candidatesEnd = Threads.begin() + usableThreads;
    for (auto it = Threads.begin() + 1; it != candidatesEnd; ++it) {
        auto queueSize = (*it)->GetQueuedActions();
        if (queueSize < minQueueSize) {
            minQueueSize = queueSize;
            leastLoaded = it;
        }
    }
    (*leastLoaded)->Schedule(piece);
}
103+
104+
} // NPDisk
105+
} // NKikimr
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#include "blobstorage_pdisk_completion.h"
2+
#include "blobstorage_pdisk_util_countedqueuemanyone.h"
3+
4+
#include <util/system/thread.h>
5+
#include <util/thread/lfqueue.h>
6+
7+
#include <queue>
8+
#include <variant>
9+
10+
namespace NKikimr {
11+
namespace NPDisk {
12+
13+
class TChunkWritePiece;
14+
15+
// One worker thread with its own queue of TChunkWritePiece pointers. ThreadProc() pops pieces and
// calls Process() on each; a nullptr entry in the queue stops the thread.
class TEncryptionThread : public ISimpleThread {
16+
public:
17+
// `name` is applied to the thread when ThreadProc() starts.
TEncryptionThread(TString name);
18+
// Worker loop; returns nullptr after a nullptr piece has been popped.
void *ThreadProc() override;
19+
// Enqueues a piece for this thread; nullptr requests the thread to stop.
void Schedule(TChunkWritePiece *piece);
20+
// Number of pieces currently waiting in the queue.
size_t GetQueuedActions();
21+
private:
22+
TCountedQueueManyOne<TChunkWritePiece, 4 << 10> Queue;
23+
TString Name;
24+
};
25+
// Small pool of TEncryptionThread workers. Schedule() hands each piece to the worker with the
// shortest queue, or processes it inline when no worker is available.
class TEncryptionThreads {
26+
public:
27+
// Upper bound enforced by the constructor and by SetThreadCount().
static const size_t MAX_THREAD_COUNT = 4;
28+
// Worker threads that have been created and started.
TVector<THolder<TEncryptionThread>> Threads;
29+
// Number of leading entries of Threads that Schedule() may dispatch to; 0 means inline processing.
size_t AvailableThreads;
30+
31+
TEncryptionThreads(size_t threadsCount);
32+
// Creates missing threads up to threadsCount and updates AvailableThreads.
void SetThreadCount(size_t threadsCount);
33+
34+
// Schedule piece processing on the least loaded worker thread
35+
// pass piece = nullptr to send the stop signal to all worker threads
36+
void Schedule(TChunkWritePiece* piece) noexcept;
37+
38+
// Sends the stop signal to every worker thread.
void StopWork();
39+
// Waits for all worker threads to exit and clears Threads.
void Join();
40+
// StopWork() followed by Join().
void Stop();
41+
};
42+
43+
44+
} // NPDisk
45+
} // NKikimr

0 commit comments

Comments
 (0)