Skip to content

Commit b0b0229

Browse files
author
Taichi Yamakawa
committed
Test SnapshotStore when it is saving an EntitySnapshot
1 parent c1913e4 commit b0b0229

File tree

1 file changed

+97
-2
lines changed

1 file changed

+97
-2
lines changed

src/test/scala/lerna/akka/entityreplication/raft/snapshot/ShardSnapshotStoreFailureSpec.scala

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,39 @@
11
package lerna.akka.entityreplication.raft.snapshot
22

3+
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
4+
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
5+
36
import java.util.concurrent.atomic.AtomicInteger
47
import akka.actor.{ ActorRef, ActorSystem }
58
import akka.persistence.testkit.scaladsl.SnapshotTestKit
9+
import akka.persistence.testkit.{
10+
ProcessingResult,
11+
ProcessingSuccess,
12+
SnapshotOperation,
13+
SnapshotStorage,
14+
WriteSnapshot,
15+
}
616
import akka.testkit.TestKit
717
import lerna.akka.entityreplication.model.{ NormalizedEntityId, TypeName }
818
import lerna.akka.entityreplication.raft.model.LogEntryIndex
919
import lerna.akka.entityreplication.raft.routing.MemberIndex
10-
import lerna.akka.entityreplication.raft.snapshot.ShardSnapshotStoreFailureSpec._
1120
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._
1221
import lerna.akka.entityreplication.raft.{ ActorSpec, RaftSettings }
1322
import lerna.akka.entityreplication.testkit.KryoSerializable
1423

24+
import scala.concurrent.Promise
25+
import scala.util.Using
26+
1527
object ShardSnapshotStoreFailureSpec {
1628
final case object DummyState extends KryoSerializable
17-
1829
}
1930

2031
class ShardSnapshotStoreFailureSpec
2132
extends TestKit(
2233
ActorSystem("ShardSnapshotStoreFailureSpec", ShardSnapshotStoreSpecBase.configWithPersistenceTestKits),
2334
)
2435
with ActorSpec {
36+
import ShardSnapshotStoreFailureSpec._
2537

2638
private val snapshotTestKit = SnapshotTestKit(system)
2739

@@ -74,4 +86,87 @@ class ShardSnapshotStoreFailureSpec
7486
}
7587
}
7688

89+
"ShardSnapshotStore (with time-consuming writes)" should {
90+
91+
// Emulates a time-consuming write
92+
class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage.SnapshotPolicies.PolicyType with AutoCloseable {
93+
val processingResultPromise = Promise[ProcessingResult]()
94+
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = {
95+
processingUnit match {
96+
case _: WriteSnapshot => processingResultPromise.future.await
97+
case _ => ProcessingSuccess
98+
}
99+
}
100+
override def close(): Unit = {
101+
processingResultPromise.trySuccess(ProcessingSuccess)
102+
}
103+
}
104+
105+
"reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" ignore {
106+
// TODO Change SnapshotStore.savingSnapshot such that this test passes.
107+
val entityId = generateUniqueEntityId()
108+
val shardSnapshotStore = createShardSnapshotStore()
109+
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
110+
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))
111+
112+
Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
113+
// Prepare: SnapshotStore is saving the snapshot
114+
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
115+
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
116+
117+
// Test:
118+
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
119+
expectMsg(SnapshotNotFound)
120+
}
121+
}
122+
123+
"reply with `SnapshotFound` to `FetchSnapshot` if it has an EntitySnapshot and is saving a new EntitySnapshot" in {
124+
val entityId = generateUniqueEntityId()
125+
val shardSnapshotStore = createShardSnapshotStore()
126+
127+
val firstSnapshotMetadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
128+
val firstSnapshot =
129+
EntitySnapshot(firstSnapshotMetadata, EntityState(DummyState))
130+
shardSnapshotStore ! SaveSnapshot(firstSnapshot, replyTo = testActor)
131+
expectMsg(SaveSnapshotSuccess(firstSnapshotMetadata))
132+
133+
Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
134+
// Prepare: SnapshotStore is saving the second snapshot
135+
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
136+
val secondSnapshot =
137+
EntitySnapshot(EntitySnapshotMetadata(entityId, LogEntryIndex(5)), EntityState(DummyState))
138+
shardSnapshotStore ! SaveSnapshot(secondSnapshot, replyTo = testActor)
139+
140+
// Test:
141+
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
142+
expectMsg(SnapshotFound(firstSnapshot))
143+
}
144+
}
145+
146+
"reply with nothing to `SaveSnapshot` and log a warning if it is saving an EntitySnapshot" in {
147+
implicit val typedSystem: akka.actor.typed.ActorSystem[Nothing] = system.toTyped
148+
149+
val entityId = generateUniqueEntityId()
150+
val shardSnapshotStore = createShardSnapshotStore()
151+
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
152+
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))
153+
154+
Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
155+
// Prepare: SnapshotStore is saving the snapshot
156+
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
157+
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
158+
159+
// Test:
160+
LoggingTestKit
161+
.warn(
162+
s"Saving snapshot for an entity ($entityId) currently. Consider to increase log-size-threshold or log-size-check-interval.",
163+
).expect {
164+
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
165+
}
166+
expectNoMessage()
167+
}
168+
}
169+
170+
}
171+
77172
}

0 commit comments

Comments
 (0)