Skip to content

Commit bd440c9

Browse files
authored
Merge pull request #126 from lerna-stack/enhance-request-vote
Enhance RequestVote receiving
2 parents c87ff6b + 0b2632c commit bd440c9

File tree

12 files changed

+1603
-365
lines changed

12 files changed

+1603
-365
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2929
- `ReplicatedEntity` can produce illegal snapshot if compaction and receiving new event occur same time [#111](https://github.com/lerna-stack/akka-entity-replication/issues/111)
3030
- Starting a follower member later than leader completes a compaction may break ReplicatedLog of the follower [#105](https://github.com/lerna-stack/akka-entity-replication/issues/105)
3131
- The Raft leader uses the same previous `LogEntryIndex` and `Term` to all batched `AppendEntries` messages [#123](https://github.com/lerna-stack/akka-entity-replication/issues/123)
32+
- Raft Actors doesn't accept a `RequestVote(lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm)` message [#125](https://github.com/lerna-stack/akka-entity-replication/issues/125)
3233

3334
## [v2.0.0] - 2021-07-16
3435
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0

src/main/scala/lerna/akka/entityreplication/raft/Candidate.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ private[raft] trait Candidate { this: RaftActor =>
5858
}
5959

6060
case RequestVote(_, term, otherCandidate, lastLogIndex, lastLogTerm)
61-
if term.isNewerThan(
62-
currentData.currentTerm,
63-
) && lastLogTerm >= currentData.replicatedLog.lastLogTerm && lastLogIndex >= currentData.replicatedLog.lastLogIndex =>
61+
if term.isNewerThan(currentData.currentTerm) &&
62+
currentData.replicatedLog.isGivenLogUpToDate(lastLogTerm, lastLogIndex) =>
6463
if (log.isDebugEnabled) log.debug("=== [Candidate] accept RequestVote({}, {}) ===", term, otherCandidate)
6564
cancelElectionTimeoutTimer()
6665
applyDomainEvent(Voted(term, otherCandidate)) { domainEvent =>

src/main/scala/lerna/akka/entityreplication/raft/Follower.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ private[raft] trait Follower { this: RaftActor =>
4646
sender() ! RequestVoteDenied(currentData.currentTerm)
4747

4848
case request: RequestVote
49-
if request.lastLogTerm < currentData.replicatedLog.lastLogTerm || request.lastLogIndex < currentData.replicatedLog.lastLogIndex =>
49+
if !currentData.replicatedLog.isGivenLogUpToDate(request.lastLogTerm, request.lastLogIndex) =>
5050
if (log.isDebugEnabled) log.debug("=== [Follower] deny {} ===", request)
5151
if (request.term.isNewerThan(currentData.currentTerm)) {
5252
applyDomainEvent(DetectedNewTerm(request.term)) { _ =>

src/main/scala/lerna/akka/entityreplication/raft/Leader.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ private[raft] trait Leader { this: RaftActor =>
4545
res match {
4646

4747
case RequestVote(_, term, candidate, lastLogIndex, lastLogTerm)
48-
if term.isNewerThan(
49-
currentData.currentTerm,
50-
) && lastLogTerm >= currentData.replicatedLog.lastLogTerm && lastLogIndex >= currentData.replicatedLog.lastLogIndex =>
48+
if term.isNewerThan(currentData.currentTerm) &&
49+
currentData.replicatedLog.isGivenLogUpToDate(lastLogTerm, lastLogIndex) =>
5150
if (log.isDebugEnabled) log.debug("=== [Leader] accept RequestVote({}, {}) ===", term, candidate)
5251
cancelHeartbeatTimeoutTimer()
5352
applyDomainEvent(Voted(term, candidate)) { domainEvent =>

src/main/scala/lerna/akka/entityreplication/raft/model/ReplicatedLog.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,21 @@ private[entityreplication] final case class ReplicatedLog private[model] (
6161
case logEntryIndex => get(logEntryIndex).map(_.term)
6262
}
6363

64+
/** Return true if the given log with the term and index is at least as up-to-date as this log.
65+
*
66+
* Determines which log is more up-to-date by comparing the term and index of the last entry.
67+
* If two logs have last entries with different terms, the greater term is more up-to-date.
68+
* If two logs have last entries with the same term, greater index is more up-to-date.
69+
*
70+
* Note: Returns true if two logs have last entries with the same term and the same index.
71+
*
72+
* @see [[https://github.com/ongardie/dissertation Raft thesis]] section 3.6.1
73+
*/
74+
def isGivenLogUpToDate(term: Term, index: LogEntryIndex): Boolean = {
75+
term > lastLogTerm ||
76+
(term == lastLogTerm && index >= lastLogIndex)
77+
}
78+
6479
def merge(thatEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex): ReplicatedLog = {
6580
val newEntries = this.entries.takeWhile(_.index <= prevLogIndex) ++ thatEntries
6681
copy(newEntries)
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
package lerna.akka.entityreplication.raft
2+
3+
import akka.actor.ActorSystem
4+
import akka.testkit.TestKit
5+
import lerna.akka.entityreplication.model.NormalizedEntityId
6+
import lerna.akka.entityreplication.raft.model.{ EntityEvent, LogEntry, LogEntryIndex, NoOp, ReplicatedLog, Term }
7+
import lerna.akka.entityreplication.raft.protocol.RaftCommands.{
8+
RequestVote,
9+
RequestVoteAccepted,
10+
RequestVoteDenied,
11+
RequestVoteResponse,
12+
}
13+
import lerna.akka.entityreplication.raft.routing.MemberIndex
14+
import org.scalatest.Inside
15+
16+
final class RaftActorCandidateReceivingRequestVoteSpec
17+
extends TestKit(ActorSystem())
18+
with RaftActorSpecBase
19+
with Inside {
20+
21+
import RaftActor._
22+
import lerna.akka.entityreplication.raft.RaftActorCandidateReceivingRequestVoteSpec._
23+
24+
private val shardId = createUniqueShardId()
25+
26+
private def verifyReceivingRequestVote(
27+
selfMemberIndex: MemberIndex,
28+
candidateData: RaftMemberData,
29+
requestVote: RequestVote,
30+
expectedReplyMessage: RequestVoteResponse,
31+
verifyState: RaftTestProbe.RaftState => Unit,
32+
): Unit = {
33+
val candidate = createRaftActor(
34+
shardId = shardId,
35+
selfMemberIndex = selfMemberIndex,
36+
)
37+
setState(candidate, Candidate, candidateData)
38+
candidate ! requestVote
39+
expectMsg(expectedReplyMessage)
40+
verifyState(getState(candidate))
41+
}
42+
43+
private def expectCandidateState(
44+
selfMemberIndex: MemberIndex,
45+
currentTerm: Term,
46+
votedFor: Option[MemberIndex],
47+
)(actual: RaftTestProbe.RaftState): Unit = {
48+
assert(actual.stateName === Candidate)
49+
assert(actual.stateData.currentTerm === currentTerm)
50+
assert(actual.stateData.votedFor === votedFor)
51+
votedFor.foreach { candidateMemberIndex =>
52+
assert(
53+
candidateMemberIndex === selfMemberIndex,
54+
"A candidate should become a follower after it votes for another candidate.",
55+
)
56+
}
57+
}
58+
59+
private def expectFollowerState(
60+
currentTerm: Term,
61+
votedFor: Option[MemberIndex],
62+
)(actual: RaftTestProbe.RaftState): Unit = {
63+
assert(actual.stateName === Follower)
64+
assert(actual.stateData.currentTerm === currentTerm)
65+
assert(actual.stateData.votedFor === votedFor)
66+
assert(
67+
actual.stateData.leaderMember === None,
68+
"A follower should not know the leader immediately after it comes from a candidate.",
69+
)
70+
}
71+
72+
"Candidate" should {
73+
74+
"deny RequestVote(term < currentTerm, ...)" in {
75+
val selfMemberIndex = createUniqueMemberIndex()
76+
77+
val candidateData = createCandidateData(Term(2), votedFor = None, acceptedMembers = Set.empty, newReplicatedLog())
78+
val requestVote = RequestVote(shardId, Term(1), selfMemberIndex, LogEntryIndex(0), Term(0))
79+
val expectedReplyMessage = RequestVoteDenied(Term(2))
80+
val verifyState = expectCandidateState(selfMemberIndex, Term(2), None)(_)
81+
82+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
83+
}
84+
85+
"accept RequestVote(term = currentTerm, candidate = self, ...)" in {
86+
val selfMemberIndex = createUniqueMemberIndex()
87+
88+
val candidateData = createCandidateData(Term(1), votedFor = None, acceptedMembers = Set.empty, newReplicatedLog())
89+
val requestVote = RequestVote(shardId, Term(1), selfMemberIndex, LogEntryIndex(0), Term(0))
90+
val expectedReplyMessage = RequestVoteAccepted(Term(1), selfMemberIndex)
91+
val verifyState = expectCandidateState(selfMemberIndex, Term(1), Some(selfMemberIndex))(_)
92+
93+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
94+
}
95+
96+
"deny RequestVote(term = currentTerm, candidate = other, ...)" in {
97+
val selfMemberIndex = createUniqueMemberIndex()
98+
val otherCandidateMemberIndex = createUniqueMemberIndex()
99+
100+
val candidateData = createCandidateData(Term(1), votedFor = None, acceptedMembers = Set.empty, newReplicatedLog())
101+
val requestVote = RequestVote(shardId, Term(1), otherCandidateMemberIndex, LogEntryIndex(0), Term(0))
102+
val expectedReplyMessage = RequestVoteDenied(Term(1))
103+
val verifyState = expectCandidateState(selfMemberIndex, Term(1), None)(_)
104+
105+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
106+
}
107+
108+
"accept RequestVote(term > currentTerm, lastLogIndex > log.lastLogIndex, lastLogTerm > log.lastLogTerm, ...)" in {
109+
val selfMemberIndex = createUniqueMemberIndex()
110+
val otherCandidateMemberIndex = createUniqueMemberIndex()
111+
112+
val candidateData = createCandidateData(
113+
Term(2),
114+
votedFor = Some(selfMemberIndex),
115+
acceptedMembers = Set.empty,
116+
newReplicatedLog(
117+
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
118+
),
119+
)
120+
val requestVote = RequestVote(shardId, Term(3), otherCandidateMemberIndex, LogEntryIndex(2), Term(2))
121+
val expectedReplyMessage = RequestVoteAccepted(Term(3), selfMemberIndex)
122+
val verifyState = expectFollowerState(Term(3), Some(otherCandidateMemberIndex))(_)
123+
124+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
125+
}
126+
127+
"accept RequestVote(term > currentTerm, lastLogIndex = log.lastLogIndex, lastLogTerm > log.lastLogTerm, ...)" in {
128+
val selfMemberIndex = createUniqueMemberIndex()
129+
val otherCandidateMemberIndex = createUniqueMemberIndex()
130+
131+
val candidateData = createCandidateData(
132+
Term(2),
133+
votedFor = Some(selfMemberIndex),
134+
acceptedMembers = Set.empty,
135+
newReplicatedLog(
136+
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
137+
),
138+
)
139+
val requestVote = RequestVote(shardId, Term(3), otherCandidateMemberIndex, LogEntryIndex(1), Term(2))
140+
val expectedReplyMessage = RequestVoteAccepted(Term(3), selfMemberIndex)
141+
val verifyState = expectFollowerState(Term(3), Some(otherCandidateMemberIndex))(_)
142+
143+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
144+
}
145+
146+
"accept RequestVote(term > currentTerm, lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm, ...)" in {
147+
val selfMemberIndex = createUniqueMemberIndex()
148+
val otherCandidateMemberIndex = createUniqueMemberIndex()
149+
150+
val candidateData = {
151+
val entityId = NormalizedEntityId.from("test-entity")
152+
createCandidateData(
153+
Term(2),
154+
votedFor = Some(selfMemberIndex),
155+
acceptedMembers = Set.empty,
156+
newReplicatedLog(
157+
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
158+
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "a"), Term(1)),
159+
),
160+
)
161+
}
162+
val requestVote = RequestVote(shardId, Term(3), otherCandidateMemberIndex, LogEntryIndex(1), Term(2))
163+
val expectedReplyMessage = RequestVoteAccepted(Term(3), selfMemberIndex)
164+
val verifyState = expectFollowerState(Term(3), Some(otherCandidateMemberIndex))(_)
165+
166+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
167+
}
168+
169+
"accept RequestVote(term > currentTerm, lastLogIndex > log.lastLogIndex, lastLogTerm = log.lastLogTerm, ...)" in {
170+
val selfMemberIndex = createUniqueMemberIndex()
171+
val otherCandidateMemberIndex = createUniqueMemberIndex()
172+
173+
val candidateData = {
174+
val entityId = NormalizedEntityId.from("test-entity")
175+
createCandidateData(
176+
Term(2),
177+
votedFor = Some(selfMemberIndex),
178+
acceptedMembers = Set.empty,
179+
newReplicatedLog(
180+
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
181+
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "a"), Term(1)),
182+
),
183+
)
184+
}
185+
val requestVote = RequestVote(shardId, Term(3), otherCandidateMemberIndex, LogEntryIndex(3), Term(1))
186+
val expectedReplyMessage = RequestVoteAccepted(Term(3), selfMemberIndex)
187+
val verifyState = expectFollowerState(Term(3), Some(otherCandidateMemberIndex))(_)
188+
189+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
190+
}
191+
192+
"accept RequestVote(term > currentTerm, lastLogIndex = log.lastLogIndex, lastLogTerm = log.lastLogTerm, ...)" in {
193+
val selfMemberIndex = createUniqueMemberIndex()
194+
val otherCandidateMemberIndex = createUniqueMemberIndex()
195+
196+
val candidateData = createCandidateData(
197+
Term(1),
198+
votedFor = Some(selfMemberIndex),
199+
acceptedMembers = Set.empty,
200+
newReplicatedLog(),
201+
)
202+
val requestVote = RequestVote(shardId, Term(2), otherCandidateMemberIndex, LogEntryIndex(0), Term(0))
203+
val expectedReplyMessage = RequestVoteAccepted(Term(2), selfMemberIndex)
204+
val verifyState = expectFollowerState(Term(2), Some(otherCandidateMemberIndex))(_)
205+
206+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
207+
}
208+
209+
"deny RequestVote(term > currentTerm, lastLogIndex < log.lastLogIndex, lastLogTerm = log.lastLogTerm, ...)" in {
210+
val selfMemberIndex = createUniqueMemberIndex()
211+
val otherCandidateMemberIndex = createUniqueMemberIndex()
212+
213+
val candidateData = {
214+
val entityId = NormalizedEntityId.from("test-entity")
215+
createCandidateData(
216+
Term(1),
217+
votedFor = Some(selfMemberIndex),
218+
acceptedMembers = Set.empty,
219+
newReplicatedLog(
220+
LogEntry(LogEntryIndex(1), EntityEvent(Option(entityId), "a"), Term(1)),
221+
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), Term(1)),
222+
),
223+
)
224+
}
225+
val requestVote = RequestVote(shardId, Term(2), otherCandidateMemberIndex, LogEntryIndex(1), Term(1))
226+
val expectedReplyMessage = RequestVoteDenied(Term(2))
227+
val verifyState = expectFollowerState(Term(2), None)(_)
228+
229+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
230+
}
231+
232+
"deny RequestVote(term > currentTerm, lastLogIndex > log.lastLogIndex, lastLogTerm < log.lastLogTerm, ...)" in {
233+
val selfMemberIndex = createUniqueMemberIndex()
234+
val otherCandidateMemberIndex = createUniqueMemberIndex()
235+
236+
val candidateData = {
237+
val entityId = NormalizedEntityId.from("test-entity")
238+
createCandidateData(
239+
Term(1),
240+
votedFor = Some(selfMemberIndex),
241+
acceptedMembers = Set.empty,
242+
newReplicatedLog(
243+
LogEntry(LogEntryIndex(1), EntityEvent(Option(entityId), "a"), Term(1)),
244+
LogEntry(LogEntryIndex(2), EntityEvent(Option(entityId), "b"), Term(2)),
245+
),
246+
)
247+
}
248+
val requestVote = RequestVote(shardId, Term(2), otherCandidateMemberIndex, LogEntryIndex(3), Term(1))
249+
val expectedReplyMessage = RequestVoteDenied(Term(2))
250+
val verifyState = expectFollowerState(Term(2), None)(_)
251+
252+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
253+
}
254+
255+
"deny RequestVote(term > currentTerm, lastLogIndex = log.lastLogIndex, lastLogTerm < log.lastLogTerm, ...)" in {
256+
val selfMemberIndex = createUniqueMemberIndex()
257+
val otherCandidateMemberIndex = createUniqueMemberIndex()
258+
259+
val candidateData = {
260+
createCandidateData(
261+
Term(3),
262+
votedFor = Some(selfMemberIndex),
263+
acceptedMembers = Set.empty,
264+
newReplicatedLog(
265+
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
266+
LogEntry(LogEntryIndex(2), EntityEvent(None, NoOp), Term(2)),
267+
),
268+
)
269+
}
270+
val requestVote = RequestVote(shardId, Term(4), otherCandidateMemberIndex, LogEntryIndex(2), Term(1))
271+
val expectedReplyMessage = RequestVoteDenied(Term(4))
272+
val verifyState = expectFollowerState(Term(4), None)(_)
273+
274+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
275+
}
276+
277+
"deny RequestVote(term > currentTerm, lastLogIndex < log.lastLogIndex, lastLogTerm < log.lastLogTerm, ...)" in {
278+
val selfMemberIndex = createUniqueMemberIndex()
279+
val otherCandidateMemberIndex = createUniqueMemberIndex()
280+
281+
val candidateData = {
282+
createCandidateData(
283+
Term(3),
284+
votedFor = Some(selfMemberIndex),
285+
acceptedMembers = Set.empty,
286+
newReplicatedLog(
287+
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
288+
LogEntry(LogEntryIndex(2), EntityEvent(None, NoOp), Term(2)),
289+
),
290+
)
291+
}
292+
val requestVote = RequestVote(shardId, Term(4), otherCandidateMemberIndex, LogEntryIndex(1), Term(1))
293+
val expectedReplyMessage = RequestVoteDenied(Term(4))
294+
val verifyState = expectFollowerState(Term(4), None)(_)
295+
296+
verifyReceivingRequestVote(selfMemberIndex, candidateData, requestVote, expectedReplyMessage, verifyState)
297+
}
298+
299+
}
300+
301+
}
302+
303+
object RaftActorCandidateReceivingRequestVoteSpec {
304+
305+
private def createCandidateData(
306+
currentTerm: Term,
307+
votedFor: Option[MemberIndex],
308+
acceptedMembers: Set[MemberIndex],
309+
log: ReplicatedLog,
310+
): RaftMemberData = {
311+
// `acceptedMembers` could be empty even if `votedFor` = Some(self)
312+
// since `acceptedMembers` will be updated after a candidate receives a `RequestVoteAccepted`.
313+
RaftMemberData(
314+
currentTerm = currentTerm,
315+
votedFor = votedFor,
316+
replicatedLog = log,
317+
leaderMember = None,
318+
acceptedMembers = acceptedMembers,
319+
).initializeCandidateData()
320+
}
321+
322+
private def newReplicatedLog(
323+
entries: LogEntry*,
324+
) = {
325+
ReplicatedLog()
326+
.reset(Term(0), LogEntryIndex(0))
327+
.merge(entries, LogEntryIndex(0))
328+
}
329+
330+
}

0 commit comments

Comments
 (0)