Skip to content

Commit 80ee0ac

Browse files
authored
Merge branch 'master' into test-receiving-request-vote-response
2 parents 0fb2b52 + ad18426 commit 80ee0ac

File tree

15 files changed

+188
-17
lines changed

15 files changed

+188
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3030
- Starting a follower member later than leader completes a compaction may break ReplicatedLog of the follower [#105](https://github.com/lerna-stack/akka-entity-replication/issues/105)
3131
- The Raft leader uses the same previous `LogEntryIndex` and `Term` to all batched `AppendEntries` messages [#123](https://github.com/lerna-stack/akka-entity-replication/issues/123)
3232
- Raft Actors doesn't accept a `RequestVote(lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm)` message [#125](https://github.com/lerna-stack/akka-entity-replication/issues/125)
33+
- A new event is created even though all past events have not been applied [#130](https://github.com/lerna-stack/akka-entity-replication/issues/130)
3334

3435
## [v2.0.0] - 2021-07-16
3536
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0

src/main/scala/lerna/akka/entityreplication/ReplicationActor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash
144144
innerApplyEvent(logEntry.event.event, logEntry.index)
145145
changeState(ready)
146146
internalStash.unstashAll()
147+
case ReplicationFailed =>
148+
changeState(ready)
149+
internalStash.unstashAll()
147150
case ReplicationSucceeded(_, logEntryIndex, responseInstanceId) if responseInstanceId.contains(instanceId) =>
148151
changeState(ready)
149152
internalStash.unstashAll()

src/main/scala/lerna/akka/entityreplication/raft/Leader.scala

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,17 +186,29 @@ private[raft] trait Leader { this: RaftActor =>
186186
}
187187

188188
private[this] def replicate(replicate: Replicate): Unit = {
189-
cancelHeartbeatTimeoutTimer()
190-
applyDomainEvent(AppendedEvent(EntityEvent(replicate.entityId, replicate.event))) { _ =>
191-
applyDomainEvent(
192-
StartedReplication(
193-
ClientContext(replicate.replyTo, replicate.instanceId, replicate.originSender),
194-
currentData.replicatedLog.lastLogIndex,
195-
),
196-
) { _ =>
197-
publishAppendEntries()
198-
}
189+
replicate.entityId match {
190+
case Some(normalizedEntityId) // from entity(ReplicationActor)
191+
if currentData.hasUncommittedLogEntryOf(normalizedEntityId) =>
192+
if (log.isWarningEnabled)
193+
log.warning(
194+
s"Failed to replicate the event (${replicate.event.getClass.getName}) since an uncommitted event exists for the entity (entityId: ${normalizedEntityId.raw}). Replicating new events is allowed after the event is committed",
195+
)
196+
replicate.replyTo ! ReplicationFailed
197+
198+
case _ =>
199+
cancelHeartbeatTimeoutTimer()
200+
applyDomainEvent(AppendedEvent(EntityEvent(replicate.entityId, replicate.event))) { _ =>
201+
applyDomainEvent(
202+
StartedReplication(
203+
ClientContext(replicate.replyTo, replicate.instanceId, replicate.originSender),
204+
currentData.replicatedLog.lastLogIndex,
205+
),
206+
) { _ =>
207+
publishAppendEntries()
208+
}
209+
}
199210
}
211+
200212
}
201213

202214
private[this] def receiveReplicationResponse(event: ReplicationResponse): Unit =

src/main/scala/lerna/akka/entityreplication/raft/RaftMemberData.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,12 @@ private[entityreplication] trait RaftMemberData
323323
replicatedLog.sliceEntries(from, to).filter(_.event.entityId.contains(entityId))
324324
}
325325

326+
def hasUncommittedLogEntryOf(entityId: NormalizedEntityId): Boolean = {
327+
replicatedLog
328+
.entriesAfter(index = commitIndex) // uncommitted entries
329+
.exists(_.event.entityId.contains(entityId))
330+
}
331+
326332
def alreadyVotedOthers(candidate: MemberIndex): Boolean = votedFor.exists(candidate != _)
327333

328334
def hasMatchLogEntry(prevLogIndex: LogEntryIndex, prevLogTerm: Term): Boolean = {

src/main/scala/lerna/akka/entityreplication/raft/RaftProtocol.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ private[entityreplication] object RaftProtocol {
5252
final case class Replica(logEntry: LogEntry) extends EntityCommand
5353
final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef) extends EntityCommand
5454
final case object RecoveryTimeout extends EntityCommand
55+
final case object ReplicationFailed extends EntityCommand
5556

5657
sealed trait ReplicationResponse
5758

src/main/scala/lerna/akka/entityreplication/raft/model/ReplicatedLog.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ private[entityreplication] final case class ReplicatedLog private[model] (
3636
entries.slice(toSeqIndex(from), until = toSeqIndex(to.next()))
3737
}
3838

39+
def entriesAfter(index: LogEntryIndex): Iterator[LogEntry] =
40+
entries.iterator.drop(n = toSeqIndex(index) + 1)
41+
3942
def nonEmpty: Boolean = entries.nonEmpty
4043

4144
def append(event: EntityEvent, term: Term): ReplicatedLog = {

src/main/scala/lerna/akka/entityreplication/typed/internal/behavior/Inactive.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ private[entityreplication] class Inactive[Command, Event, State](
3737
case _: RaftProtocol.RecoveryState => Behaviors.unhandled
3838
case _: RaftProtocol.ReplicationSucceeded => Behaviors.unhandled
3939
case RaftProtocol.RecoveryTimeout => Behaviors.unhandled
40+
case RaftProtocol.ReplicationFailed => Behaviors.unhandled
4041
}.receiveSignal(setup.onSignal(setup.emptyState))
4142

4243
def receiveActivate(command: RaftProtocol.Activate): Behavior[EntityCommand] = {

src/main/scala/lerna/akka/entityreplication/typed/internal/behavior/Ready.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ private[entityreplication] class Ready[Command, Event, State](
7676
case _: RaftProtocol.RecoveryState => Behaviors.unhandled
7777
case _: RaftProtocol.ReplicationSucceeded => Behaviors.unhandled
7878
case RaftProtocol.RecoveryTimeout => Behaviors.unhandled
79+
case RaftProtocol.ReplicationFailed => Behaviors.unhandled
7980
}.receiveSignal(setup.onSignal(readyState.entityState))
8081

8182
def receiveProcessCommand(command: RaftProtocol.ProcessCommand, state: BehaviorState): Behavior[EntityCommand] = {

src/main/scala/lerna/akka/entityreplication/typed/internal/behavior/Recovering.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ private[entityreplication] class Recovering[Command, Event, State](
9292
Behaviors.same
9393
case _: RaftProtocol.Activate => Behaviors.unhandled
9494
case _: RaftProtocol.ReplicationSucceeded => Behaviors.unhandled
95+
case RaftProtocol.ReplicationFailed => Behaviors.unhandled
9596
}.receiveSignal(setup.onSignal(setup.emptyState))
9697
}
9798
}

src/main/scala/lerna/akka/entityreplication/typed/internal/behavior/WaitForReplication.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ private[entityreplication] class WaitForReplication[Command, Event, State](
3939
.receiveMessage[EntityCommand] {
4040
case command: RaftProtocol.Replica => receiveReplica(command, state)
4141
case command: RaftProtocol.ReplicationSucceeded => receiveReplicationSucceeded(command, state)
42+
case RaftProtocol.ReplicationFailed => Ready.behavior(setup, transformReadyState(state)) // Discard side effects
4243
case command: RaftProtocol.TakeSnapshot => receiveTakeSnapshot(command, state.entityState)
4344
case command: RaftProtocol.ProcessCommand =>
4445
setup.stashBuffer.stash(command)

0 commit comments

Comments
 (0)