Skip to content

Commit 6dcfc25

Browse files
author
Taichi Yamakawa
committed
Add snapshot synchronization diagnostic logs
1 parent 618a211 commit 6dcfc25

File tree

2 files changed

+76
-9
lines changed

2 files changed

+76
-9
lines changed

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,15 @@ private[raft] class RaftActor(
703703
response match {
704704
case response: SnapshotSyncManager.SyncSnapshotSucceeded =>
705705
applyDomainEvent(SnapshotSyncCompleted(response.snapshotLastLogTerm, response.snapshotLastLogIndex)) { _ =>
706+
if (log.isInfoEnabled) {
707+
log.info(
708+
"[{}] Completed snapshot synchronization: srcMemberIndex=[{}], snapshotLastLogTerm=[{}], snapshotLastLogIndex=[{}]",
709+
currentState,
710+
response.srcMemberIndex,
711+
response.snapshotLastLogTerm.term,
712+
response.snapshotLastLogIndex,
713+
)
714+
}
706715
region ! ReplicationRegion.DeliverTo(
707716
response.srcMemberIndex,
708717
InstallSnapshotSucceeded(
@@ -715,6 +724,15 @@ private[raft] class RaftActor(
715724
}
716725

717726
case response: SnapshotSyncManager.SyncSnapshotAlreadySucceeded =>
727+
if (log.isInfoEnabled) {
728+
log.info(
729+
"[{}] Completed snapshot synchronization already: srcMemberIndex=[{}], snapshotLastLogTerm=[{}], snapshotLastLogIndex=[{}]",
730+
currentState,
731+
response.srcMemberIndex,
732+
response.snapshotLastLogTerm.term,
733+
response.snapshotLastLogIndex,
734+
)
735+
}
718736
region ! ReplicationRegion.DeliverTo(
719737
response.srcMemberIndex,
720738
InstallSnapshotSucceeded(
@@ -725,7 +743,11 @@ private[raft] class RaftActor(
725743
),
726744
)
727745

728-
case _: SnapshotSyncManager.SyncSnapshotFailed => // ignore
746+
case _: SnapshotSyncManager.SyncSnapshotFailed =>
747+
// ignore
748+
if (log.isWarningEnabled) {
749+
log.warning("[{}] Failed snapshot synchronization", currentState)
750+
}
729751
}
730752

731753
private val snapshotSyncManagerName: String = ActorIds.actorName(
@@ -738,7 +760,8 @@ private[raft] class RaftActor(
738760
// Snapshot updates during compaction will break consistency
739761
if (log.isInfoEnabled)
740762
log.info(
741-
"Skipping snapshot synchronization because compaction is in progress (remaining: {}/{})",
763+
"[{}] Skipping snapshot synchronization because compaction is in progress (remaining: {}/{})",
764+
currentState,
742765
currentData.snapshottingProgress.inProgressEntities.size,
743766
currentData.snapshottingProgress.inProgressEntities.size + currentData.snapshottingProgress.completedEntities.size,
744767
)
@@ -757,6 +780,17 @@ private[raft] class RaftActor(
757780
snapshotSyncManagerName,
758781
)
759782
}
783+
if (log.isDebugEnabled) {
784+
log.debug(
785+
s"[${currentState}] Starting snapshot synchronization " +
786+
"(srcLatestSnapshotLastLogTerm=[{}], srcLatestSnapshotLastLogIndex=[{}], " +
787+
"dstLatestSnapshotLastLogTerm=[{}], dstLatestSnapshotLastLogIndex=[{}])",
788+
installSnapshot.srcLatestSnapshotLastLogTerm.term,
789+
installSnapshot.srcLatestSnapshotLastLogLogIndex,
790+
currentData.lastSnapshotStatus.snapshotLastTerm.term,
791+
currentData.lastSnapshotStatus.snapshotLastLogIndex,
792+
)
793+
}
760794
snapshotSyncManager ! SnapshotSyncManager.SyncSnapshot(
761795
srcLatestSnapshotLastLogTerm = installSnapshot.srcLatestSnapshotLastLogTerm,
762796
srcLatestSnapshotLastLogIndex = installSnapshot.srcLatestSnapshotLastLogLogIndex,

src/main/scala/lerna/akka/entityreplication/raft/snapshot/sync/SnapshotSyncManager.scala

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package lerna.akka.entityreplication.raft.snapshot.sync
33
import akka.actor.{ ActorLogging, ActorRef, Props, Status }
44
import akka.pattern.extended.ask
55
import akka.pattern.pipe
6-
import akka.persistence.{ PersistentActor, RuntimePluginConfig, SnapshotOffer }
6+
import akka.persistence.{ PersistentActor, RecoveryCompleted, RuntimePluginConfig, SnapshotOffer }
77
import akka.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
88
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
99
import akka.stream.{ KillSwitches, UniqueKillSwitch }
@@ -222,10 +222,18 @@ private[entityreplication] class SnapshotSyncManager(
222222

223223
override def receiveRecover: Receive = {
224224

225-
case SnapshotOffer(_, snapshot: SyncProgress) =>
225+
case SnapshotOffer(metadata, snapshot: SyncProgress) =>
226+
if (log.isInfoEnabled) {
227+
log.info("Loaded snapshot: metadata=[{}], snapshot=[{}]", metadata, snapshot)
228+
}
226229
this.state = snapshot
227230

228231
case event: Event => updateState(event)
232+
233+
case RecoveryCompleted =>
234+
if (log.isInfoEnabled) {
235+
log.info("Recovery completed: state=[{}]", this.state)
236+
}
229237
}
230238

231239
private[this] var state = SyncProgress(Offset.noOffset)
@@ -286,10 +294,16 @@ private[entityreplication] class SnapshotSyncManager(
286294
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
287295
)
288296

289-
case _: akka.persistence.SaveSnapshotSuccess =>
297+
case akka.persistence.SaveSnapshotSuccess(metadata) =>
298+
if (log.isInfoEnabled) {
299+
log.info("Succeeded to save snapshot synchronization progress: metadata=[{}]", metadata)
300+
}
290301
context.stop(self)
291302

292-
case _: akka.persistence.SaveSnapshotFailure =>
303+
case akka.persistence.SaveSnapshotFailure(metadata, cause) =>
304+
if (log.isWarningEnabled) {
305+
log.warning("Failed to save snapshot synchronization progress: metadata=[{}], cause=[{}]", metadata, cause)
306+
}
293307
context.stop(self)
294308
}
295309

@@ -300,7 +314,10 @@ private[entityreplication] class SnapshotSyncManager(
300314
dstLatestSnapshotLastLogIndex: LogEntryIndex,
301315
): Receive = {
302316

303-
case _: SyncSnapshot => // ignore
317+
case syncSnapshot: SyncSnapshot =>
318+
if (log.isDebugEnabled) {
319+
log.debug("Dropping [{}] since the snapshot synchronization is running.", syncSnapshot)
320+
}
304321

305322
case syncStatus: SyncStatus =>
306323
this.killSwitch = None
@@ -318,6 +335,13 @@ private[entityreplication] class SnapshotSyncManager(
318335
updateState(event)
319336
if (event.snapshotLastLogIndex < srcLatestSnapshotLastLogIndex) {
320337
// complete partially
338+
if (log.isDebugEnabled) {
339+
log.debug(
340+
"Snapshot synchronization partially completed and continues: {} -> {}",
341+
s"(typeName: $typeName, memberIndex: $srcMemberIndex, snapshotLastLogIndex: ${event.snapshotLastLogIndex}/${srcLatestSnapshotLastLogIndex})",
342+
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
343+
)
344+
}
321345
startSnapshotSynchronizationBatch(
322346
srcLatestSnapshotLastLogIndex,
323347
dstLatestSnapshotLastLogTerm,
@@ -374,8 +398,17 @@ private[entityreplication] class SnapshotSyncManager(
374398
)
375399
context.stop(self)
376400

377-
case _: akka.persistence.SaveSnapshotSuccess => // ignore: previous execution result
378-
case _: akka.persistence.SaveSnapshotFailure => // ignore: previous execution result
401+
case saveSnapshotSuccess: akka.persistence.SaveSnapshotSuccess =>
402+
// ignore: previous execution result
403+
if (log.isDebugEnabled) {
404+
log.debug("Dropping [{}] of the previous synchronization.", saveSnapshotSuccess)
405+
}
406+
407+
case saveSnapshotFailure: akka.persistence.SaveSnapshotFailure =>
408+
// ignore: previous execution result
409+
if (log.isDebugEnabled) {
410+
log.debug("Dropping [{}] of the previous synchronization.", saveSnapshotFailure)
411+
}
379412
}
380413

381414
def updateState(event: Event): Unit =

0 commit comments

Comments
 (0)