Ethereum  PoC-8
The C++ Implementation of Ethereum
BlockChainSync.cpp
Go to the documentation of this file.
1 // Copyright 2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4 
5 #include "BlockChainSync.h"
6 
7 #include "BlockChain.h"
8 #include "BlockQueue.h"
9 #include "EthereumCapability.h"
10 #include <libdevcore/Common.h>
11 #include <libdevcore/TrieHash.h>
12 #include <libethcore/Exceptions.h>
13 #include <libp2p/Host.h>
14 #include <libp2p/Session.h>
15 #include <chrono>
16 
17 using namespace std;
18 using namespace dev;
19 using namespace dev::eth;
20 
21 std::ostream& dev::eth::operator<<(std::ostream& _out, SyncStatus const& _sync)
22 {
23  _out << "protocol: " << _sync.protocolVersion << endl;
24  _out << "state: " << EthereumCapability::stateName(_sync.state) << " ";
25  if (_sync.state == SyncState::Blocks)
26  _out << _sync.currentBlockNumber << "/" << _sync.highestBlockNumber;
27  return _out;
28 }
29 
30 namespace // Helper functions.
31 {
32 
33 constexpr unsigned c_maxPeerUknownNewBlocks = 1024;
34 constexpr unsigned c_maxRequestHeaders = 1024;
35 constexpr unsigned c_maxRequestBodies = 1024;
36 
37 template<typename T> bool haveItem(std::map<unsigned, T>& _container, unsigned _number)
38 {
39  if (_container.empty())
40  return false;
41  auto lower = _container.lower_bound(_number);
42  if (lower != _container.end() && lower->first == _number)
43  return true;
44  if (lower == _container.begin())
45  return false;
46  --lower;
47  return lower->first <= _number && (lower->first + lower->second.size()) > _number;
48 }
49 
50 template<typename T> T const* findItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
51 {
52  if (_container.empty())
53  return nullptr;
54  auto lower = _container.lower_bound(_number);
55  if (lower != _container.end() && lower->first == _number)
56  return &(*lower->second.begin());
57  if (lower == _container.begin())
58  return nullptr;
59  --lower;
60  if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
61  return &lower->second.at(_number - lower->first);
62  return nullptr;
63 }
64 
65 template<typename T> void removeItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
66 {
67  if (_container.empty())
68  return;
69  auto lower = _container.lower_bound(_number);
70  if (lower != _container.end() && lower->first == _number)
71  {
72  _container.erase(lower);
73  return;
74  }
75  if (lower == _container.begin())
76  return;
77  --lower;
78  if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
79  lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
80 }
81 
82 template<typename T> void removeAllStartingWith(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
83 {
84  if (_container.empty())
85  return;
86  auto lower = _container.lower_bound(_number);
87  if (lower != _container.end() && lower->first == _number)
88  {
89  _container.erase(lower, _container.end());
90  return;
91  }
92  if (lower == _container.begin())
93  {
94  _container.clear();
95  return;
96  }
97  --lower;
98  if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
99  lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
100  _container.erase(++lower, _container.end());
101 }
102 
103 template<typename T> void mergeInto(std::map<unsigned, std::vector<T>>& _container, unsigned _number, T&& _data)
104 {
105  assert(!haveItem(_container, _number));
106  auto lower = _container.lower_bound(_number);
107  if (!_container.empty() && lower != _container.begin())
108  --lower;
109  if (lower != _container.end() && (lower->first + lower->second.size() == _number))
110  {
111  // extend existing chunk
112  lower->second.emplace_back(_data);
113 
114  auto next = lower;
115  ++next;
116  if (next != _container.end() && (lower->first + lower->second.size() == next->first))
117  {
118  // merge with the next chunk
119  std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
120  _container.erase(next);
121  }
122 
123  }
124  else
125  {
126  // insert a new chunk
127  auto inserted = _container.insert(lower, std::make_pair(_number, std::vector<T> { _data }));
128  auto next = inserted;
129  ++next;
130  if (next != _container.end() && next->first == _number + 1)
131  {
132  std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
133  _container.erase(next);
134  }
135  }
136 }
137 
138 } // Anonymous namespace -- helper functions.
139 
140 BlockChainSync::BlockChainSync(EthereumCapability& _host)
141  : m_host(_host),
142  m_chainStartBlock(_host.chain().chainStartBlockNumber()),
143  m_startingBlock(_host.chain().number()),
144  m_lastImportedBlock(m_startingBlock),
145  m_lastImportedBlockHash(_host.chain().currentHash())
146 {
147  m_bqRoomAvailable = host().bq().onRoomAvailable([this]()
148  {
149  RecursiveGuard l(x_sync);
150  m_state = SyncState::Blocks;
151  continueSync();
152  });
153 }
154 
156 {
157  RecursiveGuard l(x_sync);
158  abortSync();
159 }
160 
162 {
163  //if a block has been added via mining or other block import function
164  //through RPC, then we should count it as a last imported block
165  RecursiveGuard l(x_sync);
166  if (_info.number() > m_lastImportedBlock)
167  {
168  m_lastImportedBlock = static_cast<unsigned>(_info.number());
169  m_lastImportedBlockHash = _info.hash();
170  m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
171  }
172 }
173 
175 {
176  RecursiveGuard l(x_sync);
177  resetSync();
178  onPeerAborting();
179 }
180 
182 {
183  RecursiveGuard l(x_sync);
185 
186  auto peerSessionInfo = m_host.capabilityHost().peerSessionInfo(_peer.id());
187  if (!peerSessionInfo)
188  return; // Expired
189 
190  std::string disconnectReason;
191  if (peerSessionInfo->clientVersion.find("/v0.7.0/") != string::npos)
192  disconnectReason = "Blacklisted client version.";
193  else
194  disconnectReason = _peer.validate(
195  host().chain().genesisHash(), host().protocolVersion(), host().networkId());
196 
197  if (!disconnectReason.empty())
198  {
199  LOG(m_logger) << "Peer not suitable for sync: " << disconnectReason;
200  m_host.capabilityHost().disconnect(_peer.id(), p2p::UserReason);
201  return;
202  }
203 
204  // Before starting to exchange the data with the node, let's verify that it's on our chain
205  if (!requestDaoForkBlockHeader(_peer.id()))
206  {
207  // DAO challenge not needed
208  syncPeer(_peer.id(), false);
209  }
210 }
211 
212 bool BlockChainSync::requestDaoForkBlockHeader(NodeID const& _peerID)
213 {
214  // DAO challenge
215  u256 const daoHardfork = host().chain().sealEngine()->chainParams().daoHardforkBlock;
216  if (daoHardfork == 0 || daoHardfork == c_infiniteBlockNumber)
217  return false;
218 
219  m_daoChallengedPeers.insert(_peerID);
220  m_host.peer(_peerID).requestBlockHeaders(static_cast<unsigned>(daoHardfork), 1, 0, false);
221  return true;
222 }
223 
224 void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
225 {
226  if (m_host.peer(_peerID).isConversing())
227  {
228  LOG(m_loggerDetail) << "Can't sync with this peer - outstanding asks.";
229  return;
230  }
231 
232  if (m_state == SyncState::Waiting)
233  return;
234 
235  u256 td = host().chain().details().totalDifficulty;
236  if (host().bq().isActive())
237  td += host().bq().difficulty();
238 
239  u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
240 
241  auto& peer = m_host.peer(_peerID);
242  u256 peerTotalDifficulty = peer.totalDifficulty();
243 
244  if (_force || peerTotalDifficulty > syncingDifficulty)
245  {
246  if (peerTotalDifficulty > syncingDifficulty)
247  LOG(m_logger) << "Discovered new highest difficulty";
248 
249  // start sync
250  m_syncingTotalDifficulty = peerTotalDifficulty;
251  if (m_state == SyncState::Idle || m_state == SyncState::NotSynced)
252  {
253  LOG(m_loggerInfo) << "Starting full sync";
254  m_state = SyncState::Blocks;
255  }
256  peer.requestBlockHeaders(peer.latestHash(), 1, 0, false);
257  peer.setWaitingForTransactions(true);
258  return;
259  }
260 
261  if (m_state == SyncState::Blocks)
262  {
263  requestBlocks(_peerID);
264  return;
265  }
266 }
267 
268 void BlockChainSync::continueSync()
269 {
270  host().capabilityHost().foreachPeer(m_host.name(), [this](NodeID const& _peerID) {
271  syncPeer(_peerID, false);
272  return true;
273  });
274 }
275 
276 void BlockChainSync::requestBlocks(NodeID const& _peerID)
277 {
278  clearPeerDownload(_peerID);
279  if (host().bq().knownFull())
280  {
281  LOG(m_loggerDetail) << "Waiting for block queue before downloading blocks";
282  pauseSync();
283  return;
284  }
285  // check to see if we need to download any block bodies first
286  auto header = m_headers.begin();
287  h256s neededBodies;
288  vector<unsigned> neededNumbers;
289  unsigned index = 0;
290  if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
291  {
292  while (header != m_headers.end() && neededBodies.size() < c_maxRequestBodies && index < header->second.size())
293  {
294  unsigned block = header->first + index;
295  if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
296  {
297  neededBodies.push_back(header->second[index].hash);
298  neededNumbers.push_back(block);
299  m_downloadingBodies.insert(block);
300  }
301 
302  ++index;
303  if (index >= header->second.size())
304  break; // Download bodies only for validated header chain
305  }
306  }
307  if (neededBodies.size() > 0)
308  {
309  m_bodySyncPeers[_peerID] = neededNumbers;
310  m_host.peer(_peerID).requestBlockBodies(neededBodies);
311  }
312  else
313  {
314  // check if need to download headers
315  unsigned start = 0;
316  if (!m_haveCommonHeader)
317  {
318  // download backwards until common block is found 1 header at a time
319  start = m_lastImportedBlock;
320  if (!m_headers.empty())
321  start = std::min(start, m_headers.begin()->first - 1);
322  m_lastImportedBlock = start;
323  m_lastImportedBlockHash = host().chain().numberHash(start);
324 
325  if (start <= m_chainStartBlock + 1)
326  m_haveCommonHeader = true; //reached chain start
327  }
328  if (m_haveCommonHeader)
329  {
330  start = m_lastImportedBlock + 1;
331  auto next = m_headers.begin();
332  unsigned count = 0;
333  if (!m_headers.empty() && start >= m_headers.begin()->first)
334  {
335  start = m_headers.begin()->first + m_headers.begin()->second.size();
336  ++next;
337  }
338 
339  while (count == 0 && next != m_headers.end())
340  {
341  count = std::min(c_maxRequestHeaders, next->first - start);
342  while(count > 0 && m_downloadingHeaders.count(start) != 0)
343  {
344  start++;
345  count--;
346  }
347  std::vector<unsigned> headers;
348  for (unsigned block = start; block < start + count; block++)
349  if (m_downloadingHeaders.count(block) == 0)
350  {
351  headers.push_back(block);
352  m_downloadingHeaders.insert(block);
353  }
354  count = headers.size();
355  if (count > 0)
356  {
357  m_headerSyncPeers[_peerID] = headers;
358  assert(!haveItem(m_headers, start));
359  m_host.peer(_peerID).requestBlockHeaders(start, count, 0, false);
360  }
361  else if (start >= next->first)
362  {
363  start = next->first + next->second.size();
364  ++next;
365  }
366  }
367  }
368  else
369  m_host.peer(_peerID).requestBlockHeaders(start, 1, 0, false);
370  }
371 }
372 
373 void BlockChainSync::clearPeerDownload(NodeID const& _peerID)
374 {
375  auto syncPeer = m_headerSyncPeers.find(_peerID);
376  if (syncPeer != m_headerSyncPeers.end())
377  {
378  for (unsigned block : syncPeer->second)
379  m_downloadingHeaders.erase(block);
380  m_headerSyncPeers.erase(syncPeer);
381  }
382  syncPeer = m_bodySyncPeers.find(_peerID);
383  if (syncPeer != m_bodySyncPeers.end())
384  {
385  for (unsigned block : syncPeer->second)
386  m_downloadingBodies.erase(block);
387  m_bodySyncPeers.erase(syncPeer);
388  }
389  m_daoChallengedPeers.erase(_peerID);
390 }
391 
392 void BlockChainSync::clearPeerDownload()
393 {
394  for (auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
395  {
396  if (!m_host.capabilityHost().peerSessionInfo(s->first))
397  {
398  for (unsigned block : s->second)
399  m_downloadingHeaders.erase(block);
400  m_headerSyncPeers.erase(s++);
401  }
402  else
403  ++s;
404  }
405  for (auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
406  {
407  if (!m_host.capabilityHost().peerSessionInfo(s->first))
408  {
409  for (unsigned block : s->second)
410  m_downloadingBodies.erase(block);
411  m_bodySyncPeers.erase(s++);
412  }
413  else
414  ++s;
415  }
416  for (auto s = m_daoChallengedPeers.begin(); s != m_daoChallengedPeers.end();)
417  {
418  if (!m_host.capabilityHost().peerSessionInfo(*s))
419  m_daoChallengedPeers.erase(s++);
420  else
421  ++s;
422  }
423 }
424 
425 void BlockChainSync::logNewBlock(h256 const& _h)
426 {
427  m_knownNewHashes.erase(_h);
428 }
429 
430 void BlockChainSync::onPeerBlockHeaders(NodeID const& _peerID, RLP const& _r)
431 {
432  RecursiveGuard l(x_sync);
434  size_t itemCount = _r.itemCount();
435  LOG(m_logger) << "BlocksHeaders (" << dec << itemCount << " entries) "
436  << (itemCount ? "" : ": NoMoreHeaders");
437 
438  if (m_daoChallengedPeers.find(_peerID) != m_daoChallengedPeers.end())
439  {
440  if (verifyDaoChallengeResponse(_r))
441  syncPeer(_peerID, false);
442  else
443  m_host.disablePeer(_peerID, "Peer from another fork.");
444 
445  m_daoChallengedPeers.erase(_peerID);
446  return;
447  }
448 
449  clearPeerDownload(_peerID);
450  if (m_state != SyncState::Blocks && m_state != SyncState::Waiting)
451  {
452  LOG(m_logger) << "Ignoring unexpected blocks";
453  return;
454  }
455  if (m_state == SyncState::Waiting)
456  {
457  LOG(m_loggerDetail) << "Ignored blocks while waiting";
458  return;
459  }
460  if (itemCount == 0)
461  {
462  LOG(m_loggerDetail) << "Peer does not have the blocks requested";
463  m_host.capabilityHost().updateRating(_peerID, -1);
464  }
465  for (unsigned i = 0; i < itemCount; i++)
466  {
467  BlockHeader info(_r[i].data(), HeaderData);
468  unsigned blockNumber = static_cast<unsigned>(info.number());
469  if (blockNumber < m_chainStartBlock)
470  {
471  LOG(m_logger) << "Skipping too old header " << blockNumber;
472  continue;
473  }
474  if (haveItem(m_headers, blockNumber))
475  {
476  LOG(m_logger) << "Skipping header " << blockNumber << " (already downloaded)";
477  continue;
478  }
479  if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
480  {
481  LOG(m_logger) << "Skipping header " << blockNumber << " (already imported)";
482  continue;
483  }
484  if (blockNumber > m_highestBlock)
485  m_highestBlock = blockNumber;
486 
487  auto status = host().bq().blockStatus(info.hash());
488  if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(info.hash()))
489  {
490  m_haveCommonHeader = true;
491  m_lastImportedBlock = (unsigned)info.number();
492  m_lastImportedBlockHash = info.hash();
493 
494  if (!m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1 &&
495  m_headers.begin()->second[0].parent != m_lastImportedBlockHash)
496  {
497  // Start of the header chain in m_headers doesn't match our known chain,
498  // probably we've downloaded other fork
499  clog(VerbosityWarning, "sync")
500  << "Unknown parent of the downloaded headers, restarting sync";
501  restartSync();
502  return;
503  }
504  }
505  else
506  {
507  Header hdr { _r[i].data().toBytes(), info.hash(), info.parentHash() };
508 
509  // validate chain
510  HeaderId headerId { info.transactionsRoot(), info.sha3Uncles() };
511  if (m_haveCommonHeader)
512  {
513  Header const* prevBlock = findItem(m_headers, blockNumber - 1);
514  if ((prevBlock && prevBlock->hash != info.parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.parentHash() != m_lastImportedBlockHash))
515  {
516  // mismatching parent id, delete the previous block and don't add this one
517  clog(VerbosityWarning, "sync") << "Unknown block header " << blockNumber << " "
518  << info.hash() << " (Restart syncing)";
519  m_host.capabilityHost().updateRating(_peerID, -1);
520  restartSync();
521  return ;
522  }
523 
524  Header const* nextBlock = findItem(m_headers, blockNumber + 1);
525  if (nextBlock && nextBlock->parent != info.hash())
526  {
527  LOG(m_loggerDetail)
528  << "Unknown block header " << blockNumber + 1 << " " << nextBlock->hash;
529  // clear following headers
530  unsigned n = blockNumber + 1;
531  auto headers = m_headers.at(n);
532  for (auto const& h : headers)
533  {
534  BlockHeader deletingInfo(h.data, HeaderData);
535  m_headerIdToNumber.erase(headerId);
536  m_downloadingBodies.erase(n);
537  m_downloadingHeaders.erase(n);
538  ++n;
539  }
540  removeAllStartingWith(m_headers, blockNumber + 1);
541  removeAllStartingWith(m_bodies, blockNumber + 1);
542  }
543  }
544 
545  mergeInto(m_headers, blockNumber, std::move(hdr));
546  if (headerId.transactionsRoot == EmptyTrie && headerId.uncles == EmptyListSHA3)
547  {
548  //empty body, just mark as downloaded
549  RLPStream r(2);
552  bytes body;
553  r.swapOut(body);
554  mergeInto(m_bodies, blockNumber, std::move(body));
555  }
556  else
557  m_headerIdToNumber[headerId] = blockNumber;
558  }
559  }
560  collectBlocks();
561  continueSync();
562 }
563 
564 bool BlockChainSync::verifyDaoChallengeResponse(RLP const& _r)
565 {
566  if (_r.itemCount() != 1)
567  return false;
568 
569  BlockHeader info(_r[0].data(), HeaderData);
570  return info.number() == host().chain().sealEngine()->chainParams().daoHardforkBlock &&
571  info.extraData() == fromHex("0x64616f2d686172642d666f726b");
572 }
573 
574 void BlockChainSync::onPeerBlockBodies(NodeID const& _peerID, RLP const& _r)
575 {
576  RecursiveGuard l(x_sync);
578  size_t itemCount = _r.itemCount();
579  LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) "
580  << (itemCount ? "" : ": NoMoreBodies");
581  clearPeerDownload(_peerID);
582  if (m_state != SyncState::Blocks && m_state != SyncState::Waiting) {
583  LOG(m_logger) << "Ignoring unexpected blocks";
584  return;
585  }
586  if (m_state == SyncState::Waiting)
587  {
588  LOG(m_loggerDetail) << "Ignored blocks while waiting";
589  return;
590  }
591  if (itemCount == 0)
592  {
593  LOG(m_loggerDetail) << "Peer does not have the blocks requested";
594  m_host.capabilityHost().updateRating(_peerID, -1);
595  }
596  for (unsigned i = 0; i < itemCount; i++)
597  {
598  RLP body(_r[i]);
599 
600  auto txList = body[0];
601  h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
602  h256 uncles = sha3(body[1].data());
603  HeaderId id { transactionRoot, uncles };
604  auto iter = m_headerIdToNumber.find(id);
605  if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
606  {
607  LOG(m_loggerDetail) << "Ignored unknown block body";
608  continue;
609  }
610  unsigned blockNumber = iter->second;
611  if (haveItem(m_bodies, blockNumber))
612  {
613  LOG(m_logger) << "Skipping already downloaded block body " << blockNumber;
614  continue;
615  }
616  m_headerIdToNumber.erase(id);
617  mergeInto(m_bodies, blockNumber, body.data().toBytes());
618  }
619  collectBlocks();
620  continueSync();
621 }
622 
623 void BlockChainSync::collectBlocks()
624 {
625  if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
626  return;
627 
628  // merge headers and bodies
629  auto& headers = *m_headers.begin();
630  auto& bodies = *m_bodies.begin();
631  if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
632  return;
633 
634  unsigned success = 0;
635  unsigned future = 0;
636  unsigned got = 0;
637  unsigned unknown = 0;
638  size_t i = 0;
639  for (; i < headers.second.size() && i < bodies.second.size(); i++)
640  {
641  RLPStream blockStream(3);
642  blockStream.appendRaw(headers.second[i].data);
643  RLP body(bodies.second[i]);
644  blockStream.appendRaw(body[0].data());
645  blockStream.appendRaw(body[1].data());
646  bytes block;
647  blockStream.swapOut(block);
648  switch (host().bq().import(&block))
649  {
651  success++;
652  if (headers.first + i > m_lastImportedBlock)
653  {
654  m_lastImportedBlock = headers.first + (unsigned)i;
655  m_lastImportedBlockHash = headers.second[i].hash;
656  }
657  break;
659  LOG(m_logger) << "Malformed block #" << headers.first + i << ". Restarting sync.";
660  restartSync();
661  return;
663  LOG(m_logger) << "Block from the bad chain, block #" << headers.first + i
664  << ". Restarting sync.";
665  restartSync();
666  return;
667 
669  future++;
670  break;
672  break;
676  if (headers.first + i > m_lastImportedBlock)
677  {
678  logImported(success, future, got, unknown);
679  LOG(m_logger)
680  << "Already known or future time & unknown parent or unknown parent, block #"
681  << headers.first + i << ". Resetting sync.";
682  resetSync();
683  m_haveCommonHeader = false; // fork detected, search for common header again
684  }
685  return;
686 
687  default:;
688  }
689  }
690 
691  logImported(success, future, got, unknown);
692 
693  if (host().bq().unknownFull())
694  {
695  clog(VerbosityWarning, "sync") << "Too many unknown blocks, restarting sync";
696  restartSync();
697  return;
698  }
699 
700  auto newHeaders = std::move(headers.second);
701  newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
702  unsigned newHeaderHead = headers.first + i;
703  auto newBodies = std::move(bodies.second);
704  newBodies.erase(newBodies.begin(), newBodies.begin() + i);
705  unsigned newBodiesHead = bodies.first + i;
706  m_headers.erase(m_headers.begin());
707  m_bodies.erase(m_bodies.begin());
708  if (!newHeaders.empty())
709  m_headers[newHeaderHead] = newHeaders;
710  if (!newBodies.empty())
711  m_bodies[newBodiesHead] = newBodies;
712 
713  if (m_headers.empty())
714  {
715  assert(m_bodies.empty());
716  completeSync();
717  }
719 }
720 
721 void BlockChainSync::logImported(
722  unsigned _success, unsigned _future, unsigned _got, unsigned _unknown)
723 {
724  LOG(m_logger) << dec << _success << " imported OK, " << _unknown << " with unknown parents, "
725  << _future << " with future timestamps, " << _got << " already known received.";
726 }
727 
728 void BlockChainSync::onPeerNewBlock(NodeID const& _peerID, RLP const& _r)
729 {
730  RecursiveGuard l(x_sync);
732 
733  if (_r.itemCount() != 2)
734  {
735  m_host.disablePeer(_peerID, "NewBlock without 2 data fields.");
736  return;
737  }
738  BlockHeader info(_r[0][0].data(), HeaderData);
739  auto h = info.hash();
740  auto& peer = m_host.peer(_peerID);
741  peer.markBlockAsKnown(h);
742  unsigned blockNumber = static_cast<unsigned>(info.number());
743  if (blockNumber > (m_lastImportedBlock + 1))
744  {
745  LOG(m_loggerDetail) << "Received unknown new block";
746  // Update the hash of highest known block of the peer.
747  // syncPeer will then request the highest block header to properly restart syncing
748  peer.setLatestHash(h);
749  syncPeer(_peerID, true);
750  return;
751  }
752  switch (host().bq().import(_r[0].data()))
753  {
755  m_host.capabilityHost().updateRating(_peerID, 100);
756  logNewBlock(h);
757  if (blockNumber > m_lastImportedBlock)
758  {
759  m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
760  m_lastImportedBlockHash = h;
761  }
762  m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
763  m_downloadingBodies.erase(blockNumber);
764  m_downloadingHeaders.erase(blockNumber);
765  removeItem(m_headers, blockNumber);
766  removeItem(m_bodies, blockNumber);
767  if (m_headers.empty())
768  {
769  if (!m_bodies.empty())
770  {
771  LOG(m_loggerDetail)
772  << "Block headers map is empty, but block bodies map is not. Force-clearing.";
773  m_bodies.clear();
774  }
775  completeSync();
776  }
777  break;
779  //TODO: Rating dependent on how far in future it is.
780  break;
781 
784  logNewBlock(h);
785  m_host.disablePeer(_peerID, "Malformed block received.");
786  return;
787 
790  break;
791 
794  {
795  peer.incrementUnknownNewBlocks();
796  if (peer.unknownNewBlocks() > c_maxPeerUknownNewBlocks)
797  {
798  m_host.disablePeer(_peerID, "Too many uknown new blocks");
799  restartSync();
800  }
801  logNewBlock(h);
802  u256 totalDifficulty = _r[1].toInt<u256>();
803  if (totalDifficulty > peer.totalDifficulty())
804  {
805  LOG(m_loggerDetail) << "Received block with no known parent. Peer needs syncing...";
806  syncPeer(_peerID, true);
807  }
808  break;
809  }
810  default:;
811  }
812 }
813 
815 {
816  RecursiveGuard l(x_sync);
817  SyncStatus res;
818  res.state = m_state;
819  res.protocolVersion = 62;
820  res.startBlockNumber = m_startingBlock;
821  res.currentBlockNumber = host().chain().number();
822  res.highestBlockNumber = m_highestBlock;
823  return res;
824 }
825 
826 void BlockChainSync::resetSync()
827 {
828  m_downloadingHeaders.clear();
829  m_downloadingBodies.clear();
830  m_headers.clear();
831  m_bodies.clear();
832  m_headerSyncPeers.clear();
833  m_bodySyncPeers.clear();
834  m_headerIdToNumber.clear();
835  m_syncingTotalDifficulty = 0;
836  m_state = SyncState::NotSynced;
837 }
838 
840 {
841  RecursiveGuard l(x_sync);
842  resetSync();
843  m_highestBlock = 0;
844  m_haveCommonHeader = false;
845  host().bq().clear();
846  m_startingBlock = host().chain().number();
847  m_lastImportedBlock = m_startingBlock;
848  m_lastImportedBlockHash = host().chain().currentHash();
849 }
850 
852 {
853  RecursiveGuard l(x_sync);
854  resetSync();
855  m_state = SyncState::Idle;
856 }
857 
858 void BlockChainSync::pauseSync()
859 {
860  m_state = SyncState::Waiting;
861 }
862 
864 {
865  return m_state != SyncState::Idle;
866 }
867 
869  NodeID const& _peerID, std::vector<std::pair<h256, u256>> const& _hashes)
870 {
871  RecursiveGuard l(x_sync);
873 
874  auto& peer = m_host.peer(_peerID);
875  if (peer.isConversing())
876  {
877  LOG(m_loggerDetail) << "Ignoring new hashes since we're already downloading.";
878  return;
879  }
880  LOG(m_loggerDetail) << "Not syncing and new block hash discovered: syncing.";
881  unsigned knowns = 0;
882  unsigned unknowns = 0;
883  unsigned maxHeight = 0;
884  for (auto const& p: _hashes)
885  {
886  h256 const& h = p.first;
887  m_host.capabilityHost().updateRating(_peerID, 1);
888  peer.markBlockAsKnown(h);
889  auto status = host().bq().blockStatus(h);
890  if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
891  knowns++;
892  else if (status == QueueStatus::Bad)
893  {
894  cwarn << "block hash bad!" << h << ". Bailing...";
895  return;
896  }
897  else if (status == QueueStatus::Unknown)
898  {
899  unknowns++;
900  if (p.second > maxHeight)
901  {
902  maxHeight = (unsigned)p.second;
903  peer.setLatestHash(h);
904  }
905  }
906  else
907  knowns++;
908  }
909  LOG(m_logger) << knowns << " knowns, " << unknowns << " unknowns";
910  if (unknowns > 0)
911  {
912  LOG(m_loggerDetail) << "Not syncing and new block hash discovered: syncing.";
913  syncPeer(_peerID, true);
914  }
915 }
916 
918 {
919  RecursiveGuard l(x_sync);
920  // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
921  clearPeerDownload();
922  continueSync();
924 }
925 
926 bool BlockChainSync::invariants() const
927 {
928  if (!isSyncing() && !m_headers.empty())
929  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got headers while not syncing"));
930  if (!isSyncing() && !m_bodies.empty())
931  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got bodies while not syncing"));
932  if (isSyncing() && m_host.chain().number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
933  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Common block not found"));
934  if (isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
935  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header is too old"));
936  if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
937  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header download map mismatch"));
938  if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
939  BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Body download map mismatch"));
940  return true;
941 }
dev::eth::BlockChain::currentHash
h256 currentHash() const
Get a given block (RLP format). Thread-safe.
Definition: BlockChain.h:228
dev::eth::QueueStatus::Bad
@ Bad
dev::eth::BlockChainSync::onPeerBlockBodies
void onPeerBlockBodies(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block bodies.
Definition: BlockChainSync.cpp:574
dev::eth::ImportResult::AlreadyKnown
@ AlreadyKnown
dev::EmptyTrie
h256 const EmptyTrie
Definition: TrieCommon.cpp:28
dev::eth::SyncState::NotSynced
@ NotSynced
Initial chain sync has not started yet.
dev::eth::EthereumPeer::requestBlockHeaders
void requestBlockHeaders(h256 const &_startHash, unsigned _count, unsigned _skip, bool _reverse)
Request hashes for given parent hash.
Definition: EthereumPeer.cpp:113
dev::eth::EthereumPeer::requestBlockBodies
void requestBlockBodies(h256s const &_blocks)
Request specified blocks from peer.
Definition: EthereumPeer.cpp:131
BlockQueue.h
dev::eth::SyncStatus::startBlockNumber
unsigned startBlockNumber
Definition: CommonNet.h:99
dev::eth::BlockHeader::hash
h256 hash(IncludeSeal _i=WithSeal) const
Definition: BlockHeader.cpp:119
dev::eth::SyncState::Waiting
@ Waiting
Block downloading paused. Waiting for block queue to process blocks and free space.
dev::eth::HeaderData
@ HeaderData
Definition: BlockHeader.h:69
dev::eth::BlockHeader
Encapsulation of a block header. Class to contain all of a block header's data. It is able to parse a...
Definition: BlockHeader.h:97
dev::eth::BlockHeader::sha3Uncles
h256 const & sha3Uncles() const
Definition: BlockHeader.h:158
dev::eth::BlockChainSync::isSyncing
bool isSyncing() const
Definition: BlockChainSync.cpp:863
dev::eth::SyncStatus
Definition: CommonNet.h:96
dev::sha3
bool sha3(bytesConstRef _input, bytesRef o_output) noexcept
Definition: SHA3.cpp:28
dev::eth::BlockChainSync::onPeerBlockHeaders
void onPeerBlockHeaders(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block headers during sync.
Definition: BlockChainSync.cpp:430
dev::trieRootOver
h256 trieRootOver(unsigned _itemCount, T const &_getKey, U const &_getValue)
Definition: TrieHash.h:36
dev::eth::BlockChainSync::onPeerNewBlock
void onPeerNewBlock(NodeID const &_peerID, RLP const &_r)
Called by peer once it has new block bodies.
Definition: BlockChainSync.cpp:728
dev::eth::BlockChainSync::abortSync
void abortSync()
Abort all sync activity.
Definition: BlockChainSync.cpp:174
dev::VerbosityWarning
@ VerbosityWarning
Definition: Log.h:69
dev::eth::SyncStatus::protocolVersion
unsigned protocolVersion
Definition: CommonNet.h:98
dev::eth::c_infiniteBlockNumber
constexpr int64_t c_infiniteBlockNumber
Definition: ChainOperationParams.h:66
Exceptions.h
dev::eth::EthereumPeer::markBlockAsKnown
void markBlockAsKnown(h256 const &_hash)
Definition: EthereumPeer.h:67
dev::eth::ImportResult::BadChain
@ BadChain
dev::eth::BlockChain::number
unsigned number(h256 const &_hash) const
Get a number for the given hash (or the most recent mined if none given). Thread-safe.
Definition: BlockChain.h:224
dev::eth::ImportResult::FutureTimeUnknown
@ FutureTimeUnknown
dev::eth::QueueStatus::Ready
@ Ready
dev::eth
Definition: BasicAuthority.h:32
dev::eth::SealEngineFace::chainParams
ChainOperationParams const & chainParams() const
Definition: SealEngine.h:80
dev::eth::BlockChain::sealEngine
SealEngineFace * sealEngine() const
Definition: BlockChain.h:308
dev::eth::EthereumCapability::peer
EthereumPeer const & peer(NodeID const &_peerID) const
Definition: EthereumCapability.cpp:935
dev::eth::BlockChainSync::onBlockImported
void onBlockImported(BlockHeader const &_info)
Called when a blockchain has imported a new block onto the DB.
Definition: BlockChainSync.cpp:161
dev::eth::ChainOperationParams::daoHardforkBlock
u256 daoHardforkBlock
Definition: ChainOperationParams.h:97
dev::eth::ImportResult::Malformed
@ Malformed
dev::FixedHash< 32 >
dev::eth::QueueStatus::Importing
@ Importing
dev::RecursiveGuard
std::lock_guard< std::recursive_mutex > RecursiveGuard
Definition: Guards.h:43
dev::eth::BlockQueue::difficulty
u256 difficulty() const
Definition: BlockQueue.cpp:530
dev::eth::EthereumCapability::chain
BlockChain const & chain() const
Definition: EthereumCapability.h:111
DEV_INVARIANT_CHECK_HERE
#define DEV_INVARIANT_CHECK_HERE
Definition: Common.h:239
LOG
#define LOG
Definition: Log.h:63
dev::eth::BlockChainSync::onPeerAborting
void onPeerAborting()
Called by peer when it is disconnecting.
Definition: BlockChainSync.cpp:917
dev::eth::BlockChain::details
BlockDetails details(h256 const &_hash) const
Get the familial details concerning a block (or the most recent mined if none given)....
Definition: BlockChain.h:157
dev::eth::BlockQueue::clear
void clear()
Clear everything.
Definition: BlockQueue.cpp:67
EthereumCapability.h
dev::eth::EthereumPeer::validate
std::string validate(h256 const &_hostGenesisHash, unsigned _hostProtocolVersion, u256 const &_hostNetworkId) const
Definition: EthereumPeer.cpp:67
dev::eth::EthereumPeer
Definition: EthereumPeer.h:32
dev::h256s
std::vector< h256 > h256s
Definition: FixedHash.h:361
Common.h
dev::eth::BlockDetails::totalDifficulty
u256 totalDifficulty
Definition: BlockDetails.h:52
dev::eth::SyncStatus::highestBlockNumber
unsigned highestBlockNumber
Definition: CommonNet.h:101
dev::eth::BlockHeader::parentHash
h256 const & parentHash() const
Definition: BlockHeader.h:157
dev::eth::ImportResult::AlreadyInChain
@ AlreadyInChain
dev::eth::BlockChainSync::onPeerNewHashes
void onPeerNewHashes(NodeID const &_peerID, std::vector< std::pair< h256, u256 >> const &_hashes)
Definition: BlockChainSync.cpp:868
DEV_INVARIANT_CHECK
#define DEV_INVARIANT_CHECK
Scope guard for invariant check in a class derived from HasInvariants.
Definition: Common.h:238
dev::eth::BlockQueue::blockStatus
QueueStatus blockStatus(h256 const &_h) const
Get some infomration on the given block's status regarding us.
Definition: BlockQueue.cpp:387
dev::eth::BlockChain::numberHash
h256 numberHash(unsigned _i) const
Get the hash for a given block's number.
Definition: BlockChain.h:184
dev::eth::EthereumCapability::name
std::string name() const override
Definition: EthereumCapability.h:91
dev::bytes
std::vector< byte > bytes
Definition: Common.h:72
dev::eth::SyncStatus::currentBlockNumber
unsigned currentBlockNumber
Definition: CommonNet.h:100
dev::eth::NodeID
p2p::NodeID NodeID
Definition: CommonNet.h:105
dev::eth::BlockHeader::number
int64_t number() const
Definition: BlockHeader.h:166
dev::eth::BlockChainSync::status
SyncStatus status() const
Definition: BlockChainSync.cpp:814
TrieHash.h
dev::eth::SyncStatus::state
SyncState state
Definition: CommonNet.h:97
BlockChain.h
dev::eth::BlockChainSync::completeSync
void completeSync()
Definition: BlockChainSync.cpp:851
dev::RLP::data
bytesConstRef data() const
The bare data of the RLP.
Definition: RLP.h:80
dev::eth::EthereumPeer::id
NodeID id() const
Definition: EthereumPeer.h:46
dev::RLPStream
Class for writing to an RLP bytestream.
Definition: RLP.h:370
dev::eth::EthereumCapability::disablePeer
void disablePeer(NodeID const &_peerID, std::string const &_problem)
Definition: EthereumCapability.cpp:930
dev::eth::BlockHeader::transactionsRoot
h256 const & transactionsRoot() const
Definition: BlockHeader.h:163
dev::vector_ref::toBytes
std::vector< unsigned char > toBytes() const
Definition: vector_ref.h:43
dev::RLP::itemCount
size_t itemCount() const
Definition: RLP.h:101
dev::eth::BlockChainSync::onPeerStatus
void onPeerStatus(EthereumPeer const &_peer)
Called by peer to report status.
Definition: BlockChainSync.cpp:181
std
Definition: FixedHash.h:393
dev::eth::Success
@ Success
Definition: Common.h:223
BlockChainSync.h
dev::u256
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
Definition: Common.h:121
dev::RLP::toInt
_T toInt(int _flags=Strict) const
Converts to int of type given; if isData(), decodes as big-endian bytestream.
Definition: RLP.h:257
dev::eth::EthereumPeer::isConversing
bool isConversing() const
Definition: EthereumPeer.h:54
dev::eth::EthereumCapability
The EthereumCapability class.
Definition: EthereumCapability.h:85
dev::eth::operator<<
std::ostream & operator<<(std::ostream &_out, BlockHeader const &_bi)
Definition: BlockHeader.h:217
dev::eth::EthereumCapability::bq
BlockQueue & bq()
Definition: EthereumCapability.h:113
dev::RLPStream::swapOut
void swapOut(bytes &_dest)
Swap the contents of the output stream out for some other byte array.
Definition: RLP.h:425
dev::eth::SyncState::Blocks
@ Blocks
Downloading blocks.
dev
Definition: Address.cpp:21
dev::eth::BlockChainSync::restartSync
void restartSync()
Restart sync.
Definition: BlockChainSync.cpp:839
dev::EmptyListSHA3
h256 const EmptyListSHA3
Definition: SHA3.cpp:26
dev::eth::ImportResult::FutureTimeKnown
@ FutureTimeKnown
dev::eth::BlockQueue::onRoomAvailable
Handler onRoomAvailable(std::function< void(void)> _t)
Definition: BlockQueue.h:270
dev::eth::QueueStatus::Unknown
@ Unknown
clog
#define clog(SEVERITY, CHANNEL)
dev::eth::EthereumPeer::totalDifficulty
u256 totalDifficulty() const
Definition: EthereumPeer.h:48
cwarn
#define cwarn
dev::RLPStream::appendRaw
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
Definition: RLP.cpp:222
dev::eth::SyncState::Idle
@ Idle
Initial chain sync complete. Waiting for new packets.
dev::eth::ImportResult::UnknownParent
@ UnknownParent
dev::eth::BlockChainSync::~BlockChainSync
~BlockChainSync()
Definition: BlockChainSync.cpp:155
dev::RLP
Definition: RLP.h:48
dev::RLPEmptyList
bytes RLPEmptyList
The empty list in RLP format.
Definition: RLP.cpp:23
dev::fromHex
bytes fromHex(std::string const &_s, WhenError _throw=WhenError::DontThrow)
Definition: CommonData.cpp:81
dev::eth::EthereumCapability::capabilityHost
p2p::CapabilityHostFace & capabilityHost()
Definition: EthereumCapability.h:129
dev::errinfo_comment
boost::error_info< struct tag_comment, std::string > errinfo_comment
Definition: Assertions.h:69