Skip to content

Commit c87ff6b

Browse files
authored
Merge pull request #122 from lerna-stack/fix-batched-append-entries
Raft leader should set the appropriate previous values (`prevLogIndex` and `prevLogTerm`) on each batched `AppendEntries`
2 parents 36743bd + 6b02a25 commit c87ff6b

File tree

4 files changed

+202
-5
lines changed

4 files changed

+202
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2828
- TestKit throws "Shard received unexpected message" exception after the entity is passivated [PR#100](https://github.com/lerna-stack/akka-entity-replication/pull/100)
2929
- `ReplicatedEntity` can produce an illegal snapshot if compaction and receiving a new event occur at the same time [#111](https://github.com/lerna-stack/akka-entity-replication/issues/111)
3030
- Starting a follower member after the leader completes a compaction may break the ReplicatedLog of the follower [#105](https://github.com/lerna-stack/akka-entity-replication/issues/105)
31+
- The Raft leader uses the same previous `LogEntryIndex` and `Term` for all batched `AppendEntries` messages [#123](https://github.com/lerna-stack/akka-entity-replication/issues/123)
3132

3233
## [v2.0.0] - 2021-07-16
3334
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0

src/main/scala/lerna/akka/entityreplication/raft/Leader.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,16 +242,20 @@ private[raft] trait Leader { this: RaftActor =>
242242
),
243243
)
244244
case batchEntries =>
245-
batchEntries.map { entries =>
246-
AppendEntries(
245+
batchEntries.foldLeft(Seq.empty[AppendEntries]) { (previousBatches, entriesOfThisBatch) =>
246+
val lastEntryOfPreviousBatches = previousBatches.lastOption.flatMap(_.entries.lastOption)
247+
val prevLogIndexOfThisBatch = lastEntryOfPreviousBatches.fold(prevLogIndex)(_.index)
248+
val prevLogTermOfThisBatch = lastEntryOfPreviousBatches.fold(prevLogTerm)(_.term)
249+
val thisBatch = AppendEntries(
247250
shardId,
248251
currentData.currentTerm,
249252
selfMemberIndex,
250-
prevLogIndex,
251-
prevLogTerm,
252-
entries,
253+
prevLogIndexOfThisBatch,
254+
prevLogTermOfThisBatch,
255+
entriesOfThisBatch,
253256
currentData.commitIndex,
254257
)
258+
previousBatches :+ thisBatch
255259
}
256260
}
257261
case None =>

src/test/scala/lerna/akka/entityreplication/raft/RaftActorLeaderSpec.scala

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import lerna.akka.entityreplication.raft.snapshot.{ ShardSnapshotStore, Snapshot
1919
import lerna.akka.entityreplication.util.EventStore
2020
import org.scalatest.Inside
2121

22+
import scala.concurrent.duration.DurationInt
23+
2224
class RaftActorLeaderSpec extends TestKit(ActorSystem()) with RaftActorSpecBase with Inside {
2325

2426
import RaftActor._
@@ -434,6 +436,179 @@ class RaftActorLeaderSpec extends TestKit(ActorSystem()) with RaftActorSpecBase
434436
} should contain theSameElementsAs (Set(follower1Index, follower2Index))
435437
}
436438

439+
"send at most `max-append-entries-batch-size` AppendEntries messages to followers on HeartbeatTimeout if it has more logs than `max-append-entries-size`" in {
440+
// The leader will send 3 AppendEntries messages to follower1.
441+
// The leader will send 2 AppendEntries messages to follower2.
442+
val leaderIndex = createUniqueMemberIndex()
443+
val follower1Index = createUniqueMemberIndex()
444+
val follower2Index = createUniqueMemberIndex()
445+
val regionProbe = TestProbe()
446+
val shardId = createUniqueShardId()
447+
val leader = {
448+
val testConfig = ConfigFactory.parseString(
449+
"""
450+
|lerna.akka.entityreplication.raft {
451+
| # Heartbeat never happen for simplicity of this test case.
452+
| heartbeat-interval = 1000s
453+
| max-append-entries-size = 2
454+
| max-append-entries-batch-size = 3
455+
|}
456+
|""".stripMargin,
457+
)
458+
val customSettings = RaftSettings(testConfig.withFallback(defaultRaftConfig))
459+
customSettings.heartbeatInterval shouldBe 1000.seconds
460+
customSettings.maxAppendEntriesSize shouldBe 2
461+
customSettings.maxAppendEntriesBatchSize shouldBe 3
462+
createRaftActor(
463+
shardId = shardId,
464+
selfMemberIndex = leaderIndex,
465+
otherMemberIndexes = Set(follower1Index, follower2Index),
466+
region = regionProbe.ref,
467+
settings = customSettings,
468+
)
469+
}
470+
val currentTerm = Term(2)
471+
val logEntries = Seq(
472+
LogEntry(LogEntryIndex(1), EntityEvent(Option(entityId), "a"), Term(1)),
473+
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), Term(1)),
474+
LogEntry(LogEntryIndex(3), EntityEvent(Option(entityId), "c"), Term(1)),
475+
LogEntry(LogEntryIndex(4), EntityEvent(Option(entityId), "d"), Term(1)),
476+
LogEntry(LogEntryIndex(5), EntityEvent(Option(entityId), "e"), Term(2)),
477+
LogEntry(LogEntryIndex(6), EntityEvent(Option(entityId), "f"), Term(2)),
478+
LogEntry(LogEntryIndex(7), EntityEvent(Option(entityId), "g"), Term(2)),
479+
LogEntry(LogEntryIndex(8), EntityEvent(None, NoOp), Term(2)),
480+
)
481+
val logEntryByIndex = logEntries.map(entry => entry.index -> entry).toMap
482+
val leaderData = {
483+
val replicatedLog = ReplicatedLog().merge(logEntries, LogEntryIndex(0))
484+
createLeaderData(currentTerm, replicatedLog, commitIndex = LogEntryIndex(1))
485+
.syncLastLogIndex(follower1Index, LogEntryIndex(1))
486+
.syncLastLogIndex(follower2Index, LogEntryIndex(5))
487+
}
488+
leaderData.nextIndexFor(follower1Index) shouldBe LogEntryIndex(2)
489+
leaderData.nextIndexFor(follower2Index) shouldBe LogEntryIndex(6)
490+
setState(leader, Leader, leaderData)
491+
492+
def assertEquals(value: AppendEntries, expected: AppendEntries, clue: String): Unit = {
493+
withClue(clue) {
494+
value shouldBe expected
495+
// The following verification is needed since `LogEntry.equals` does not take the `event` field into account.
496+
value.entries.zip(expected.entries).foreach {
497+
case (entry, expectedEntry) =>
498+
entry.event shouldBe expectedEntry.event
499+
}
500+
}
501+
}
502+
503+
// Sending HeartbeatTimeout triggers the leader to send AppendEntries messages to followers.
504+
leader ! HeartbeatTimeout
505+
506+
// Fish AppendEntries messages sent by the leader.
507+
val appendEntriesSentByLeader = regionProbe.fishForMessageN(messages = 3 + 2) {
508+
case ReplicationRegion.DeliverTo(followerIndex, cmd: AppendEntries) =>
509+
val appendEntries = followerIndex match {
510+
case `follower1Index` => cmd
511+
case `follower2Index` => cmd
512+
case otherIndex => fail(s"Got an AppendEntries with unexpected memberIndex($otherIndex)")
513+
}
514+
followerIndex -> appendEntries
515+
}
516+
val appendEntriesFollower1Received = appendEntriesSentByLeader.filter(_._1 == follower1Index).map(_._2)
517+
val appendEntriesFollower2Received = appendEntriesSentByLeader.filter(_._1 == follower2Index).map(_._2)
518+
519+
// Verify all AppendEntries messages.
520+
appendEntriesFollower1Received.size shouldBe 3
521+
assertEquals(
522+
appendEntriesFollower1Received(0),
523+
AppendEntries(
524+
shardId,
525+
currentTerm,
526+
leaderIndex,
527+
prevLogIndex = LogEntryIndex(1),
528+
prevLogTerm = Term(1),
529+
entries = Seq(
530+
logEntryByIndex(LogEntryIndex(2)),
531+
logEntryByIndex(LogEntryIndex(3)),
532+
),
533+
leaderCommit = LogEntryIndex(1),
534+
),
535+
"The 1st AppendEntries follower1 received: ",
536+
)
537+
assertEquals(
538+
appendEntriesFollower1Received(1),
539+
AppendEntries(
540+
shardId,
541+
currentTerm,
542+
leaderIndex,
543+
prevLogIndex = LogEntryIndex(3),
544+
prevLogTerm = Term(1),
545+
entries = Seq(
546+
logEntryByIndex(LogEntryIndex(4)),
547+
logEntryByIndex(LogEntryIndex(5)),
548+
),
549+
leaderCommit = LogEntryIndex(1),
550+
),
551+
"The 2nd AppendEntries follower1 received: ",
552+
)
553+
assertEquals(
554+
appendEntriesFollower1Received(2),
555+
AppendEntries(
556+
shardId,
557+
currentTerm,
558+
leaderIndex,
559+
prevLogIndex = LogEntryIndex(5),
560+
prevLogTerm = Term(2),
561+
entries = Seq(
562+
logEntryByIndex(LogEntryIndex(6)),
563+
logEntryByIndex(LogEntryIndex(7)),
564+
),
565+
leaderCommit = LogEntryIndex(1),
566+
),
567+
"The 3rd AppendEntries follower1 received: ",
568+
)
569+
570+
appendEntriesFollower2Received.size shouldBe 2
571+
assertEquals(
572+
appendEntriesFollower2Received(0),
573+
AppendEntries(
574+
shardId,
575+
currentTerm,
576+
leaderIndex,
577+
prevLogIndex = LogEntryIndex(5),
578+
prevLogTerm = Term(2),
579+
entries = Seq(
580+
logEntryByIndex(LogEntryIndex(6)),
581+
logEntryByIndex(LogEntryIndex(7)),
582+
),
583+
leaderCommit = LogEntryIndex(1),
584+
),
585+
"The 1st AppendEntries follower2 received: ",
586+
)
587+
assertEquals(
588+
appendEntriesFollower2Received(1),
589+
AppendEntries(
590+
shardId,
591+
currentTerm,
592+
leaderIndex,
593+
prevLogIndex = LogEntryIndex(7),
594+
prevLogTerm = Term(2),
595+
entries = Seq(
596+
logEntryByIndex(LogEntryIndex(8)),
597+
),
598+
leaderCommit = LogEntryIndex(1),
599+
),
600+
"The 2nd AppendEntries follower2 received: ",
601+
)
602+
603+
// The leader should send no more AppendEntries on this HeartbeatTimeout.
604+
assertThrows[AssertionError] {
605+
regionProbe.fishForMessage(hint = "The leader sent an unexpected AppendEntries") {
606+
case ReplicationRegion.DeliverTo(_, _: AppendEntries) => true
607+
case _ => false
608+
}
609+
}
610+
}
611+
437612
"send InstallSnapshot to the follower when the leader loses logs that the follower requires by compaction" in {
438613
val leaderIndex = createUniqueMemberIndex()
439614
val follower1Index = createUniqueMemberIndex()

src/test/scala/lerna/akka/entityreplication/raft/model/ReplicatedLogSpec.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,23 @@ class ReplicatedLogSpec extends WordSpecLike with Matchers {
4242
log.getFrom(target, maxEntryCount = 10, maxBatchCount = 1) should be(expected)
4343
}
4444

45+
"return only one part of entries by getFrom(..., maxBatchCount=1) even if it has more succeeding entries" in {
46+
47+
val logEntries = Seq(
48+
LogEntry(LogEntryIndex(1), EntityEvent(None, "a"), Term(1)),
49+
LogEntry(LogEntryIndex(2), EntityEvent(None, "b"), Term(1)),
50+
LogEntry(LogEntryIndex(3), EntityEvent(None, "c"), Term(1)),
51+
LogEntry(LogEntryIndex(4), EntityEvent(None, "d"), Term(1)),
52+
LogEntry(LogEntryIndex(5), EntityEvent(None, "e"), Term(1)),
53+
LogEntry(LogEntryIndex(6), EntityEvent(None, "f"), Term(1)),
54+
)
55+
val log = new ReplicatedLog(logEntries)
56+
57+
val expectedParts = Seq(Seq(logEntries(2), logEntries(3)))
58+
log.getFrom(LogEntryIndex(3), maxEntryCount = 2, maxBatchCount = 1) shouldBe expectedParts
59+
60+
}
61+
4562
"return part of logEntries by getFrom(LogEntryIndex, maxEntryCount, maxBatchCount) when (maxEntryCount * maxBatchCount) is lower value than count of logEntries" in {
4663
val logEntries = Seq(
4764
LogEntry(LogEntryIndex(1), EntityEvent(None, "a"), Term(1)),

0 commit comments

Comments
 (0)