11package lerna .akka .entityreplication .typed
22
33import akka .actor
4+ import akka .actor .testkit .typed .scaladsl .LoggingTestKit
45import akka .{ actor => classic }
56import akka .actor .typed .scaladsl .Behaviors
67import akka .actor .typed .scaladsl .adapter .ClassicActorSystemOps
@@ -36,10 +37,10 @@ object MultiSnapshotSyncSpecConfig extends MultiNodeConfig {
3637 |lerna.akka.entityreplication {
3738 | // RaftActors will recover entities as possible quick.
3839 | recovery-entity-timeout = 1s
39- | // Using this default value, RaftActors have a chance to become the leader.
40- | // By using a larger value, RaftActors don't have such a chance.
41- | election-timeout = 1s
4240 | raft {
41+ | // Using this default value, RaftActors have a chance to become the leader.
42+ | // By using a larger value, RaftActors don't have such a chance.
43+ | election-timeout = 1s
4344 | // EntityReplication runs only one RaftActor group for the sake of simplicity.
4445 | number-of-shards = 1
4546 | raft-actor-auto-start {
@@ -217,6 +218,25 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
217218 }
218219 }
219220
221+ /** Executes the given block of code only on other than the given nodes */
222+ private def runOnOtherThan (nodes : RoleName * )(thunk : => Unit ): Unit = {
223+ val otherNodes = roles.filterNot(nodes.contains(_))
224+ runOn(otherNodes : _* )(thunk)
225+ }
226+
227+ /** Verifies compaction completed by inspecting logging
228+ *
229+ * The compaction should completes in the given block of code.
230+ */
231+ private def expectCompactionCompleted [T ](code : => T ): T = {
232+ LoggingTestKit .info(" compaction completed" ).expect(code)
233+ }
234+
235+ /** Waits for compaction completed by sleeping the current thread.
236+ *
237+ * The compaction will complete automatically and asynchronously.
238+ * Use [[expectCompactionCompleted ]] if this test verifies the compaction completed.
239+ */
220240 private def waitForCompactionCompleted (): Unit = {
221241 Thread .sleep(compactionTimeout.toMillis)
222242 }
@@ -241,19 +261,17 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
241261 enterBarrier(" The cluster has nodes: [2,3]." )
242262 }
243263
244- " The leader replicates some log entries" in {
245- runOn(node2) { // Can choose any of nodes: [2,3,_]
246- setValue(" 0" , 0 )(initializationTimeout) shouldBe 0
247- (1 to 10 ).foreach { n =>
248- setValue(n.toString, n) shouldBe n
264+ " The leader replicates some log entries and then it compacts the entries" in {
265+ runOn(node2) {
266+ expectCompactionCompleted {
267+ setValue(" 0" , 0 )(initializationTimeout) shouldBe 0
268+ (1 to 10 ).foreach { n =>
269+ setValue(n.toString, n) shouldBe n
270+ }
271+ waitForCompactionCompleted()
249272 }
250273 }
251- enterBarrier(" Replicated log entries (entity ids = 1 ~ 10)." )
252- }
253-
254- " RaftActors on node2 compacts log entries" in {
255- waitForCompactionCompleted()
256- enterBarrier(" The node2 compacted their log entries." )
274+ enterBarrier(" Nodes([2,3]) replicated log entries (entity ids = 1 ~ 10) and node2 compacts its log entries." )
257275 }
258276
259277 " The cluster has nodes: [4,_,5]" in {
@@ -265,19 +283,34 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
265283 enterBarrier(" The cluster has nodes: [4,_,5]." )
266284 }
267285
268- " The leader (which is on node4) replicates some log entries" in {
269- runOn(node4) { // Can choose any of nodes [4,_,5]
270- setValue(" 0" , 0 )(initializationTimeout) shouldBe 0
271- (11 to 20 ).foreach { n =>
272- setValue(n.toString, n) shouldBe n
286+ " The leader (which is on node4) replicates some log entries. Nodes ([4,5]) compacts their log entries" in {
287+ object BarrierNames {
288+ val ExpectingCompactionCompleted = " Expecting compaction completed"
289+ val ReplicatedLogEntries = " Replicated log entries"
290+ }
291+ runOn(node4) {
292+ expectCompactionCompleted {
293+ enterBarrier(BarrierNames .ExpectingCompactionCompleted )
294+ setValue(" 0" , 0 )(initializationTimeout) shouldBe 0
295+ (11 to 20 ).foreach { n =>
296+ setValue(n.toString, n) shouldBe n
297+ }
298+ enterBarrier(BarrierNames .ReplicatedLogEntries )
299+ waitForCompactionCompleted()
273300 }
274301 }
275- enterBarrier(" Replicated log entries (entity ids = 11 ~ 20)." )
276- }
277-
278- " RaftActors on node: [4,_,5] compacts their log entries" in {
279- waitForCompactionCompleted()
280- enterBarrier(" The nodes [4,_,5] compacted their log entries." )
302+ runOn(node5) {
303+ expectCompactionCompleted {
304+ enterBarrier(BarrierNames .ExpectingCompactionCompleted )
305+ enterBarrier(BarrierNames .ReplicatedLogEntries )
306+ waitForCompactionCompleted()
307+ }
308+ }
309+ runOnOtherThan(node4, node5) {
310+ enterBarrier(BarrierNames .ExpectingCompactionCompleted )
311+ enterBarrier(BarrierNames .ReplicatedLogEntries )
312+ }
313+ enterBarrier(" Nodes([4,5]) replicated log entries (entity ids = 11 ~ 20) and compacted their log entries." )
281314 }
282315
283316 " The cluster has nodes: [6,7,8]" in {
@@ -290,20 +323,35 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
290323 }
291324
292325 " The leader (which is on node8) replicates some log entries" in {
293- runOn(node6) { // Can choose any of nodes [6,7,8]
294- setValue(" 0" , 0 )(initializationTimeout) shouldBe 0
295- (21 to 30 ).foreach { n =>
296- setValue(n.toString, n) shouldBe n
326+ object BarrierNames {
327+ val ExpectingCompactionCompleted = " Expecting compaction completed"
328+ val ReplicatedLogEntries = " Replicated log entries"
329+ }
330+ runOn(node6) {
331+ expectCompactionCompleted {
332+ enterBarrier(BarrierNames .ExpectingCompactionCompleted )
333+ setValue(" 0" , 0 )(initializationTimeout) shouldBe 0
334+ (21 to 30 ).foreach { n =>
335+ setValue(n.toString, n) shouldBe n
336+ }
337+ enterBarrier(BarrierNames .ReplicatedLogEntries )
338+ waitForCompactionCompleted()
339+ }
340+ }
341+ runOn(node7, node8) {
342+ expectCompactionCompleted {
343+ enterBarrier(BarrierNames .ExpectingCompactionCompleted )
344+ enterBarrier(BarrierNames .ReplicatedLogEntries )
345+ waitForCompactionCompleted()
297346 }
298347 }
348+ runOnOtherThan(node6, node7, node8) {
349+ enterBarrier(BarrierNames .ExpectingCompactionCompleted )
350+ enterBarrier(BarrierNames .ReplicatedLogEntries )
351+ }
299352 enterBarrier(" Replicated log entries (entity ids = 21 ~ 30)." )
300353 }
301354
302- " RaftActors on node: [6,7,8] compacts their log entries" in {
303- waitForCompactionCompleted()
304- enterBarrier(" The nodes [6,7,8] compacted their log entries." )
305- }
306-
307355 " The new cluster has nodes: [9,10,11]" in {
308356 shutdownNode(node8, node7, node6)
309357 newCluster(node9, node10, node11)
0 commit comments