Skip to content

Commit 6d5af4d

Browse files
remagpieforiequal0
authored and committed
Send and receive snapshot chunk requests
1 parent 7490d25 commit 6d5af4d

File tree

4 files changed

+189
-36
lines changed

4 files changed

+189
-36
lines changed

spec/Block-Synchronization-Extension.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,4 @@ Response to `GetStateChunk` message. Snappy algorithm is used for compression of
110110
* Restriction:
111111
* Number and order of chunks included in this message MUST be equal to request information.
112112
* Node corresponding to `chunk_root` in request MUST be included
113-
* If sender doesn’t have a chunk for the requested hash, corresponding chunk MUST be compressed([]), not omitted.
113+
* If sender doesn’t have a chunk for the requested hash, corresponding chunk MUST be `[]`(uncompressed), not omitted.

sync/src/block/extension.rs

Lines changed: 169 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use ccore::{
2424
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock,
2525
ImportError, UnverifiedTransaction,
2626
};
27+
use cmerkle::snapshot::ChunkDecompressor;
28+
use cmerkle::snapshot::Restore as SnapshotRestore;
2729
use cmerkle::TrieFactory;
2830
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
2931
use cstate::FindActionHandler;
@@ -32,6 +34,7 @@ use ctypes::header::{Header, Seal};
3234
use ctypes::transaction::Action;
3335
use ctypes::{BlockHash, BlockNumber};
3436
use hashdb::AsHashDB;
37+
use kvdb::DBTransaction;
3538
use primitives::{H256, U256};
3639
use rand::prelude::SliceRandom;
3740
use rand::thread_rng;
@@ -58,7 +61,10 @@ pub struct TokenInfo {
5861
#[derive(Debug)]
5962
enum State {
6063
SnapshotHeader(BlockHash, u64),
61-
SnapshotChunk(H256),
64+
SnapshotChunk {
65+
block: BlockHash,
66+
restore: SnapshotRestore,
67+
},
6268
Full,
6369
}
6470

@@ -85,9 +91,13 @@ impl Extension {
8591
Some((hash, num)) => match client.block_header(&BlockId::Number(num)) {
8692
Some(ref header) if *header.hash() == hash => {
8793
let state_db = client.state_db().read();
88-
match TrieFactory::readonly(state_db.as_hashdb(), &header.state_root()) {
94+
let state_root = header.state_root();
95+
match TrieFactory::readonly(state_db.as_hashdb(), &state_root) {
8996
Ok(ref trie) if trie.is_complete() => State::Full,
90-
_ => State::SnapshotChunk(*header.hash()),
97+
_ => State::SnapshotChunk {
98+
block: hash.into(),
99+
restore: SnapshotRestore::new(state_root),
100+
},
91101
}
92102
}
93103
_ => State::SnapshotHeader(hash.into(), num),
@@ -222,6 +232,37 @@ impl Extension {
222232
self.check_sync_variable();
223233
}
224234

235+
fn send_chunk_request(&mut self, block: &BlockHash, root: &H256) {
236+
let have_chunk_request = self.requests.values().flatten().any(|r| match r {
237+
(_, RequestMessage::StateChunk(..)) => true,
238+
_ => false,
239+
});
240+
241+
if !have_chunk_request {
242+
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
243+
peer_ids.shuffle(&mut thread_rng());
244+
if let Some(id) = peer_ids.first() {
245+
if let Some(requests) = self.requests.get_mut(&id) {
246+
let req = RequestMessage::StateChunk(*block, vec![*root]);
247+
cdebug!(SYNC, "Request chunk to {} {:?}", id, req);
248+
let request_id = self.last_request;
249+
self.last_request += 1;
250+
requests.push((request_id, req.clone()));
251+
self.api.send(id, Arc::new(Message::Request(request_id, req).rlp_bytes()));
252+
253+
let token = &self.tokens[id];
254+
let token_info = self.tokens_info.get_mut(token).unwrap();
255+
256+
let _ = self.api.clear_timer(*token);
257+
self.api
258+
.set_timer_once(*token, Duration::from_millis(SYNC_EXPIRE_REQUEST_INTERVAL))
259+
.expect("Timer set succeeds");
260+
token_info.request_id = Some(request_id);
261+
}
262+
}
263+
}
264+
}
265+
225266
fn check_sync_variable(&self) {
226267
let mut has_error = false;
227268
for id in self.header_downloaders.keys() {
@@ -238,6 +279,14 @@ impl Extension {
238279
})
239280
.collect();
240281

282+
let chunk_requests: Vec<RequestMessage> = requests
283+
.iter()
284+
.filter_map(|r| match r {
285+
(_, RequestMessage::StateChunk(..)) => Some(r.1.clone()),
286+
_ => None,
287+
})
288+
.collect();
289+
241290
if body_requests.len() > 1 {
242291
cerror!(SYNC, "Body request length {} > 1, body_requests: {:?}", body_requests.len(), body_requests);
243292
has_error = true;
@@ -246,16 +295,18 @@ impl Extension {
246295
let token = &self.tokens[id];
247296
let token_info = &self.tokens_info[token];
248297

249-
match (token_info.request_id, body_requests.len()) {
298+
match (token_info.request_id, body_requests.len() + chunk_requests.len()) {
250299
(Some(_), 1) => {}
251300
(None, 0) => {}
252301
_ => {
253302
cerror!(
254303
SYNC,
255-
"request_id: {:?}, body_requests.len(): {}, body_requests: {:?}",
304+
"request_id: {:?}, body_requests.len(): {}, body_requests: {:?}, chunk_requests.len(): {}, chunk_requests: {:?}",
256305
token_info.request_id,
257306
body_requests.len(),
258-
body_requests
307+
body_requests,
308+
chunk_requests.len(),
309+
chunk_requests
259310
);
260311
has_error = true;
261312
}
@@ -358,7 +409,17 @@ impl NetworkExtension<Event> for Extension {
358409
});
359410
}
360411
}
361-
State::SnapshotChunk(..) => unimplemented!(),
412+
State::SnapshotChunk {
413+
block,
414+
ref mut restore,
415+
} => {
416+
if let Some(root) = restore.next_to_feed() {
417+
self.send_chunk_request(&block, &root);
418+
} else {
419+
cdebug!(SYNC, "Transitioning state to {:?}", State::Full);
420+
self.state = State::Full;
421+
}
422+
}
362423
State::Full => {
363424
for id in &peer_ids {
364425
let request =
@@ -454,12 +515,17 @@ impl Extension {
454515
State::SnapshotHeader(hash, ..) => {
455516
if imported.contains(&hash) {
456517
let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist");
457-
Some(State::SnapshotChunk(header.state_root()))
518+
Some(State::SnapshotChunk {
519+
block: hash,
520+
restore: SnapshotRestore::new(header.state_root()),
521+
})
458522
} else {
459523
None
460524
}
461525
}
462-
State::SnapshotChunk(..) => unimplemented!(),
526+
State::SnapshotChunk {
527+
..
528+
} => None,
463529
State::Full => {
464530
for peer in self.header_downloaders.values_mut() {
465531
peer.mark_as_imported(imported.clone());
@@ -499,12 +565,17 @@ impl Extension {
499565
State::SnapshotHeader(hash, ..) => {
500566
if imported.contains(&hash) {
501567
let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist");
502-
Some(State::SnapshotChunk(header.state_root()))
568+
Some(State::SnapshotChunk {
569+
block: hash,
570+
restore: SnapshotRestore::new(header.state_root()),
571+
})
503572
} else {
504573
None
505574
}
506575
}
507-
State::SnapshotChunk(..) => None,
576+
State::SnapshotChunk {
577+
..
578+
} => None,
508579
State::Full => {
509580
self.body_downloader.remove_target(&imported);
510581
self.body_downloader.remove_target(&invalid);
@@ -580,7 +651,7 @@ impl Extension {
580651
RequestMessage::Bodies(hashes) => !hashes.is_empty(),
581652
RequestMessage::StateChunk {
582653
..
583-
} => unimplemented!(),
654+
} => true,
584655
}
585656
}
586657

@@ -659,7 +730,24 @@ impl Extension {
659730
self.on_body_response(hashes, bodies);
660731
self.check_sync_variable();
661732
}
662-
ResponseMessage::StateChunk(..) => unimplemented!(),
733+
ResponseMessage::StateChunk(chunks) => {
734+
let roots = match request {
735+
RequestMessage::StateChunk(_, roots) => roots,
736+
_ => unreachable!(),
737+
};
738+
if let Some(token) = self.tokens.get(from) {
739+
if let Some(token_info) = self.tokens_info.get_mut(token) {
740+
if token_info.request_id.is_none() {
741+
ctrace!(SYNC, "Expired before handling response");
742+
return
743+
}
744+
self.api.clear_timer(*token).expect("Timer clear succeed");
745+
token_info.request_id = None;
746+
}
747+
}
748+
self.dismiss_request(from, id);
749+
self.on_chunk_response(from, &roots, &chunks);
750+
}
663751
}
664752
}
665753
}
@@ -713,12 +801,10 @@ impl Extension {
713801
}
714802
true
715803
}
716-
(
717-
RequestMessage::StateChunk {
718-
..
719-
},
720-
ResponseMessage::StateChunk(..),
721-
) => unimplemented!(),
804+
(RequestMessage::StateChunk(_, roots), ResponseMessage::StateChunk(chunks)) => {
805+
// Check length
806+
roots.len() == chunks.len()
807+
}
722808
_ => {
723809
cwarn!(SYNC, "Invalid response type");
724810
false
@@ -733,7 +819,10 @@ impl Extension {
733819
[header] if header.hash() == hash => {
734820
match self.client.import_bootstrap_header(&header) {
735821
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
736-
self.state = State::SnapshotChunk(*header.state_root());
822+
self.state = State::SnapshotChunk {
823+
block: hash,
824+
restore: SnapshotRestore::new(*header.state_root()),
825+
};
737826
}
738827
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {}
739828
// FIXME: handle import errors
@@ -751,7 +840,9 @@ impl Extension {
751840
headers.len()
752841
),
753842
},
754-
State::SnapshotChunk(..) => {}
843+
State::SnapshotChunk {
844+
..
845+
} => {}
755846
State::Full => {
756847
let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) {
757848
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
@@ -833,6 +924,63 @@ impl Extension {
833924
self.send_body_request(&id);
834925
}
835926
}
927+
928+
fn on_chunk_response(&mut self, from: &NodeId, roots: &[H256], chunks: &[Vec<u8>]) {
929+
if let State::SnapshotChunk {
930+
block,
931+
ref mut restore,
932+
} = self.state
933+
{
934+
for (r, c) in roots.iter().zip(chunks) {
935+
if c.is_empty() {
936+
cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r);
937+
continue
938+
}
939+
let decompressor = ChunkDecompressor::from_slice(c);
940+
let raw_chunk = match decompressor.decompress() {
941+
Ok(chunk) => chunk,
942+
Err(e) => {
943+
cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e);
944+
continue
945+
}
946+
};
947+
let recovered = match raw_chunk.recover(*r) {
948+
Ok(chunk) => chunk,
949+
Err(e) => {
950+
cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e);
951+
continue
952+
}
953+
};
954+
955+
let batch = {
956+
let mut state_db = self.client.state_db().write();
957+
let hash_db = state_db.as_hashdb_mut();
958+
restore.feed(hash_db, recovered);
959+
960+
let mut batch = DBTransaction::new();
961+
match state_db.journal_under(&mut batch, 0, H256::zero()) {
962+
Ok(_) => batch,
963+
Err(e) => {
964+
cwarn!(SYNC, "Failed to write state chunk to database: {}", e);
965+
continue
966+
}
967+
}
968+
};
969+
self.client.db().write_buffered(batch);
970+
match self.client.db().flush() {
971+
Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r),
972+
Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e),
973+
}
974+
}
975+
976+
if let Some(root) = restore.next_to_feed() {
977+
self.send_chunk_request(&block, &root);
978+
} else {
979+
cdebug!(SYNC, "Transitioning state to {:?}", State::Full);
980+
self.state = State::Full;
981+
}
982+
}
983+
}
836984
}
837985

838986
pub struct BlockSyncSender(EventSender<Event>);

util/merkle/src/snapshot/compress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl<R> ChunkDecompressor<R> {
3535
}
3636

3737
impl<'a> ChunkDecompressor<Cursor<&'a [u8]>> {
38-
fn from_slice(slice: &'a [u8]) -> Self {
38+
pub fn from_slice(slice: &'a [u8]) -> Self {
3939
ChunkDecompressor::new(Cursor::new(slice))
4040
}
4141
}

0 commit comments

Comments
 (0)