Skip to content

Commit f019635

Browse files
committed
Rename total_score to nonce in sync message
When CodeChain is synchronized by snapshot sync, `total_score` doesn't mean the accumulated score from the genesis block. To handle this, the sync extension is updated to not rely on `total_score` for deciding whether a peer is leading or not, and use the value only as a monotonically increasing nonce.
1 parent 96739f9 commit f019635

File tree

6 files changed

+68
-98
lines changed

6 files changed

+68
-98
lines changed

core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,4 @@ pub use crate::service::ClientService;
100100
pub use crate::transaction::{
101101
LocalizedTransaction, PendingSignedTransactions, SignedTransaction, UnverifiedTransaction,
102102
};
103-
pub use crate::types::{BlockId, TransactionId};
103+
pub use crate::types::{BlockId, BlockStatus, TransactionId};

spec/Block-Synchronization-Extension.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ Message :=
1919
### Status
2020

2121
```
22-
Status(total_score, best_hash, genesis_hash)
22+
Status(nonce, best_hash, genesis_hash)
2323
```
2424

2525
Send current chain status to peer.
2626

2727
* Identifier: 0x01
28-
* Restriction: None
28+
* Restriction:
29+
* `nonce` SHOULD be monotonically increasing every time the message is sent.
2930

3031
## Request messages
3132

sync/src/block/downloader/header.rs

Lines changed: 28 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -31,65 +31,50 @@ const MAX_HEADER_QUEUE_LENGTH: usize = 1024;
3131
const MAX_RETRY: usize = 3;
3232
const MAX_WAIT: u64 = 15;
3333

34-
#[derive(Clone)]
35-
struct Pivot {
36-
hash: BlockHash,
37-
total_score: U256,
38-
}
39-
4034
#[derive(Clone)]
4135
pub struct HeaderDownloader {
4236
// NOTE: Use this member as minimum as possible.
4337
client: Arc<dyn BlockChainClient>,
4438

45-
total_score: U256,
39+
nonce: U256,
4640
best_hash: BlockHash,
47-
48-
pivot: Pivot,
41+
pivot: BlockHash,
4942
request_time: Option<Instant>,
5043
downloaded: HashMap<BlockHash, Header>,
5144
queued: HashMap<BlockHash, Header>,
5245
trial: usize,
5346
}
5447

5548
impl HeaderDownloader {
56-
pub fn total_score(&self) -> U256 {
57-
self.total_score
58-
}
59-
60-
pub fn new(client: Arc<dyn BlockChainClient>, total_score: U256, best_hash: BlockHash) -> Self {
49+
pub fn new(client: Arc<dyn BlockChainClient>, nonce: U256, best_hash: BlockHash) -> Self {
6150
let best_header_hash = client.best_block_header().hash();
62-
let best_score = client.block_total_score(&BlockId::Latest).expect("Best block always exist");
6351

6452
Self {
6553
client,
6654

67-
total_score,
55+
nonce,
6856
best_hash,
69-
70-
pivot: Pivot {
71-
hash: best_header_hash,
72-
total_score: best_score,
73-
},
57+
pivot: best_header_hash,
7458
request_time: None,
7559
downloaded: HashMap::new(),
7660
queued: HashMap::new(),
7761
trial: 0,
7862
}
7963
}
8064

81-
pub fn update(&mut self, total_score: U256, best_hash: BlockHash) -> bool {
82-
match self.total_score.cmp(&total_score) {
65+
pub fn best_hash(&self) -> BlockHash {
66+
self.best_hash
67+
}
68+
69+
pub fn update(&mut self, nonce: U256, best_hash: BlockHash) -> bool {
70+
match self.nonce.cmp(&nonce) {
8371
Ordering::Equal => true,
8472
Ordering::Less => {
85-
self.total_score = total_score;
73+
self.nonce = nonce;
8674
self.best_hash = best_hash;
8775

8876
if self.client.block_header(&BlockId::Hash(best_hash)).is_some() {
89-
self.pivot = Pivot {
90-
hash: best_hash,
91-
total_score,
92-
}
77+
self.pivot = best_hash;
9378
}
9479
true
9580
}
@@ -108,25 +93,25 @@ impl HeaderDownloader {
10893
/// Find header from queued headers, downloaded cache and then from blockchain
10994
/// Panics if header dosn't exist
11095
fn pivot_header(&self) -> Header {
111-
match self.queued.get(&self.pivot.hash) {
96+
match self.queued.get(&self.pivot) {
11297
Some(header) => header.clone(),
113-
None => match self.downloaded.get(&self.pivot.hash) {
98+
None => match self.downloaded.get(&self.pivot) {
11499
Some(header) => header.clone(),
115-
None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(),
100+
None => self.client.block_header(&BlockId::Hash(self.pivot)).unwrap(),
116101
},
117102
}
118103
}
119104

120-
pub fn pivot_score(&self) -> U256 {
121-
self.pivot.total_score
122-
}
123-
124105
pub fn is_idle(&self) -> bool {
125-
let can_request = self.request_time.is_none() && self.total_score > self.pivot.total_score;
106+
let can_request = self.request_time.is_none() && self.best_hash != self.pivot;
126107

127108
self.is_valid() && (can_request || self.is_expired())
128109
}
129110

111+
pub fn is_caught_up(&self) -> bool {
112+
self.pivot == self.best_hash
113+
}
114+
130115
pub fn create_request(&mut self) -> Option<RequestMessage> {
131116
if !self.is_idle() {
132117
return None
@@ -154,37 +139,30 @@ impl HeaderDownloader {
154139
let pivot_header = self.pivot_header();
155140

156141
// This happens when best_hash is imported by other peer.
157-
if self.best_hash == self.pivot.hash {
142+
if self.best_hash == self.pivot {
158143
ctrace!(SYNC, "Ignore received headers, pivot already reached the best hash");
159-
} else if first_header_hash == self.pivot.hash {
144+
} else if first_header_hash == self.pivot {
160145
for header in headers.iter() {
161146
self.downloaded.insert(header.hash(), header.clone());
162147
}
163148

164149
// FIXME: skip known headers
165-
let new_scores = headers[1..].iter().fold(U256::zero(), |acc, header| acc + header.score());
166-
self.pivot = Pivot {
167-
hash: headers.last().expect("Last downloaded header must exist").hash(),
168-
total_score: self.pivot.total_score + new_scores,
169-
}
150+
self.pivot = headers.last().expect("Last downloaded header must exist").hash();
170151
} else if first_header_number < pivot_header.number() {
171152
ctrace!(
172153
SYNC,
173154
"Ignore received headers, pivot is already updated since headers are imported by other peers"
174155
);
175156
} else if first_header_number == pivot_header.number() {
176157
if pivot_header.number() != 0 {
177-
self.pivot = Pivot {
178-
hash: pivot_header.parent_hash(),
179-
total_score: self.pivot.total_score - pivot_header.score(),
180-
}
158+
self.pivot = pivot_header.parent_hash();
181159
}
182160
} else {
183161
cerror!(
184162
SYNC,
185-
"Invalid header update state. best_hash: {}, self.pivot.hash: {}, first_header_hash: {}",
163+
"Invalid header update state. best_hash: {}, self.pivot: {}, first_header_hash: {}",
186164
self.best_hash,
187-
self.pivot.hash,
165+
self.pivot,
188166
first_header_hash
189167
);
190168
}
@@ -203,10 +181,7 @@ impl HeaderDownloader {
203181
self.downloaded.remove(&hash);
204182

205183
if self.best_hash == hash {
206-
self.pivot = Pivot {
207-
hash,
208-
total_score: self.total_score,
209-
}
184+
self.pivot = hash;
210185
}
211186
}
212187
self.queued.shrink_to_fit();

sync/src/block/extension.rs

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use std::time::Duration;
2121

2222
use ccore::encoded::Header as EncodedHeader;
2323
use ccore::{
24-
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, ChainNotify, Client, ImportBlock, ImportError,
25-
UnverifiedTransaction,
24+
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock,
25+
ImportError, UnverifiedTransaction,
2626
};
2727
use cmerkle::TrieFactory;
2828
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
@@ -74,6 +74,7 @@ pub struct Extension {
7474
client: Arc<Client>,
7575
api: Box<dyn Api>,
7676
last_request: u64,
77+
nonce: u64,
7778
}
7879

7980
impl Extension {
@@ -124,6 +125,7 @@ impl Extension {
124125
client,
125126
api,
126127
last_request: Default::default(),
128+
nonce: Default::default(),
127129
}
128130
}
129131

@@ -144,6 +146,14 @@ impl Extension {
144146
}
145147

146148
fn send_body_request(&mut self, id: &NodeId) {
149+
if let Some(downloader) = self.header_downloaders.get(&id) {
150+
if self.client.block_status(&BlockId::Hash(downloader.best_hash())) == BlockStatus::InChain {
151+
// Peer is lagging behind the local blockchain.
152+
// We don't need to request block bodies to this peer
153+
return
154+
}
155+
}
156+
147157
self.check_sync_variable();
148158
if let Some(requests) = self.requests.get_mut(id) {
149159
let have_body_request = {
@@ -241,14 +251,15 @@ impl NetworkExtension<Event> for Extension {
241251
id,
242252
Arc::new(
243253
Message::Status {
244-
total_score: chain_info.best_proposal_score,
254+
nonce: U256::from(self.nonce),
245255
best_hash: chain_info.best_proposal_block_hash,
246256
genesis_hash: chain_info.genesis_hash,
247257
}
248258
.rlp_bytes()
249259
.into_vec(),
250260
),
251261
);
262+
self.nonce += 1;
252263
let t = self.connected_nodes.insert(*id);
253264
debug_assert!(t, "{} is already added to peer list", id);
254265

@@ -297,10 +308,10 @@ impl NetworkExtension<Event> for Extension {
297308
if let Ok(received_message) = UntrustedRlp::new(data).as_val() {
298309
match received_message {
299310
Message::Status {
300-
total_score,
311+
nonce,
301312
best_hash,
302313
genesis_hash,
303-
} => self.on_peer_status(id, total_score, best_hash, genesis_hash),
314+
} => self.on_peer_status(id, nonce, best_hash, genesis_hash),
304315
Message::Request(request_id, request) => self.on_peer_request(id, request_id, request),
305316
Message::Response(request_id, response) => self.on_peer_response(id, request_id, response),
306317
}
@@ -326,7 +337,6 @@ impl NetworkExtension<Event> for Extension {
326337
}
327338
State::SnapshotChunk(..) => unimplemented!(),
328339
State::Full => {
329-
let best_proposal_score = self.client.chain_info().best_proposal_score;
330340
for id in &peer_ids {
331341
let request =
332342
self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
@@ -337,15 +347,7 @@ impl NetworkExtension<Event> for Extension {
337347
}
338348

339349
for id in peer_ids {
340-
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
341-
peer.total_score()
342-
} else {
343-
U256::zero()
344-
};
345-
346-
if peer_score > best_proposal_score {
347-
self.send_body_request(&id);
348-
}
350+
self.send_body_request(&id);
349351
}
350352
}
351353
}
@@ -499,20 +501,21 @@ impl Extension {
499501
id,
500502
Arc::new(
501503
Message::Status {
502-
total_score: chain_info.best_proposal_score,
504+
nonce: U256::from(self.nonce),
503505
best_hash: chain_info.best_proposal_block_hash,
504506
genesis_hash: chain_info.genesis_hash,
505507
}
506508
.rlp_bytes()
507509
.into_vec(),
508510
),
509511
);
512+
self.nonce += 1;
510513
}
511514
}
512515
}
513516

514517
impl Extension {
515-
fn on_peer_status(&mut self, from: &NodeId, total_score: U256, best_hash: BlockHash, genesis_hash: BlockHash) {
518+
fn on_peer_status(&mut self, from: &NodeId, nonce: U256, best_hash: BlockHash, genesis_hash: BlockHash) {
516519
// Validity check
517520
if genesis_hash != self.client.chain_info().genesis_hash {
518521
cinfo!(SYNC, "Genesis hash mismatch with peer {}", from);
@@ -521,17 +524,17 @@ impl Extension {
521524

522525
match self.header_downloaders.entry(*from) {
523526
Entry::Occupied(mut peer) => {
524-
if !peer.get_mut().update(total_score, best_hash) {
527+
if !peer.get_mut().update(nonce, best_hash) {
525528
// FIXME: It should be an error level if the consensus is PoW.
526529
cdebug!(SYNC, "Peer #{} status updated but score is less than before", from);
527530
return
528531
}
529532
}
530533
Entry::Vacant(e) => {
531-
e.insert(HeaderDownloader::new(self.client.clone(), total_score, best_hash));
534+
e.insert(HeaderDownloader::new(self.client.clone(), nonce, best_hash));
532535
}
533536
}
534-
cinfo!(SYNC, "Peer #{} status update: total_score: {}, best_hash: {}", from, total_score, best_hash);
537+
cinfo!(SYNC, "Peer #{} status update: nonce: {}, best_hash: {}", from, nonce, best_hash);
535538
}
536539

537540
fn on_peer_request(&self, from: &NodeId, id: u64, request: RequestMessage) {
@@ -746,14 +749,12 @@ impl Extension {
746749
},
747750
State::SnapshotChunk(..) => {}
748751
State::Full => {
749-
let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) {
750-
let before_pivot_score = peer.pivot_score();
752+
let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) {
751753
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
752754
peer.import_headers(&encoded);
753-
let after_pivot_score = peer.pivot_score();
754-
(peer.downloaded(), before_pivot_score != after_pivot_score)
755+
(peer.downloaded(), peer.is_caught_up())
755756
} else {
756-
(Vec::new(), false)
757+
(Vec::new(), true)
757758
};
758759
completed.sort_unstable_by_key(EncodedHeader::number);
759760

@@ -779,7 +780,7 @@ impl Extension {
779780
peer.mark_as_imported(exists);
780781
peer.create_request()
781782
});
782-
if pivot_score_changed {
783+
if !peer_is_caught_up {
783784
if let Some(request) = request {
784785
self.send_header_request(from, request);
785786
}
@@ -821,20 +822,11 @@ impl Extension {
821822
}
822823
}
823824

824-
let total_score = self.client.chain_info().best_proposal_score;
825825
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
826826
peer_ids.shuffle(&mut thread_rng());
827827

828828
for id in peer_ids {
829-
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
830-
peer.total_score()
831-
} else {
832-
U256::zero()
833-
};
834-
835-
if peer_score > total_score {
836-
self.send_body_request(&id);
837-
}
829+
self.send_body_request(&id);
838830
}
839831
}
840832
}

0 commit comments

Comments
 (0)