Skip to content

Commit c6b28b5

Browse files
committed
🔀Merge branch 'master' into fix-lost-commited-events-by-leader-transfer
2 parents 308a602 + 71c1cef commit c6b28b5

File tree

20 files changed

+1398
-120
lines changed

20 files changed

+1398
-120
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3131
- The Raft leader uses the same previous `LogEntryIndex` and `Term` to all batched `AppendEntries` messages [#123](https://github.com/lerna-stack/akka-entity-replication/issues/123)
3232
- Raft Actors doesn't accept a `RequestVote(lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm)` message [#125](https://github.com/lerna-stack/akka-entity-replication/issues/125)
3333
- A new event is created even though all past events have not been applied [#130](https://github.com/lerna-stack/akka-entity-replication/issues/130)
34+
- `InstallSnapshot` can miss snapshots to copy [PR#128](https://github.com/lerna-stack/akka-entity-replication/pull/128)
35+
36+
⚠️ This change adds a new persistence event. This might don't allow downgrading after upgrading.
3437
- Moving a leader during snapshot synchronization can delete committed log entries [#133](https://github.com/lerna-stack/akka-entity-replication/issues/133)
3538

3639
⚠️ This change adds a new persistence event. This might don't allow downgrading after upgrading.

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ lazy val lerna = (project in file("."))
5555
// TODO 2.6.x 系に対応できる方法に変更する。
5656
"com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2" % Test,
5757
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
58+
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
5859
),
5960
inConfig(MultiJvm)(
6061
// multi-jvm ディレクトリをフォーマットするために必要
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# It is safe to exclude the following since CompactionCompletedTag (that is renamed to EntitySnapshotsUpdatedTag) is package-private.
2+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.persistence.CompactionCompletedTag")
3+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.persistence.CompactionCompletedTag$")
4+
# It is safe to exclude the following since SnapshotSyncManager is package-private.
5+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$CompactionEnvelope")
6+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$CompactionEnvelope$")
7+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SyncCompleteAll")
8+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SyncCompleteAll$")

src/main/protobuf/cluster_replication.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,14 @@ message TimeBasedUUID {
206206
required int64 least_sig_bits = 2;
207207
}
208208

209+
message SnapshotCopied {
210+
required Offset offset = 1;
211+
required MemberIndex member_index = 2;
212+
required NormalizedShardId shard_id = 3;
213+
required Term snapshot_last_log_term = 4;
214+
required LogEntryIndex snapshot_last_log_index = 5;
215+
repeated NormalizedEntityId entity_ids = 6;
216+
}
209217

210218
// ===
211219
// model

src/main/resources/reference.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ lerna.akka.entityreplication {
5757

5858
// Time to abort operations related to persistence
5959
persistence-operation-timeout = 10s
60+
61+
// Maximum size of a snapshot batch copied from leader's snapshot store to local snapshot store
62+
// Note:
63+
// If the event that updated the snapshots contains more than this batch size of entityId,
64+
// only the snapshots the single event indicates will be copied over this limit.
65+
// Copying snapshot should be executed atomically per event.
66+
max-snapshot-batch-size = 1000
6067
}
6168

6269
sharding = ${akka.cluster.sharding} {
@@ -92,6 +99,7 @@ lerna.akka.entityreplication {
9299
}
93100
event-adapter-bindings {
94101
"lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted" = akka-entity-replication-raft-event-adapter
102+
"lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SnapshotCopied" = akka-entity-replication-raft-event-adapter
95103
}
96104
}
97105

src/main/scala/lerna/akka/entityreplication/protobuf/ClusterReplicationSerializer.scala

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
4848
// raft.snapshot
4949
val EntitySnapshotManifest = "DA"
5050
// raft.snapshot.sync
51-
val SyncCompletedManifest = "EA"
52-
val SyncProgressManifest = "EB"
53-
val NoOffsetManifest = "EC"
54-
val SequenceManifest = "ED"
55-
val TimeBasedUUIDManifest = "EE"
51+
val SyncCompletedManifest = "EA"
52+
val SyncProgressManifest = "EB"
53+
val NoOffsetManifest = "EC"
54+
val SequenceManifest = "ED"
55+
val TimeBasedUUIDManifest = "EE"
56+
val SnapshotCopiedManifest = "EF"
5657
// raft.model
5758
val NoOpManifest = "FA"
5859
// typed
@@ -90,11 +91,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
9091
// raft.snapshot
9192
EntitySnapshotManifest -> entitySnapshotFromBinary,
9293
// raft.snapshot.sync
93-
SyncCompletedManifest -> syncCompletedFromBinary,
94-
SyncProgressManifest -> syncProgressFromBinary,
95-
NoOffsetManifest -> noOffsetEnvelopeFromBinary,
96-
SequenceManifest -> sequenceEnvelopeFromBinary,
97-
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
94+
SyncCompletedManifest -> syncCompletedFromBinary,
95+
SyncProgressManifest -> syncProgressFromBinary,
96+
NoOffsetManifest -> noOffsetEnvelopeFromBinary,
97+
SequenceManifest -> sequenceEnvelopeFromBinary,
98+
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
99+
SnapshotCopiedManifest -> snapshotCopiedFromBinary,
98100
// raft.model
99101
NoOpManifest -> noOpFromBinary,
100102
// typed
@@ -160,11 +162,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
160162
// raft.snapsnot
161163
case _: raft.snapshot.SnapshotProtocol.EntitySnapshot => EntitySnapshotManifest
162164
// raft.snapshot.sync
163-
case _: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => SyncCompletedManifest
164-
case _: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => SyncProgressManifest
165-
case _: NoOffsetEnvelope.type => NoOffsetManifest
166-
case _: SequenceEnvelope => SequenceManifest
167-
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
165+
case _: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => SyncCompletedManifest
166+
case _: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => SyncProgressManifest
167+
case _: NoOffsetEnvelope.type => NoOffsetManifest
168+
case _: SequenceEnvelope => SequenceManifest
169+
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
170+
case _: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied => SnapshotCopiedManifest
168171
// raft.model
169172
case _: raft.model.NoOp.type => NoOpManifest
170173
// typed
@@ -202,11 +205,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
202205
// raft.snapshot
203206
case m: raft.snapshot.SnapshotProtocol.EntitySnapshot => entitySnapShotToBinary(m)
204207
// raft.snapshot.sync
205-
case m: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => syncCompletedToBinary(m)
206-
case m: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => syncProgressToBinary(m)
207-
case m: NoOffsetEnvelope.type => noOffsetEnvelopeToBinary(m)
208-
case m: SequenceEnvelope => sequenceEnvelopeToBinary(m)
209-
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
208+
case m: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => syncCompletedToBinary(m)
209+
case m: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => syncProgressToBinary(m)
210+
case m: NoOffsetEnvelope.type => noOffsetEnvelopeToBinary(m)
211+
case m: SequenceEnvelope => sequenceEnvelopeToBinary(m)
212+
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
213+
case m: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied => snapshotCopiedToBinary(m)
210214
// raft.model
211215
case m: raft.model.NoOp.type => noOpToBinary(m)
212216
// typed
@@ -811,6 +815,30 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
811815
)
812816
}
813817

818+
private def snapshotCopiedToBinary(message: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied): Array[Byte] = {
819+
msg.SnapshotCopied
820+
.of(
821+
offset = offsetToProto(message.offset),
822+
memberIndex = memberIndexToProto(message.memberIndex),
823+
shardId = normalizedShardIdToProto(message.shardId),
824+
snapshotLastLogTerm = termToProto(message.snapshotLastLogTerm),
825+
snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
826+
entityIds = message.entityIds.map(normalizedEntityIdToProto).toSeq,
827+
).toByteArray
828+
}
829+
830+
private def snapshotCopiedFromBinary(bytes: Array[Byte]): raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied = {
831+
val proto = msg.SnapshotCopied.parseFrom(bytes)
832+
raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied(
833+
offset = offsetFromProto(proto.offset),
834+
memberIndex = memberIndexFromProto(proto.memberIndex),
835+
shardId = normalizedShardIdFromProto(proto.shardId),
836+
snapshotLastLogTerm = termFromProto(proto.snapshotLastLogTerm),
837+
snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
838+
entityIds = proto.entityIds.map(normalizedEntityIdFromProto).toSet,
839+
)
840+
}
841+
814842
// ===
815843
// model
816844
// ===

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ private[raft] class RaftActor(
161161
override val persistenceId: String =
162162
ActorIds.persistenceId("raft", typeName.underlying, shardId.underlying, selfMemberIndex.role)
163163

164+
/**
165+
* NOTE:
166+
* [[RaftActor]] has to use the same journal plugin as [[SnapshotSyncManager]]
167+
* because snapshot synchronization is achieved by reading both the events
168+
* [[CompactionCompleted]] which [[RaftActor]] persisted and SnapshotCopied which [[SnapshotSyncManager]] persisted.
169+
*/
164170
override def journalPluginId: String = settings.journalPluginId
165171

166172
override def journalPluginConfig: Config = settings.journalPluginAdditionalConfig

src/main/scala/lerna/akka/entityreplication/raft/RaftSettings.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ trait RaftSettings {
4646

4747
def snapshotSyncPersistenceOperationTimeout: FiniteDuration
4848

49+
def snapshotSyncMaxSnapshotBatchSize: Int
50+
4951
def clusterShardingConfig: Config
5052

5153
def raftActorAutoStartFrequency: FiniteDuration

src/main/scala/lerna/akka/entityreplication/raft/RaftSettingsImpl.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ private[entityreplication] final case class RaftSettingsImpl(
2525
compactionLogSizeCheckInterval: FiniteDuration,
2626
snapshotSyncCopyingParallelism: Int,
2727
snapshotSyncPersistenceOperationTimeout: FiniteDuration,
28+
snapshotSyncMaxSnapshotBatchSize: Int,
2829
clusterShardingConfig: Config,
2930
raftActorAutoStartFrequency: FiniteDuration,
3031
raftActorAutoStartNumberOfActors: Int,
@@ -140,6 +141,13 @@ private[entityreplication] object RaftSettingsImpl {
140141
val snapshotSyncPersistenceOperationTimeout: FiniteDuration =
141142
config.getDuration("snapshot-sync.persistence-operation-timeout").toScala
142143

144+
val snapshotSyncMaxSnapshotBatchSize: Int =
145+
config.getInt("snapshot-sync.max-snapshot-batch-size")
146+
require(
147+
snapshotSyncMaxSnapshotBatchSize > 0,
148+
s"snapshot-sync.max-snapshot-batch-size (${snapshotSyncMaxSnapshotBatchSize}) should be larger than 0",
149+
)
150+
143151
val clusterShardingConfig: Config = config.getConfig("sharding")
144152

145153
val raftActorAutoStartFrequency: FiniteDuration =
@@ -196,6 +204,7 @@ private[entityreplication] object RaftSettingsImpl {
196204
compactionLogSizeCheckInterval = compactionLogSizeCheckInterval,
197205
snapshotSyncCopyingParallelism = snapshotSyncCopyingParallelism,
198206
snapshotSyncPersistenceOperationTimeout = snapshotSyncPersistenceOperationTimeout,
207+
snapshotSyncMaxSnapshotBatchSize = snapshotSyncMaxSnapshotBatchSize,
199208
clusterShardingConfig = clusterShardingConfig,
200209
raftActorAutoStartFrequency = raftActorAutoStartFrequency,
201210
raftActorAutoStartNumberOfActors = raftActorAutoStartNumberOfActors,

src/main/scala/lerna/akka/entityreplication/raft/persistence/CompactionCompletedTag.scala renamed to src/main/scala/lerna/akka/entityreplication/raft/persistence/EntitySnapshotsUpdatedTag.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package lerna.akka.entityreplication.raft.persistence
33
import lerna.akka.entityreplication.model.NormalizedShardId
44
import lerna.akka.entityreplication.raft.routing.MemberIndex
55

6-
private[entityreplication] final case class CompactionCompletedTag(
6+
private[entityreplication] final case class EntitySnapshotsUpdatedTag(
77
memberIndex: MemberIndex,
88
shardId: NormalizedShardId,
99
) {
1010
private[this] val delimiter = ":"
1111

12+
// Do not change this tag format for compatibility
1213
override def toString: String = s"CompactionCompleted${delimiter}${shardId.underlying}${delimiter}${memberIndex.role}"
1314
}

0 commit comments

Comments
 (0)