11 #include <libp2p/Common.h>
12 #include <libp2p/Host.h>
13 #include <libp2p/Session.h>
// Human-readable names for the sync states. The array length is
// static_cast<int>(SyncState::Size) and five names are listed here.
// NOTE(review): presumably indexed by the SyncState enumerator value, so the
// order must match the enum declaration (not visible here) - confirm.
21 char const*
const EthereumCapability::c_stateNames[
static_cast<int>(SyncState::Size)] = {
22 "NotSynced",
"Idle",
"Waiting",
"Blocks",
"State"};
// Namespace-scope definition (without initializer) of the constexpr static
// member; its value comes from the in-class declaration, which is not visible
// in this excerpt. A function later in this file returns this constant.
24 std::chrono::milliseconds constexpr EthereumCapability::c_backgroundWorkInterval;
// File-level protocol limits.

// NOTE(review): not referenced in the visible part of the file; presumably
// caps how many transactions are sent to a peer in one batch - confirm at the
// use site.
28 constexpr
unsigned c_maxSendTransactions = 256;
// Threshold applied to the header count a peer requests: a request for at
// most this many headers is served with the requested count. The branch taken
// for larger requests is not visible here (presumably clamps to this value).
29 constexpr
unsigned c_maxHeadersToSend = 1024;
// An incoming new-hashes announcement with more entries than this takes a
// separate branch before the hashes are parsed (presumably it is rejected -
// the branch body is not visible here).
30 constexpr
unsigned c_maxIncomingNewHashes = 1024;
// NOTE(review): not referenced in the visible part of the file; by its name
// this is a per-peer request timeout in seconds - confirm at the use site.
31 constexpr
unsigned c_peerTimeoutSeconds = 10;
// Lower bound on the number of peers that receive full new blocks: the count
// used is max(c_minBlockBroadcastPeers, sqrt(number of peers)).
32 constexpr
int c_minBlockBroadcastPeers = 4;
38 case Asking::BlockHeaders:
39 return "BlockHeaders";
40 case Asking::BlockBodies:
42 case Asking::NodeData:
44 case Asking::Receipts:
50 case Asking::WarpManifest:
51 return "WarpManifest";
52 case Asking::WarpData:
// Constructs the observer, storing the sync object in m_sync (a
// shared_ptr<BlockChainSync>) and the transaction queue in m_tq; the peer
// callbacks of this class forward to these two members.
61 EthereumPeerObserver(shared_ptr<BlockChainSync> _sync,
TransactionQueue& _tq): m_sync(_sync), m_tq(_tq) {}
67 m_sync->onPeerStatus(_peer);
69 catch (FailedInvariant
const&)
72 cwarn <<
"Failed invariant during sync, restarting sync";
73 m_sync->restartSync();
77 void onPeerTransactions(
NodeID const& _peerID,
RLP const& _r)
override
80 LOG(m_logger) <<
"Transactions (" << dec << itemCount <<
" entries)";
81 m_tq.enqueue(_r, _peerID);
// Called when a peer is going away: forwards the notification to the sync
// object. A failure is reported as a warning carrying Boost's diagnostic
// information for the exception currently being handled.
// NOTE(review): the try/catch around these statements is not visible in this
// excerpt; the warning is presumably emitted from the catch handler - confirm.
84 void onPeerAborting()
override
88 m_sync->onPeerAborting();
92 cwarn <<
"Exception on peer destruction: " << boost::current_exception_diagnostic_information();
// Forwards block headers received from the peer _peerID to the sync object.
// A FailedInvariant is caught here: a warning is logged and the sync is
// restarted via m_sync->restartSync().
96 void onPeerBlockHeaders(
NodeID const& _peerID,
RLP const& _headers)
override
100 m_sync->onPeerBlockHeaders(_peerID, _headers);
102 catch (FailedInvariant
const&)
105 cwarn <<
"Failed invariant during sync, restarting sync";
106 m_sync->restartSync();
// Forwards block bodies received from the peer _peerID to the sync object.
// A FailedInvariant is caught here: a warning is logged and the sync is
// restarted via m_sync->restartSync().
110 void onPeerBlockBodies(
NodeID const& _peerID,
RLP const& _r)
override
114 m_sync->onPeerBlockBodies(_peerID, _r);
116 catch (FailedInvariant
const&)
119 cwarn <<
"Failed invariant during sync, restarting sync";
120 m_sync->restartSync();
// Forwards a list of (h256, u256) pairs announced by the peer _peerID to the
// sync object. A FailedInvariant is caught here: a warning is logged and the
// sync is restarted via m_sync->restartSync().
// NOTE(review): the pairs are presumably (block hash, block number) - confirm
// against BlockChainSync::onPeerNewHashes.
124 void onPeerNewHashes(
125 NodeID const& _peerID, std::vector<std::pair<h256, u256>>
const& _hashes)
override
129 m_sync->onPeerNewHashes(_peerID, _hashes);
131 catch (FailedInvariant
const&)
134 cwarn <<
"Failed invariant during sync, restarting sync";
135 m_sync->restartSync();
// Forwards a new-block message received from the peer _peerID to the sync
// object. A FailedInvariant is caught here: a warning is logged and the sync
// is restarted via m_sync->restartSync().
139 void onPeerNewBlock(
NodeID const& _peerID,
RLP const& _r)
override
143 m_sync->onPeerNewBlock(_peerID, _r);
145 catch (FailedInvariant
const&)
148 cwarn <<
"Failed invariant during sync, restarting sync";
149 m_sync->restartSync();
153 void onPeerNodeData(
NodeID const& ,
RLP const& _r)
override
156 LOG(m_logger) <<
"Node Data (" << dec << itemCount <<
" entries)";
159 void onPeerReceipts(
NodeID const& ,
RLP const& _r)
override
162 LOG(m_logger) <<
"Receipts (" << dec << itemCount <<
" entries)";
166 shared_ptr<BlockChainSync> m_sync;
// Constructs the host-data provider from the block chain and the state
// database, storing them in m_chain and m_db; the query methods of this class
// read from these two members.
175 EthereumHostData(
BlockChain const& _chain,
OverlayDB const& _db): m_chain(_chain), m_db(_db) {}
177 pair<bytes, unsigned> blockHeaders(
RLP const& _blockId,
unsigned _maxHeaders,
u256 _skip,
bool _reverse)
const override
179 auto numHeadersToSend = _maxHeaders;
181 auto step =
static_cast<unsigned>(_skip) + 1;
182 assert(step > 0 &&
"step must not be 0");
185 if (_blockId.
size() == 32)
188 cnetlog <<
"GetBlockHeaders (block (hash): " << blockHash
189 <<
", maxHeaders: " << _maxHeaders <<
", skip: " << _skip
190 <<
", reverse: " << _reverse <<
")";
192 if (!m_chain.isKnown(blockHash))
196 auto n = m_chain.number(blockHash);
197 if (numHeadersToSend == 0)
199 else if (n != 0 || blockHash == m_chain.genesisHash())
201 auto top = n + uint64_t(step) * numHeadersToSend - 1;
202 auto lastBlock = m_chain.number();
205 numHeadersToSend = (lastBlock - n) / step + 1;
206 top = n + step * (numHeadersToSend - 1);
208 assert(top <= lastBlock &&
"invalid top block calculated");
209 blockHash = m_chain.numberHash(
static_cast<unsigned>(top));
218 cnetlog <<
"GetBlockHeaders (" << n <<
" max: " << _maxHeaders <<
" skip: " << _skip
219 << (_reverse ?
" reverse" :
"") <<
")";
223 auto lastBlock = m_chain.number();
224 if (n > lastBlock || numHeadersToSend == 0)
228 bigint top = n + uint64_t(step) * (numHeadersToSend - 1);
231 numHeadersToSend = (lastBlock -
static_cast<unsigned>(n)) / step + 1;
232 top = n + step * (numHeadersToSend - 1);
234 assert(top <= lastBlock &&
"invalid top block calculated");
235 blockHash = m_chain.numberHash(
static_cast<unsigned>(top));
238 else if (n <= std::numeric_limits<unsigned>::max())
239 blockHash = m_chain.numberHash(
static_cast<unsigned>(n));
244 auto nextHash = [
this](
h256 _h,
unsigned _step)
246 static const unsigned c_blockNumberUsageLimit = 1000;
248 const auto lastBlock = m_chain.number();
249 const auto limitBlock = lastBlock > c_blockNumberUsageLimit ? lastBlock - c_blockNumberUsageLimit : 0;
253 auto details = m_chain.details(_h);
254 if (details.number < limitBlock)
262 auto n = m_chain.number(_h);
264 _h = m_chain.numberHash(n - _step);
274 unsigned itemCount = 0;
276 for (
unsigned i = 0; i != numHeadersToSend; ++i)
278 if (!blockHash || !m_chain.isKnown(blockHash))
281 hashes.push_back(blockHash);
284 blockHash = nextHash(blockHash, step);
287 for (
unsigned i = 0; i < hashes.size() &&
rlp.size() < c_maxPayload; ++i)
288 rlp += m_chain.headerData(hashes[_reverse ? i : hashes.size() - 1 - i]);
290 return make_pair(
rlp, itemCount);
293 pair<bytes, unsigned> blockBodies(
RLP const& _blockHashes)
const override
295 unsigned const count =
static_cast<unsigned>(_blockHashes.
itemCount());
299 auto numBodiesToSend = std::min(count, c_maxBlocks);
300 for (
unsigned i = 0; i < numBodiesToSend &&
rlp.size() < c_maxPayload; ++i)
303 if (m_chain.isKnown(h))
305 bytes blockBytes = m_chain.block(h);
306 RLP block{blockBytes};
311 auto bodyBytes = body.
out();
312 rlp.insert(
rlp.end(), bodyBytes.begin(), bodyBytes.end());
316 if (count > 20 && n == 0)
317 cnetlog <<
"all " << count <<
" unknown blocks requested; peer on different chain?";
319 cnetlog << n <<
" blocks known and returned; " << (numBodiesToSend - n)
320 <<
" blocks unknown; " << (count > c_maxBlocks ? count - c_maxBlocks : 0)
321 <<
" blocks ignored";
323 return make_pair(
rlp, n);
326 strings nodeData(
RLP const& _dataHashes)
const override
328 unsigned const count =
static_cast<unsigned>(_dataHashes.
itemCount());
331 size_t payloadSize = 0;
332 auto numItemsToSend = std::min(count, c_maxNodes);
333 for (
unsigned i = 0; i < numItemsToSend && payloadSize < c_maxPayload; ++i)
336 auto node = m_db.lookup(h);
339 payloadSize += node.length();
340 data.push_back(move(node));
343 cnetlog << data.
size() <<
" nodes known and returned; " << (numItemsToSend - data.size())
344 <<
" unknown; " << (count > c_maxNodes ? count - c_maxNodes : 0) <<
" ignored";
349 pair<bytes, unsigned> receipts(
RLP const& _blockHashes)
const override
351 unsigned const count =
static_cast<unsigned>(_blockHashes.
itemCount());
355 auto numItemsToSend = std::min(count, c_maxReceipts);
356 for (
unsigned i = 0; i < numItemsToSend &&
rlp.size() < c_maxPayload; ++i)
359 if (m_chain.isKnown(h))
361 auto const receipts = m_chain.receipts(h);
362 auto receiptsRlpList = receipts.rlp();
363 rlp.insert(
rlp.end(), receiptsRlpList.begin(), receiptsRlpList.end());
367 cnetlog << n <<
" receipt lists known and returned; " << (numItemsToSend - n)
368 <<
" unknown; " << (count > c_maxReceipts ? count - c_maxReceipts : 0)
371 return make_pair(
rlp, n);
381 EthereumCapability::EthereumCapability(shared_ptr<p2p::CapabilityHostFace> _host,
384 : m_host(move(_host)),
389 m_networkId(_networkId),
390 m_hostData(new EthereumHostData(m_chain, m_db))
395 m_peerObserver.reset(
new EthereumPeerObserver(m_sync, m_tq));
398 std::random_device seed;
399 m_urng = std::mt19937_64(seed());
404 return c_backgroundWorkInterval;
407 bool EthereumCapability::ensureInitialised()
409 if (!m_latestBlockSent)
413 LOG(m_logger) <<
"Initialising: latest=" << m_latestBlockSent;
427 m_host->postWork([
this]() {
428 m_latestBlockSent =
h256();
429 m_transactionsSent.clear();
435 m_sync->completeSync();
438 void EthereumCapability::maintainTransactions()
441 unordered_map<NodeID, std::vector<size_t>> peerTransactions;
444 for (
size_t i = 0; i < ts.size(); ++i)
446 auto const& t = ts[i];
447 bool unsent = !m_transactionsSent.count(t.sha3());
450 auto const peers = selectPeers([&](
EthereumPeer const& _peer) {
454 for (
auto const& p: peers)
455 peerTransactions[p].push_back(i);
457 for (
auto const& t: ts)
458 m_transactionsSent.insert(t.sha3());
462 for (
auto&
peer : m_peers)
466 for (
auto const& i : peerTransactions[
peer.first])
477 m_host->sealAndSend(
peer.first, ts);
478 LOG(m_logger) <<
"Sent " << n <<
" transactions to " <<
peer.first;
// Collects the IDs of all peers in m_peers for which _predicate returns true.
// The predicate receives the peer's EthereumPeer entry (the map value); the
// map key (NodeID) is what gets collected.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `allowed` is returned - confirm.
484 vector<NodeID> EthereumCapability::selectPeers(
485 std::function<
bool(
EthereumPeer const&)>
const& _predicate)
const
487 vector<NodeID> allowed;
488 for (
auto const&
peer : m_peers)
490 if (_predicate(
peer.second))
491 allowed.push_back(
peer.first);
// Randomly splits _peers into two groups and returns them as a pair:
// the first holds _number randomly chosen peers, the second holds the rest.
// If _number is greater than or equal to the number of peers, the first group
// is all of _peers (in their original order) and the second is empty.
496 std::pair<std::vector<NodeID>, std::vector<NodeID>> EthereumCapability::randomPartitionPeers(
497 std::vector<NodeID>
const& _peers, std::size_t _number)
const
// Work on a copy so the caller's vector is left untouched.
499 vector<NodeID> part1(_peers);
500 vector<NodeID> part2;
// Nothing to split off: everything goes into the first group, unshuffled.
502 if (_number >= _peers.size())
503 return std::make_pair(part1, part2);
// Shuffle using the member random engine m_urng so the split is random.
505 std::shuffle(part1.begin(), part1.end(), m_urng);
// Move everything past the first _number elements into part2, then drop the
// moved-from tail from part1.
508 std::move(part1.begin() + _number, part1.end(), std::back_inserter(part2));
509 part1.erase(part1.begin() + _number, part1.end());
510 return std::make_pair(move(part1), move(part2));
513 void EthereumCapability::maintainBlocks(
h256 const& _currentHash)
516 auto detailsFrom = m_chain.
details(m_latestBlockSent);
517 auto detailsTo = m_chain.
details(_currentHash);
518 if (detailsFrom.totalDifficulty < detailsTo.totalDifficulty)
520 if (
diff(detailsFrom.number, detailsTo.number) < 20)
523 LOG(m_logger) <<
"Sending new blocks (current is " << _currentHash <<
", was "
524 << m_latestBlockSent <<
")";
526 h256s blocks = get<0>(m_chain.
treeRoute(m_latestBlockSent, _currentHash,
false,
false,
true));
529 auto const peersWithoutBlock = selectPeers(
532 auto const peersToSendNumber =
533 std::max<std::size_t>(c_minBlockBroadcastPeers, std::sqrt(m_peers.size()));
535 std::vector<NodeID> peersToSend;
536 std::vector<NodeID> peersToAnnounce;
537 std::tie(peersToSend, peersToAnnounce) =
538 randomPartitionPeers(peersWithoutBlock, peersToSendNumber);
540 for (
NodeID const& peerID : peersToSend)
541 for (
auto const& b: blocks)
545 .appendRaw(m_chain.
block(b), 1)
548 auto itPeer = m_peers.find(peerID);
549 if (itPeer != m_peers.end())
551 m_host->sealAndSend(peerID, ts);
552 itPeer->second.clearKnownBlocks();
555 if (!peersToSend.empty())
556 LOG(m_logger) <<
"Sent " << blocks.size() <<
" block(s) to " << peersToSend.size()
559 for (
NodeID const& peerID : peersToAnnounce)
563 for (
auto const& b: blocks)
570 auto itPeer = m_peers.find(peerID);
571 if (itPeer != m_peers.end())
573 m_host->sealAndSend(peerID, ts);
574 itPeer->second.clearKnownBlocks();
577 if (!peersToAnnounce.empty())
578 LOG(m_logger) <<
"Announced " << blocks.size() <<
" block(s) to "
579 << peersToAnnounce.size() <<
" peers";
581 m_latestBlockSent = _currentHash;
587 return m_sync->isSyncing();
592 return m_sync->status();
595 void EthereumCapability::onTransactionImported(
598 m_host->postWork([
this, _ir, _h, _nodeId]() {
599 auto itPeerStatus = m_peers.find(_nodeId);
600 if (itPeerStatus == m_peers.end())
603 auto&
peer = itPeerStatus->second;
609 m_host->updateRating(_nodeId, -100);
613 m_transactionsSent.insert(_h);
614 m_host->updateRating(_nodeId, 0);
617 m_host->updateRating(_nodeId, 100);
626 m_host->addNote(_peerID,
"manners", m_host->isRude(_peerID,
name()) ?
"RUDE" :
"nice");
629 m_peers.emplace(_peerID,
peer);
637 m_peerObserver->onPeerAborting();
639 m_peers.erase(_peerID);
643 NodeID const& _peerID,
unsigned _id,
RLP const& _r)
645 auto&
peer = m_peers[_peerID];
646 peer.
setLastAsk(std::chrono::system_clock::to_time_t(chrono::system_clock::now()));
654 auto const peerProtocolVersion = _r[0].
toInt<
unsigned>();
656 auto const totalDifficulty = _r[2].
toInt<
u256>();
658 auto const genesisHash = _r[4].
toHash<
h256>();
660 LOG(m_logger) <<
"Status: " << peerProtocolVersion <<
" / " <<
networkId <<
" / "
661 << genesisHash <<
", TD: " << totalDifficulty <<
" = " << latestHash;
664 peerProtocolVersion,
networkId, totalDifficulty, latestHash, genesisHash);
666 m_peerObserver->onPeerStatus(
peer);
671 m_peerObserver->onPeerTransactions(_peerID, _r);
678 const auto blockId = _r[0];
679 const auto maxHeaders = _r[1].
toInt<
u256>();
681 const auto reverse = _r[3].
toInt<
bool>();
683 auto numHeadersToSend = maxHeaders <= c_maxHeadersToSend ?
684 static_cast<unsigned>(maxHeaders) :
687 if (skip > std::numeric_limits<unsigned>::max() - 1)
689 cnetdetails <<
"Requested block skip is too big: " << skip;
693 pair<bytes, unsigned>
const rlpAndItemCount =
694 m_hostData->blockHeaders(blockId, numHeadersToSend, skip, reverse);
698 .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
699 m_host->sealAndSend(_peerID, s);
700 m_host->updateRating(_peerID, 0);
706 LOG(m_loggerImpolite)
707 <<
"Peer giving us block headers when we didn't ask for them.";
711 m_peerObserver->onPeerBlockHeaders(_peerID, _r);
717 unsigned count =
static_cast<unsigned>(_r.
itemCount());
718 cnetlog <<
"GetBlockBodies (" << dec << count <<
" entries)";
722 LOG(m_loggerImpolite) <<
"Zero-entry GetBlockBodies: Not replying.";
723 m_host->updateRating(_peerID, -10);
727 pair<bytes, unsigned>
const rlpAndItemCount = m_hostData->blockBodies(_r);
729 m_host->updateRating(_peerID, 0);
732 .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
733 m_host->sealAndSend(_peerID, s);
739 LOG(m_loggerImpolite) <<
"Peer giving us block bodies when we didn't ask for them.";
743 m_peerObserver->onPeerBlockBodies(_peerID, _r);
749 m_peerObserver->onPeerNewBlock(_peerID, _r);
756 cnetlog <<
"BlockHashes (" << dec << itemCount <<
" entries) "
757 << (itemCount ?
"" :
" : NoMoreHashes");
759 if (itemCount > c_maxIncomingNewHashes)
765 vector<pair<h256, u256>> hashes(itemCount);
766 for (
unsigned i = 0; i < itemCount; ++i)
767 hashes[i] = std::make_pair(_r[i][0].toHash<h256>(), _r[i][1].toInt<u256>());
769 m_peerObserver->onPeerNewHashes(_peerID, hashes);
774 unsigned count =
static_cast<unsigned>(_r.
itemCount());
777 LOG(m_loggerImpolite) <<
"Zero-entry GetNodeData: Not replying.";
778 m_host->updateRating(_peerID, -10);
781 cnetlog <<
"GetNodeData (" << dec << count <<
" entries)";
783 strings const data = m_hostData->nodeData(_r);
785 m_host->updateRating(_peerID, 0);
788 for (
auto const& element : data)
790 m_host->sealAndSend(_peerID, s);
795 unsigned count =
static_cast<unsigned>(_r.
itemCount());
798 LOG(m_loggerImpolite) <<
"Zero-entry GetReceipts: Not replying.";
799 m_host->updateRating(_peerID, -10);
802 cnetlog <<
"GetReceipts (" << dec << count <<
" entries)";
804 pair<bytes, unsigned>
const rlpAndItemCount = m_hostData->receipts(_r);
806 m_host->updateRating(_peerID, 0);
809 .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
810 m_host->sealAndSend(_peerID, s);
816 LOG(m_loggerImpolite) <<
"Peer giving us node data when we didn't ask for them.";
820 m_peerObserver->onPeerNodeData(_peerID, _r);
827 LOG(m_loggerImpolite) <<
"Peer giving us receipts when we didn't ask for them.";
831 m_peerObserver->onPeerReceipts(_peerID, _r);
841 cnetlog <<
"Peer causing an Exception: "
842 << boost::current_exception_diagnostic_information() <<
" " << _r;
844 catch (std::exception
const& _e)
846 cnetlog <<
"Peer causing an exception: " << _e.what() <<
" " << _r;
860 if (m_newTransactions)
862 m_newTransactions =
false;
863 maintainTransactions();
872 time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
873 if (now - m_lastTick >= 1)
876 for (
auto const&
peer : m_peers)
878 time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
882 m_host->disconnect(
peer.first, p2p::PingTimeout);
887 void EthereumCapability::setIdle(
NodeID const& _peerID)
892 void EthereumCapability::setAsking(
NodeID const& _peerID,
Asking _a)
894 auto itPeerStatus = m_peers.find(_peerID);
895 if (itPeerStatus == m_peers.end())
898 auto& peerStatus = itPeerStatus->second;
900 peerStatus.setAsking(_a);
901 peerStatus.setLastAsk(std::chrono::system_clock::to_time_t(chrono::system_clock::now()));
903 m_host->addNote(_peerID,
"ask", ::
toString(_a));
904 m_host->addNote(_peerID,
"sync",
905 string(isCriticalSyncing(_peerID) ?
"ONGOING" :
"holding") +
906 (needsSyncing(_peerID) ?
" & needed" :
""));
909 bool EthereumCapability::isCriticalSyncing(
NodeID const& _peerID)
const
911 auto itPeerStatus = m_peers.find(_peerID);
912 if (itPeerStatus == m_peers.end())
915 auto const& peerStatus = itPeerStatus->second;
917 auto const asking = peerStatus.asking();
921 bool EthereumCapability::needsSyncing(
NodeID const& _peerID)
const
923 if (m_host->isRude(_peerID,
name()))
926 auto peerStatus = m_peers.find(_peerID);
927 return (peerStatus != m_peers.end() && peerStatus->second.latestHash());
932 m_host->disableCapability(_peerID,
name(), _problem);
942 auto peer = m_peers.find(_peerID);
943 if (
peer == m_peers.end())
944 BOOST_THROW_EXCEPTION(PeerDisconnected() <<
errinfo_nodeID(_peerID));