Skip to content

Commit 618a211

Browse files
authored
Merge pull request #172 from lerna-stack/252-impl-shard-id-extractor
Add function extract shard id from entity id
2 parents 457f529 + 61951df commit 618a211

File tree

5 files changed

+69
-8
lines changed

5 files changed

+69
-8
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
[PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
1414
- Add diagnostic logging to CommitLogStoreActor
1515
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
16+
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
17+
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
1618

1719
### Fixed
1820
- RaftActor might delete committed entries
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# New function is added in lerna.akka.entityreplication.typed.ClusterReplication.
2+
ProblemFilters.exclude[ReversedMissingMethodProblem]("lerna.akka.entityreplication.typed.ClusterReplication.shardIdOf")

src/main/scala/lerna/akka/entityreplication/typed/ClusterReplication.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,13 @@ trait ClusterReplication extends Extension {
2323
*/
2424
def entityRefFor[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): ReplicatedEntityRef[M]
2525

26+
/**
27+
* Extract shard id of given entity id.
28+
*
29+
* @param typeKey
30+
* @param entityId
31+
* @tparam M the type parameter of the typeKey
32+
* @return shard id
33+
*/
34+
def shardIdOf[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): String
2635
}

src/main/scala/lerna/akka/entityreplication/typed/internal/ClusterReplicationImpl.scala

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,27 @@ import java.util.concurrent.ConcurrentHashMap
1515

1616
private[entityreplication] class ClusterReplicationImpl(system: ActorSystem[_]) extends ClusterReplication {
1717

18-
private[this] val regions: concurrent.Map[ReplicatedEntityTypeKey[Nothing], ActorRef[Nothing]] =
19-
new ConcurrentHashMap[ReplicatedEntityTypeKey[_], ActorRef[_]].asScala
18+
private case class ReplicationRegionEntry(settings: ClusterReplicationSettings, regionRef: ActorRef[Nothing])
19+
20+
private[this] val regions: concurrent.Map[ReplicatedEntityTypeKey[Nothing], ReplicationRegionEntry] =
21+
new ConcurrentHashMap[ReplicatedEntityTypeKey[_], ReplicationRegionEntry].asScala
2022

2123
override def init[M, E](entity: ReplicatedEntity[M, E]): ActorRef[E] =
22-
regions.getOrElseUpdate(entity.typeKey, internalInit(entity)).unsafeUpcast[E]
24+
regions
25+
.getOrElseUpdate(
26+
entity.typeKey,
27+
internalInit(entity),
28+
).regionRef.unsafeUpcast[E]
2329

24-
private[this] def internalInit[M, E](entity: ReplicatedEntity[M, E]): ActorRef[E] = {
30+
private[this] def internalInit[M, E](entity: ReplicatedEntity[M, E]): ReplicationRegionEntry = {
2531
val classicSystem = system.toClassic
26-
val settings = entity.settings.getOrElse(untyped.ClusterReplicationSettings.create(classicSystem))
32+
val settings = entity.settings.getOrElse(ClusterReplicationSettings(system))
2733
val extractEntityId: untyped.ReplicationRegion.ExtractEntityId = {
2834
case ReplicationEnvelope(entityId, message) => (entityId, message)
2935
}
3036
val extractShardId: untyped.ReplicationRegion.ExtractShardId = {
3137
case ReplicationEnvelope(entityId, _) =>
32-
Math.abs(entityId.hashCode % settings.raftSettings.numberOfShards).toString
38+
shardIdOf(settings, entityId)
3339
}
3440
val possibleShardIds: Set[untyped.ReplicationRegion.ShardId] = {
3541
(0 until settings.raftSettings.numberOfShards).map(_.toString).toSet
@@ -67,13 +73,31 @@ private[entityreplication] class ClusterReplicationImpl(system: ActorSystem[_])
6773
extractShardId = extractShardId,
6874
possibleShardIds = possibleShardIds,
6975
)
70-
region.toTyped
76+
ReplicationRegionEntry(settings, region.toTyped)
7177
}
7278

7379
override def entityRefFor[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): ReplicatedEntityRef[M] =
7480
regions.get(typeKey) match {
75-
case Some(region) =>
81+
case Some(ReplicationRegionEntry(_, region)) =>
7682
new ReplicatedEntityRefImpl[M](typeKey, entityId, region.unsafeUpcast[ReplicationEnvelope[M]], system)
7783
case None => throw new IllegalStateException(s"The type [${typeKey}] must be init first")
7884
}
85+
86+
override def shardIdOf[M](
87+
typeKey: ReplicatedEntityTypeKey[M],
88+
entityId: String,
89+
): untyped.ReplicationRegion.ShardId = {
90+
regions.get(typeKey) match {
91+
case Some(ReplicationRegionEntry(settings, _)) =>
92+
shardIdOf(settings, entityId)
93+
case None =>
94+
throw new IllegalStateException(s"The type [${typeKey}] must be init first")
95+
}
96+
}
97+
98+
private def shardIdOf(
99+
settings: ClusterReplicationSettings,
100+
entityId: String,
101+
): String =
102+
Math.abs(entityId.hashCode % settings.raftSettings.numberOfShards).toString
79103
}

src/test/scala/lerna/akka/entityreplication/typed/ClusterReplicationSpec.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,28 @@ class ClusterReplicationSpec extends FlatSpec with Matchers with ScalaFutures wi
5151

5252
clusterReplication.entityRefFor(typeKey, "test") shouldBe a[ReplicatedEntityRef[_]]
5353
}
54+
55+
behavior of "ClusterReplication.shardIdOf"
56+
57+
it should "throw an exception if the typeKey has not initialized" in {
58+
val typeKey = ReplicatedEntityTypeKey[NotUsed]("NotInitialized")
59+
val entityId = "entity-id"
60+
val exception = intercept[IllegalStateException] {
61+
clusterReplication.shardIdOf(typeKey, entityId)
62+
}
63+
exception.getMessage should be(
64+
"The type [ReplicatedEntityTypeKey[akka.NotUsed](NotInitialized)] must be init first",
65+
)
66+
}
67+
68+
it should "extract shardId from given entityId" in {
69+
val typeKey = ReplicatedEntityTypeKey[NotUsed]("ExtractShardId")
70+
val entity = ReplicatedEntity(typeKey)(_ => Behaviors.empty)
71+
clusterReplication.init(entity)
72+
73+
val entityId = "entity-id"
74+
val shardId = clusterReplication.shardIdOf(typeKey, entityId)
75+
val settings = ClusterReplicationSettings(actorTestKit.system)
76+
assert(shardId.toInt >= 0 && shardId.toInt < settings.raftSettings.numberOfShards)
77+
}
5478
}

0 commit comments

Comments
 (0)