13 #include <libp2p/Host.h>
14 #include <libp2p/Session.h>
24 _out <<
"state: " << EthereumCapability::stateName(_sync.
state) <<
" ";
25 if (_sync.
state == SyncState::Blocks)
// Limit on a peer's unknownNewBlocks() counter: once the counter exceeds this value the
// peer is disabled with the reason "Too many uknown new blocks" (see the new-block
// handling further down in this file).
33 constexpr
unsigned c_maxPeerUknownNewBlocks = 1024;
// Upper bound on the number of header block-numbers assigned to one peer per
// requestBlocks() call (it caps `count` there).
34 constexpr
unsigned c_maxRequestHeaders = 1024;
// Upper bound on the number of block bodies collected for one peer per
// requestBlocks() call (it bounds `neededBodies` there).
35 constexpr
unsigned c_maxRequestBodies = 1024;
// Helpers for "chunk maps": a std::map keyed by block number whose mapped value is a
// contiguous run of items covering blocks key, key + 1, ..., key + size() - 1.
// Chunks produced by mergeInto() always hold at least one item.

/// @returns true if the chunk map @a _container holds an item for block @a _number.
/// @tparam T chunk type; only its size() member is used.
template <typename T>
bool haveItem(std::map<unsigned, T> const& _container, unsigned _number)
{
    if (_container.empty())
        return false;
    // First chunk that starts at or after _number.
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
        return true;
    if (lower == _container.begin())
        return false;  // every chunk starts after _number
    // Only the preceding chunk (which starts below _number) can still cover it.
    --lower;
    return lower->first <= _number && (lower->first + lower->second.size()) > _number;
}

/// Looks up the item stored for block @a _number.
/// @returns a pointer into the owning chunk, or nullptr if no chunk covers @a _number.
/// The pointer is invalidated by any later modification of that chunk.
template <typename T>
T const* findItem(std::map<unsigned, std::vector<T>> const& _container, unsigned _number)
{
    if (_container.empty())
        return nullptr;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
        return &lower->second.front();  // _number is the first block of its chunk
    if (lower == _container.begin())
        return nullptr;
    --lower;
    if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
        return &lower->second.at(_number - lower->first);
    return nullptr;
}

/// Removes the item for block @a _number together with everything that follows it in
/// the same chunk: if @a _number is the chunk's first block the whole chunk is erased,
/// otherwise the chunk is truncated at @a _number. Other chunks are left untouched.
template <typename T>
void removeItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
{
    if (_container.empty())
        return;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
    {
        _container.erase(lower);
        return;
    }
    if (lower == _container.begin())
        return;
    --lower;
    if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
        lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
}

/// Removes every item whose block number is >= @a _number: the chunk containing
/// @a _number (if any) is truncated there and all later chunks are erased.
template <typename T>
void removeAllStartingWith(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
{
    if (_container.empty())
        return;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
    {
        _container.erase(lower, _container.end());
        return;
    }
    if (lower == _container.begin())
    {
        // Every chunk starts after _number, so nothing survives.
        _container.clear();
        return;
    }
    --lower;
    if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
        lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
    _container.erase(++lower, _container.end());
}

/// Stores @a _data as the item for block @a _number, keeping chunks maximal: the item
/// is appended to the chunk ending at _number - 1 if there is one, otherwise it starts
/// a new chunk; in both cases a chunk starting at _number + 1 is folded in afterwards.
/// @pre no item is stored for @a _number yet (asserted).
/// @param _data consumed: T is fixed by the container type, so only rvalues bind here
///        and the value is moved into the container rather than copied.
template <typename T>
void mergeInto(std::map<unsigned, std::vector<T>>& _container, unsigned _number, T&& _data)
{
    assert(!haveItem(_container, _number));
    auto lower = _container.lower_bound(_number);
    if (!_container.empty() && lower != _container.begin())
        --lower;
    if (lower != _container.end() && (lower->first + lower->second.size() == _number))
    {
        // Extend the existing chunk that ends right before _number.
        lower->second.emplace_back(std::move(_data));

        auto next = lower;
        ++next;
        if (next != _container.end() && (lower->first + lower->second.size() == next->first))
        {
            // The gap to the next chunk just closed: merge it in.
            std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
            _container.erase(next);
        }
    }
    else
    {
        // Start a new chunk. The vector is filled with emplace_back because an
        // initializer list would force a copy of _data.
        std::vector<T> chunk;
        chunk.emplace_back(std::move(_data));
        auto inserted = _container.insert(lower, std::make_pair(_number, std::move(chunk)));
        auto next = inserted;
        ++next;
        if (next != _container.end() && next->first == _number + 1)
        {
            // The new chunk touches the following one: merge it in.
            std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
            _container.erase(next);
        }
    }
}
142 m_chainStartBlock(_host.chain().chainStartBlockNumber()),
143 m_startingBlock(_host.chain().number()),
144 m_lastImportedBlock(m_startingBlock),
145 m_lastImportedBlockHash(_host.chain().currentHash())
166 if (_info.
number() > m_lastImportedBlock)
168 m_lastImportedBlock =
static_cast<unsigned>(_info.
number());
169 m_lastImportedBlockHash = _info.
hash();
170 m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
187 if (!peerSessionInfo)
190 std::string disconnectReason;
191 if (peerSessionInfo->clientVersion.find(
"/v0.7.0/") != string::npos)
192 disconnectReason =
"Blacklisted client version.";
195 host().chain().genesisHash(), host().protocolVersion(), host().networkId());
197 if (!disconnectReason.empty())
199 LOG(m_logger) <<
"Peer not suitable for sync: " << disconnectReason;
205 if (!requestDaoForkBlockHeader(_peer.
id()))
208 syncPeer(_peer.
id(),
false);
// Marks a peer as DAO-challenged. Only one statement of the body is visible in this
// view: the peer ID is recorded in m_daoChallengedPeers, the set the header-response
// handling consults before calling verifyDaoChallengeResponse().
// NOTE(review): what decides the bool return value is not visible here.
212 bool BlockChainSync::requestDaoForkBlockHeader(
NodeID const& _peerID)
219 m_daoChallengedPeers.insert(_peerID);
// Decides whether to (re)start syncing against the given peer and then asks it for data.
// _peerID: peer to sync with.
// _force:  when true, the "new target" branch below is taken even if the peer's total
//          difficulty does not exceed the difficulty we are already syncing towards.
// NOTE(review): several guards and early returns of this function are not visible in
// this view; the comments describe only the statements that are shown.
224 void BlockChainSync::syncPeer(
NodeID const& _peerID,
bool _force)
// Logged when the peer cannot be used right now (the guarding condition is not shown).
228 LOG(m_loggerDetail) <<
"Can't sync with this peer - outstanding asks.";
236 if (host().bq().isActive())
// Baseline to beat: the higher of the difficulty we already sync towards and `td`
// (`td` is computed above this line, not shown).
239 u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
241 auto& peer = m_host.
peer(_peerID);
244 if (_force || peerTotalDifficulty > syncingDifficulty)
246 if (peerTotalDifficulty > syncingDifficulty)
247 LOG(m_logger) <<
"Discovered new highest difficulty";
// This peer's total difficulty becomes the new sync target.
250 m_syncingTotalDifficulty = peerTotalDifficulty;
253 LOG(m_loggerInfo) <<
"Starting full sync";
// Ask for the header identified by the peer's latest hash; the arguments are presumably
// (count 1, skip 0, not reversed) -- TODO confirm against the peer API.
256 peer.requestBlockHeaders(peer.latestHash(), 1, 0,
false);
257 peer.setWaitingForTransactions(
true);
// Hand over to the regular header/body download logic for this peer.
263 requestBlocks(_peerID);
// Resumes syncing by calling syncPeer() in non-forced mode.
// NOTE(review): the iteration that supplies _peerID is not visible in this view.
268 void BlockChainSync::continueSync()
271 syncPeer(_peerID, false);
// Works out what to download next from the given peer. The function has two parts:
// a body part (bodies for headers already held in m_headers) and a header part
// (finding the next gap in m_headers). How the two parts are sequenced (else/return)
// and the actual request calls are not visible in this view.
// Bookkeeping: block numbers handed to the peer are recorded in m_downloadingBodies /
// m_downloadingHeaders and, per peer, in m_bodySyncPeers / m_headerSyncPeers.
276 void BlockChainSync::requestBlocks(
NodeID const& _peerID)
// Release whatever this peer was previously asked for before assigning new work.
278 clearPeerDownload(_peerID);
279 if (host().bq().knownFull())
281 LOG(m_loggerDetail) <<
"Waiting for block queue before downloading blocks";
286 auto header = m_headers.begin();
288 vector<unsigned> neededNumbers;
// Bodies are only considered once a common header is known and the first header chunk
// directly follows the last imported block.
290 if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
// Walk the header chunks, collecting at most c_maxRequestBodies block hashes.
292 while (header != m_headers.end() && neededBodies.size() < c_maxRequestBodies && index < header->second.size())
// Header chunks are keyed by their first block number; `index` is the offset in the chunk.
294 unsigned block = header->first + index;
// Only blocks whose body is neither in flight nor already stored in m_bodies are needed.
295 if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
297 neededBodies.push_back(header->second[index].hash);
298 neededNumbers.push_back(block);
299 m_downloadingBodies.insert(block);
// End of the current header chunk reached (the step to the next chunk is not shown).
303 if (index >= header->second.size())
307 if (neededBodies.size() > 0)
// Remember which block numbers this peer was asked for so clearPeerDownload() can
// release them again.
309 m_bodySyncPeers[_peerID] = neededNumbers;
// Header part, case 1: no common header with the peer's chain yet. The start position
// is moved to just below the earliest header held, and m_lastImportedBlock follows it.
316 if (!m_haveCommonHeader)
319 start = m_lastImportedBlock;
320 if (!m_headers.empty())
// NOTE(review): `first - 1` is unsigned arithmetic and wraps if the first chunk starts at 0.
321 start = std::min(start, m_headers.begin()->first - 1);
322 m_lastImportedBlock = start;
// At (or next to) the chain start block there is nothing further back to compare, so
// the common header is treated as found.
325 if (start <= m_chainStartBlock + 1)
326 m_haveCommonHeader =
true;
// Header part, case 2: common header known -- look for the next gap after the last
// imported block.
328 if (m_haveCommonHeader)
330 start = m_lastImportedBlock + 1;
331 auto next = m_headers.begin();
// If `start` is not below the first chunk's first block, continue right after that chunk.
333 if (!m_headers.empty() && start >= m_headers.begin()->first)
335 start = m_headers.begin()->first + m_headers.begin()->second.size();
// Scan chunk by chunk until a non-empty range of headers to request is found.
339 while (count == 0 && next != m_headers.end())
// Size of the gap between `start` and the next chunk, capped at c_maxRequestHeaders.
341 count = std::min(c_maxRequestHeaders, next->first - start);
// Loop over leading numbers that are already being downloaded (loop body not shown).
342 while(count > 0 && m_downloadingHeaders.count(start) != 0)
347 std::vector<unsigned> headers;
348 for (
unsigned block = start; block < start + count; block++)
349 if (m_downloadingHeaders.count(block) == 0)
351 headers.push_back(block);
352 m_downloadingHeaders.insert(block);
// Only the numbers actually claimed above count towards this request.
354 count = headers.size();
357 m_headerSyncPeers[_peerID] = headers;
358 assert(!haveItem(m_headers, start));
// Nothing to request in this gap and `start` is not below the chunk's first block:
// continue right after the chunk.
361 else if (start >= next->first)
363 start = next->first + next->second.size();
// Forgets all download bookkeeping for one peer: the header and body block numbers
// assigned to it are removed from m_downloadingHeaders / m_downloadingBodies, its
// entries in m_headerSyncPeers / m_bodySyncPeers are erased, and it is removed from
// m_daoChallengedPeers.
// NOTE(review): the local iterator `syncPeer` shadows the member function syncPeer().
373 void BlockChainSync::clearPeerDownload(
NodeID const& _peerID)
375 auto syncPeer = m_headerSyncPeers.find(_peerID);
376 if (syncPeer != m_headerSyncPeers.end())
// Release every header number that was assigned to this peer.
378 for (
unsigned block : syncPeer->second)
379 m_downloadingHeaders.erase(block);
380 m_headerSyncPeers.erase(syncPeer);
// Same again for block bodies, reusing the iterator variable.
382 syncPeer = m_bodySyncPeers.find(_peerID);
383 if (syncPeer != m_bodySyncPeers.end())
385 for (
unsigned block : syncPeer->second)
386 m_downloadingBodies.erase(block);
387 m_bodySyncPeers.erase(syncPeer);
389 m_daoChallengedPeers.erase(_peerID);
// Overload without a peer argument: sweeps m_headerSyncPeers, m_bodySyncPeers and
// m_daoChallengedPeers, erasing entries and releasing the block numbers assigned to them.
// The loops have no increment clause, so each pass advances the iterator itself.
// NOTE(review): the condition that selects which entries are erased (and the step
// taken for entries that are kept) is not visible in this view.
392 void BlockChainSync::clearPeerDownload()
394 for (
auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
398 for (
unsigned block : s->second)
399 m_downloadingHeaders.erase(block);
// erase(s++): step the iterator forward before the erased element is invalidated.
400 m_headerSyncPeers.erase(s++);
405 for (
auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
409 for (
unsigned block : s->second)
410 m_downloadingBodies.erase(block);
411 m_bodySyncPeers.erase(s++);
416 for (
auto s = m_daoChallengedPeers.begin(); s != m_daoChallengedPeers.end();)
419 m_daoChallengedPeers.erase(s++);
// Visible part: removes the block hash _h from m_knownNewHashes.
// NOTE(review): any further statements of this function are not visible in this view.
425 void BlockChainSync::logNewBlock(
h256 const& _h)
427 m_knownNewHashes.erase(_h);
435 LOG(m_logger) <<
"BlocksHeaders (" << dec << itemCount <<
" entries) "
436 << (itemCount ?
"" :
": NoMoreHeaders");
438 if (m_daoChallengedPeers.find(_peerID) != m_daoChallengedPeers.end())
440 if (verifyDaoChallengeResponse(_r))
441 syncPeer(_peerID,
false);
443 m_host.
disablePeer(_peerID,
"Peer from another fork.");
445 m_daoChallengedPeers.erase(_peerID);
449 clearPeerDownload(_peerID);
452 LOG(m_logger) <<
"Ignoring unexpected blocks";
457 LOG(m_loggerDetail) <<
"Ignored blocks while waiting";
462 LOG(m_loggerDetail) <<
"Peer does not have the blocks requested";
465 for (
unsigned i = 0; i < itemCount; i++)
468 unsigned blockNumber =
static_cast<unsigned>(info.
number());
469 if (blockNumber < m_chainStartBlock)
471 LOG(m_logger) <<
"Skipping too old header " << blockNumber;
474 if (haveItem(m_headers, blockNumber))
476 LOG(m_logger) <<
"Skipping header " << blockNumber <<
" (already downloaded)";
479 if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
481 LOG(m_logger) <<
"Skipping header " << blockNumber <<
" (already imported)";
484 if (blockNumber > m_highestBlock)
485 m_highestBlock = blockNumber;
490 m_haveCommonHeader =
true;
491 m_lastImportedBlock = (unsigned)info.
number();
492 m_lastImportedBlockHash = info.
hash();
494 if (!m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1 &&
495 m_headers.begin()->second[0].parent != m_lastImportedBlockHash)
500 <<
"Unknown parent of the downloaded headers, restarting sync";
511 if (m_haveCommonHeader)
513 Header
const* prevBlock = findItem(m_headers, blockNumber - 1);
514 if ((prevBlock && prevBlock->hash != info.
parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.
parentHash() != m_lastImportedBlockHash))
518 << info.
hash() <<
" (Restart syncing)";
524 Header
const* nextBlock = findItem(m_headers, blockNumber + 1);
525 if (nextBlock && nextBlock->parent != info.
hash())
528 <<
"Unknown block header " << blockNumber + 1 <<
" " << nextBlock->hash;
530 unsigned n = blockNumber + 1;
531 auto headers = m_headers.at(n);
532 for (
auto const& h : headers)
535 m_headerIdToNumber.erase(headerId);
536 m_downloadingBodies.erase(n);
537 m_downloadingHeaders.erase(n);
540 removeAllStartingWith(m_headers, blockNumber + 1);
541 removeAllStartingWith(m_bodies, blockNumber + 1);
545 mergeInto(m_headers, blockNumber, std::move(hdr));
554 mergeInto(m_bodies, blockNumber, std::move(body));
557 m_headerIdToNumber[headerId] = blockNumber;
// Checks a peer's reply to the DAO challenge. The visible part of the check compares
// the header's extraData with the bytes 0x64616f2d686172642d666f726b, which is the
// ASCII string "dao-hard-fork".
// _r: RLP of the peer's response; how `info` is obtained from it is not visible here.
// NOTE(review): further conditions combined with this comparison, if any, are not shown.
564 bool BlockChainSync::verifyDaoChallengeResponse(
RLP const& _r)
571 info.extraData() ==
fromHex(
"0x64616f2d686172642d666f726b");
579 LOG(m_logger) <<
"BlocksBodies (" << dec << itemCount <<
" entries) "
580 << (itemCount ?
"" :
": NoMoreBodies");
581 clearPeerDownload(_peerID);
583 LOG(m_logger) <<
"Ignoring unexpected blocks";
588 LOG(m_loggerDetail) <<
"Ignored blocks while waiting";
593 LOG(m_loggerDetail) <<
"Peer does not have the blocks requested";
596 for (
unsigned i = 0; i < itemCount; i++)
600 auto txList = body[0];
601 h256 transactionRoot =
trieRootOver(txList.itemCount(), [&](
unsigned i){ return rlp(i); }, [&](
unsigned i){ return txList[i].data().toBytes(); });
603 HeaderId
id { transactionRoot, uncles };
604 auto iter = m_headerIdToNumber.find(
id);
605 if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
607 LOG(m_loggerDetail) <<
"Ignored unknown block body";
610 unsigned blockNumber = iter->second;
611 if (haveItem(m_bodies, blockNumber))
613 LOG(m_logger) <<
"Skipping already downloaded block body " << blockNumber;
616 m_headerIdToNumber.erase(
id);
617 mergeInto(m_bodies, blockNumber, body.
data().
toBytes());
// Assembles complete blocks from the leading chunks of m_headers and m_bodies, feeds
// them to the block queue (host().bq().import) and then trims the consumed entries
// from both maps.
// NOTE(review): the switch's case labels, breaks and early returns are not visible in
// this view, so which import result leads to which branch is not documented here.
623 void BlockChainSync::collectBlocks()
// Nothing to assemble without a common header, or while either map is empty.
625 if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
// Work on the first (lowest-numbered) chunk of each map; .first is the chunk's first
// block number, .second the vector of entries.
629 auto& headers = *m_headers.begin();
630 auto& bodies = *m_bodies.begin();
// Both chunks must start at the block right after the last imported one.
631 if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
634 unsigned success = 0;
637 unsigned unknown = 0;
// Step through the entries for which both a header and a body are available.
639 for (; i < headers.second.size() && i < bodies.second.size(); i++)
// Build the block: the raw header followed by items 0 and 1 of the stored body RLP.
642 blockStream.appendRaw(headers.second[i].data);
643 RLP body(bodies.second[i]);
644 blockStream.appendRaw(body[0].data());
645 blockStream.appendRaw(body[1].data());
647 blockStream.swapOut(block);
648 switch (host().bq().
import(&block))
// Advance the last-imported marker only when this block is ahead of it.
652 if (headers.first + i > m_lastImportedBlock)
654 m_lastImportedBlock = headers.first + (unsigned)i;
655 m_lastImportedBlockHash = headers.second[i].hash;
659 LOG(m_logger) <<
"Malformed block #" << headers.first + i <<
". Restarting sync.";
663 LOG(m_logger) <<
"Block from the bad chain, block #" << headers.first + i
664 <<
". Restarting sync.";
676 if (headers.first + i > m_lastImportedBlock)
678 logImported(success, future, got, unknown);
680 <<
"Already known or future time & unknown parent or unknown parent, block #"
681 << headers.first + i <<
". Resetting sync.";
683 m_haveCommonHeader =
false;
691 logImported(success, future, got, unknown);
693 if (host().bq().unknownFull())
// Drop the first i entries (the ones the loop above stepped through) from the leading
// header and body chunks and re-key what remains under the next block number.
700 auto newHeaders = std::move(headers.second);
701 newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
702 unsigned newHeaderHead = headers.first + i;
703 auto newBodies = std::move(bodies.second);
704 newBodies.erase(newBodies.begin(), newBodies.begin() + i);
705 unsigned newBodiesHead = bodies.first + i;
706 m_headers.erase(m_headers.begin());
707 m_bodies.erase(m_bodies.begin());
// Empty remainders are not re-inserted, so no empty chunk is stored.
// NOTE(review): the two assignments below copy the vectors; they could be moved.
708 if (!newHeaders.empty())
709 m_headers[newHeaderHead] = newHeaders;
710 if (!newBodies.empty())
711 m_bodies[newBodiesHead] = newBodies;
// With no headers left there must be no bodies left either.
713 if (m_headers.empty())
715 assert(m_bodies.empty());
// Writes one summary line about an import round to m_logger.
// _success: number reported as "imported OK".
// _future:  number reported as "with future timestamps".
// _got:     number reported as "already known received".
// _unknown: number reported as "with unknown parents".
// Note that the message prints the counters in the order success, unknown, future, got.
721 void BlockChainSync::logImported(
722 unsigned _success,
unsigned _future,
unsigned _got,
unsigned _unknown)
724 LOG(m_logger) << dec << _success <<
" imported OK, " << _unknown <<
" with unknown parents, "
725 << _future <<
" with future timestamps, " << _got <<
" already known received.";
735 m_host.
disablePeer(_peerID,
"NewBlock without 2 data fields.");
739 auto h = info.
hash();
740 auto& peer = m_host.
peer(_peerID);
742 unsigned blockNumber =
static_cast<unsigned>(info.
number());
743 if (blockNumber > (m_lastImportedBlock + 1))
745 LOG(m_loggerDetail) <<
"Received unknown new block";
748 peer.setLatestHash(h);
749 syncPeer(_peerID,
true);
752 switch (host().bq().
import(_r[0].data()))
757 if (blockNumber > m_lastImportedBlock)
759 m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
760 m_lastImportedBlockHash = h;
762 m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
763 m_downloadingBodies.erase(blockNumber);
764 m_downloadingHeaders.erase(blockNumber);
765 removeItem(m_headers, blockNumber);
766 removeItem(m_bodies, blockNumber);
767 if (m_headers.empty())
769 if (!m_bodies.empty())
772 <<
"Block headers map is empty, but block bodies map is not. Force-clearing.";
785 m_host.
disablePeer(_peerID,
"Malformed block received.");
795 peer.incrementUnknownNewBlocks();
796 if (peer.unknownNewBlocks() > c_maxPeerUknownNewBlocks)
798 m_host.
disablePeer(_peerID,
"Too many uknown new blocks");
803 if (totalDifficulty > peer.totalDifficulty())
805 LOG(m_loggerDetail) <<
"Received block with no known parent. Peer needs syncing...";
806 syncPeer(_peerID,
true);
// Returns the sync bookkeeping to its starting state. Visible effects: the in-flight
// sets, the per-peer assignment maps and m_headerIdToNumber are cleared, the syncing
// total difficulty is zeroed, the common-header flag is dropped and
// m_lastImportedBlock is rewound to m_startingBlock.
// NOTE(review): other statements of this function are not visible in this view.
826 void BlockChainSync::resetSync()
828 m_downloadingHeaders.clear();
829 m_downloadingBodies.clear();
832 m_headerSyncPeers.clear();
833 m_bodySyncPeers.clear();
834 m_headerIdToNumber.clear();
835 m_syncingTotalDifficulty = 0;
// A common header has to be established again after a reset.
844 m_haveCommonHeader =
false;
847 m_lastImportedBlock = m_startingBlock;
858 void BlockChainSync::pauseSync()
869 NodeID const& _peerID, std::vector<std::pair<h256, u256>>
const& _hashes)
874 auto& peer = m_host.
peer(_peerID);
875 if (peer.isConversing())
877 LOG(m_loggerDetail) <<
"Ignoring new hashes since we're already downloading.";
880 LOG(m_loggerDetail) <<
"Not syncing and new block hash discovered: syncing.";
882 unsigned unknowns = 0;
883 unsigned maxHeight = 0;
884 for (
auto const& p: _hashes)
886 h256 const& h = p.first;
888 peer.markBlockAsKnown(h);
894 cwarn <<
"block hash bad!" << h <<
". Bailing...";
900 if (p.second > maxHeight)
902 maxHeight = (unsigned)p.second;
903 peer.setLatestHash(h);
909 LOG(m_logger) << knowns <<
" knowns, " << unknowns <<
" unknowns";
912 LOG(m_loggerDetail) <<
"Not syncing and new block hash discovered: syncing.";
913 syncPeer(_peerID,
true);
// Consistency checks over the sync state. Each violated check throws FailedInvariant
// carrying a short comment that names the problem.
// NOTE(review): the conditions guarding the first two throws and the final return
// value are not visible in this view.
926 bool BlockChainSync::invariants()
const
929 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Got headers while not syncing"));
931 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Got bodies while not syncing"));
// While syncing on a non-empty chain with a common header established, the last
// imported block must not be 0.
932 if (
isSyncing() && m_host.
chain().
number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
933 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Common block not found"));
// Headers still held must all lie above the last imported block.
934 if (
isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
935 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Header is too old"));
// Per-peer header assignments and the in-flight header set must be empty together.
936 if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
937 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Header download map mismatch"));
// Same pairing for bodies, but only reported while the in-flight body set is no
// larger than m_headerIdToNumber.
938 if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
939 BOOST_THROW_EXCEPTION(FailedInvariant() <<
errinfo_comment(
"Body download map mismatch"));