Ethereum  PoC-8
The C++ Implementation of Ethereum
EthereumCapability.cpp
Go to the documentation of this file.
1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4 
5 #include "EthereumCapability.h"
6 #include "BlockChain.h"
7 #include "BlockChainSync.h"
8 #include "BlockQueue.h"
9 #include "TransactionQueue.h"
10 #include <libethcore/Exceptions.h>
11 #include <libp2p/Common.h>
12 #include <libp2p/Host.h>
13 #include <libp2p/Session.h>
14 #include <chrono>
15 #include <thread>
16 
17 using namespace std;
18 using namespace dev;
19 using namespace dev::eth;
20 
21 char const* const EthereumCapability::c_stateNames[static_cast<int>(SyncState::Size)] = {
22  "NotSynced", "Idle", "Waiting", "Blocks", "State"};
23 
24 std::chrono::milliseconds constexpr EthereumCapability::c_backgroundWorkInterval;
25 
26 namespace
27 {
/// Maximum number of queued transactions considered per maintainTransactions() pass.
constexpr unsigned c_maxSendTransactions = 256;
/// Cap applied to the header count requested by a peer in GetBlockHeaders.
constexpr unsigned c_maxHeadersToSend = 1024;
/// A new-block-hashes announcement with more entries than this gets the peer disabled.
constexpr unsigned c_maxIncomingNewHashes = 1024;
/// Seconds since the last ask after which a conversing peer is disconnected.
constexpr unsigned c_peerTimeoutSeconds = 10;
/// Lower bound on the number of peers that receive a full new block (the rest get hashes only).
constexpr int c_minBlockBroadcastPeers = 4;
33 
34 string toString(Asking _a)
35 {
36  switch (_a)
37  {
38  case Asking::BlockHeaders:
39  return "BlockHeaders";
40  case Asking::BlockBodies:
41  return "BlockBodies";
42  case Asking::NodeData:
43  return "NodeData";
44  case Asking::Receipts:
45  return "Receipts";
46  case Asking::Nothing:
47  return "Nothing";
48  case Asking::State:
49  return "State";
50  case Asking::WarpManifest:
51  return "WarpManifest";
52  case Asking::WarpData:
53  return "WarpData";
54  }
55  return "?";
56 }
57 
58 class EthereumPeerObserver: public EthereumPeerObserverFace
59 {
60 public:
61  EthereumPeerObserver(shared_ptr<BlockChainSync> _sync, TransactionQueue& _tq): m_sync(_sync), m_tq(_tq) {}
62 
63  void onPeerStatus(EthereumPeer const& _peer) override
64  {
65  try
66  {
67  m_sync->onPeerStatus(_peer);
68  }
69  catch (FailedInvariant const&)
70  {
71  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
72  cwarn << "Failed invariant during sync, restarting sync";
73  m_sync->restartSync();
74  }
75  }
76 
77  void onPeerTransactions(NodeID const& _peerID, RLP const& _r) override
78  {
79  unsigned itemCount = _r.itemCount();
80  LOG(m_logger) << "Transactions (" << dec << itemCount << " entries)";
81  m_tq.enqueue(_r, _peerID);
82  }
83 
84  void onPeerAborting() override
85  {
86  try
87  {
88  m_sync->onPeerAborting();
89  }
90  catch (Exception&)
91  {
92  cwarn << "Exception on peer destruciton: " << boost::current_exception_diagnostic_information();
93  }
94  }
95 
96  void onPeerBlockHeaders(NodeID const& _peerID, RLP const& _headers) override
97  {
98  try
99  {
100  m_sync->onPeerBlockHeaders(_peerID, _headers);
101  }
102  catch (FailedInvariant const&)
103  {
104  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
105  cwarn << "Failed invariant during sync, restarting sync";
106  m_sync->restartSync();
107  }
108  }
109 
110  void onPeerBlockBodies(NodeID const& _peerID, RLP const& _r) override
111  {
112  try
113  {
114  m_sync->onPeerBlockBodies(_peerID, _r);
115  }
116  catch (FailedInvariant const&)
117  {
118  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
119  cwarn << "Failed invariant during sync, restarting sync";
120  m_sync->restartSync();
121  }
122  }
123 
124  void onPeerNewHashes(
125  NodeID const& _peerID, std::vector<std::pair<h256, u256>> const& _hashes) override
126  {
127  try
128  {
129  m_sync->onPeerNewHashes(_peerID, _hashes);
130  }
131  catch (FailedInvariant const&)
132  {
133  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
134  cwarn << "Failed invariant during sync, restarting sync";
135  m_sync->restartSync();
136  }
137  }
138 
139  void onPeerNewBlock(NodeID const& _peerID, RLP const& _r) override
140  {
141  try
142  {
143  m_sync->onPeerNewBlock(_peerID, _r);
144  }
145  catch (FailedInvariant const&)
146  {
147  // "fix" for https://github.com/ethereum/webthree-umbrella/issues/300
148  cwarn << "Failed invariant during sync, restarting sync";
149  m_sync->restartSync();
150  }
151  }
152 
153  void onPeerNodeData(NodeID const& /* _peerID */, RLP const& _r) override
154  {
155  unsigned itemCount = _r.itemCount();
156  LOG(m_logger) << "Node Data (" << dec << itemCount << " entries)";
157  }
158 
159  void onPeerReceipts(NodeID const& /* _peerID */, RLP const& _r) override
160  {
161  unsigned itemCount = _r.itemCount();
162  LOG(m_logger) << "Receipts (" << dec << itemCount << " entries)";
163  }
164 
165 private:
166  shared_ptr<BlockChainSync> m_sync;
167  TransactionQueue& m_tq;
168 
169  Logger m_logger{createLogger(VerbosityDebug, "host")};
170 };
171 
172 class EthereumHostData: public EthereumHostDataFace
173 {
174 public:
175  EthereumHostData(BlockChain const& _chain, OverlayDB const& _db): m_chain(_chain), m_db(_db) {}
176 
177  pair<bytes, unsigned> blockHeaders(RLP const& _blockId, unsigned _maxHeaders, u256 _skip, bool _reverse) const override
178  {
179  auto numHeadersToSend = _maxHeaders;
180 
181  auto step = static_cast<unsigned>(_skip) + 1;
182  assert(step > 0 && "step must not be 0");
183 
184  h256 blockHash;
185  if (_blockId.size() == 32) // block id is a hash
186  {
187  blockHash = _blockId.toHash<h256>();
188  cnetlog << "GetBlockHeaders (block (hash): " << blockHash
189  << ", maxHeaders: " << _maxHeaders << ", skip: " << _skip
190  << ", reverse: " << _reverse << ")";
191 
192  if (!m_chain.isKnown(blockHash))
193  blockHash = {};
194  else if (!_reverse)
195  {
196  auto n = m_chain.number(blockHash);
197  if (numHeadersToSend == 0)
198  blockHash = {};
199  else if (n != 0 || blockHash == m_chain.genesisHash())
200  {
201  auto top = n + uint64_t(step) * numHeadersToSend - 1;
202  auto lastBlock = m_chain.number();
203  if (top > lastBlock)
204  {
205  numHeadersToSend = (lastBlock - n) / step + 1;
206  top = n + step * (numHeadersToSend - 1);
207  }
208  assert(top <= lastBlock && "invalid top block calculated");
209  blockHash = m_chain.numberHash(static_cast<unsigned>(top)); // override start block hash with the hash of the top block we have
210  }
211  else
212  blockHash = {};
213  }
214  }
215  else // block id is a number
216  {
217  auto n = _blockId.toInt<bigint>();
218  cnetlog << "GetBlockHeaders (" << n << " max: " << _maxHeaders << " skip: " << _skip
219  << (_reverse ? " reverse" : "") << ")";
220 
221  if (!_reverse)
222  {
223  auto lastBlock = m_chain.number();
224  if (n > lastBlock || numHeadersToSend == 0)
225  blockHash = {};
226  else
227  {
228  bigint top = n + uint64_t(step) * (numHeadersToSend - 1);
229  if (top > lastBlock)
230  {
231  numHeadersToSend = (lastBlock - static_cast<unsigned>(n)) / step + 1;
232  top = n + step * (numHeadersToSend - 1);
233  }
234  assert(top <= lastBlock && "invalid top block calculated");
235  blockHash = m_chain.numberHash(static_cast<unsigned>(top)); // override start block hash with the hash of the top block we have
236  }
237  }
238  else if (n <= std::numeric_limits<unsigned>::max())
239  blockHash = m_chain.numberHash(static_cast<unsigned>(n));
240  else
241  blockHash = {};
242  }
243 
244  auto nextHash = [this](h256 _h, unsigned _step)
245  {
246  static const unsigned c_blockNumberUsageLimit = 1000;
247 
248  const auto lastBlock = m_chain.number();
249  const auto limitBlock = lastBlock > c_blockNumberUsageLimit ? lastBlock - c_blockNumberUsageLimit : 0; // find the number of the block below which we don't expect BC changes.
250 
251  while (_step) // parent hash traversal
252  {
253  auto details = m_chain.details(_h);
254  if (details.number < limitBlock)
255  break; // stop using parent hash traversal, fallback to using block numbers
256  _h = details.parent;
257  --_step;
258  }
259 
260  if (_step) // still need lower block
261  {
262  auto n = m_chain.number(_h);
263  if (n >= _step)
264  _h = m_chain.numberHash(n - _step);
265  else
266  _h = {};
267  }
268 
269 
270  return _h;
271  };
272 
273  bytes rlp;
274  unsigned itemCount = 0;
275  vector<h256> hashes;
276  for (unsigned i = 0; i != numHeadersToSend; ++i)
277  {
278  if (!blockHash || !m_chain.isKnown(blockHash))
279  break;
280 
281  hashes.push_back(blockHash);
282  ++itemCount;
283 
284  blockHash = nextHash(blockHash, step);
285  }
286 
287  for (unsigned i = 0; i < hashes.size() && rlp.size() < c_maxPayload; ++i)
288  rlp += m_chain.headerData(hashes[_reverse ? i : hashes.size() - 1 - i]);
289 
290  return make_pair(rlp, itemCount);
291  }
292 
293  pair<bytes, unsigned> blockBodies(RLP const& _blockHashes) const override
294  {
295  unsigned const count = static_cast<unsigned>(_blockHashes.itemCount());
296 
297  bytes rlp;
298  unsigned n = 0;
299  auto numBodiesToSend = std::min(count, c_maxBlocks);
300  for (unsigned i = 0; i < numBodiesToSend && rlp.size() < c_maxPayload; ++i)
301  {
302  auto h = _blockHashes[i].toHash<h256>();
303  if (m_chain.isKnown(h))
304  {
305  bytes blockBytes = m_chain.block(h);
306  RLP block{blockBytes};
307  RLPStream body;
308  body.appendList(2);
309  body.appendRaw(block[1].data()); // transactions
310  body.appendRaw(block[2].data()); // uncles
311  auto bodyBytes = body.out();
312  rlp.insert(rlp.end(), bodyBytes.begin(), bodyBytes.end());
313  ++n;
314  }
315  }
316  if (count > 20 && n == 0)
317  cnetlog << "all " << count << " unknown blocks requested; peer on different chain?";
318  else
319  cnetlog << n << " blocks known and returned; " << (numBodiesToSend - n)
320  << " blocks unknown; " << (count > c_maxBlocks ? count - c_maxBlocks : 0)
321  << " blocks ignored";
322 
323  return make_pair(rlp, n);
324  }
325 
326  strings nodeData(RLP const& _dataHashes) const override
327  {
328  unsigned const count = static_cast<unsigned>(_dataHashes.itemCount());
329 
330  strings data;
331  size_t payloadSize = 0;
332  auto numItemsToSend = std::min(count, c_maxNodes);
333  for (unsigned i = 0; i < numItemsToSend && payloadSize < c_maxPayload; ++i)
334  {
335  auto h = _dataHashes[i].toHash<h256>();
336  auto node = m_db.lookup(h);
337  if (!node.empty())
338  {
339  payloadSize += node.length();
340  data.push_back(move(node));
341  }
342  }
343  cnetlog << data.size() << " nodes known and returned; " << (numItemsToSend - data.size())
344  << " unknown; " << (count > c_maxNodes ? count - c_maxNodes : 0) << " ignored";
345 
346  return data;
347  }
348 
349  pair<bytes, unsigned> receipts(RLP const& _blockHashes) const override
350  {
351  unsigned const count = static_cast<unsigned>(_blockHashes.itemCount());
352 
353  bytes rlp;
354  unsigned n = 0;
355  auto numItemsToSend = std::min(count, c_maxReceipts);
356  for (unsigned i = 0; i < numItemsToSend && rlp.size() < c_maxPayload; ++i)
357  {
358  auto h = _blockHashes[i].toHash<h256>();
359  if (m_chain.isKnown(h))
360  {
361  auto const receipts = m_chain.receipts(h);
362  auto receiptsRlpList = receipts.rlp();
363  rlp.insert(rlp.end(), receiptsRlpList.begin(), receiptsRlpList.end());
364  ++n;
365  }
366  }
367  cnetlog << n << " receipt lists known and returned; " << (numItemsToSend - n)
368  << " unknown; " << (count > c_maxReceipts ? count - c_maxReceipts : 0)
369  << " ignored";
370 
371  return make_pair(rlp, n);
372  }
373 
374 private:
375  BlockChain const& m_chain;
376  OverlayDB const& m_db;
377 };
378 
379 }
380 
381 EthereumCapability::EthereumCapability(shared_ptr<p2p::CapabilityHostFace> _host,
382  BlockChain const& _ch, OverlayDB const& _db, TransactionQueue& _tq, BlockQueue& _bq,
383  u256 _networkId)
384  : m_host(move(_host)),
385  m_chain(_ch),
386  m_db(_db),
387  m_tq(_tq),
388  m_bq(_bq),
389  m_networkId(_networkId),
390  m_hostData(new EthereumHostData(m_chain, m_db))
391 {
392  // TODO: Composition would be better. Left like that to avoid initialization
393  // issues as BlockChainSync accesses other EthereumHost members.
394  m_sync.reset(new BlockChainSync(*this));
395  m_peerObserver.reset(new EthereumPeerObserver(m_sync, m_tq));
396  m_latestBlockSent = _ch.currentHash();
397  m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); });
398  std::random_device seed;
399  m_urng = std::mt19937_64(seed());
400 }
401 
402 std::chrono::milliseconds EthereumCapability::backgroundWorkInterval() const
403 {
404  return c_backgroundWorkInterval;
405 }
406 
407 bool EthereumCapability::ensureInitialised()
408 {
409  if (!m_latestBlockSent)
410  {
411  // First time - just initialise.
412  m_latestBlockSent = m_chain.currentHash();
413  LOG(m_logger) << "Initialising: latest=" << m_latestBlockSent;
414 
415  m_transactionsSent = m_tq.knownTransactions();
416  return true;
417  }
418  return false;
419 }
420 
422 {
423  m_sync->abortSync();
424 
425  // reset() can be called from RPC handling thread,
426  // but we access m_latestBlockSent and m_transactionsSent only from the network thread
427  m_host->postWork([this]() {
428  m_latestBlockSent = h256();
429  m_transactionsSent.clear();
430  });
431 }
432 
434 {
435  m_sync->completeSync();
436 }
437 
438 void EthereumCapability::maintainTransactions()
439 {
440  // Send any new transactions.
441  unordered_map<NodeID, std::vector<size_t>> peerTransactions;
442  auto ts = m_tq.topTransactions(c_maxSendTransactions);
443  {
444  for (size_t i = 0; i < ts.size(); ++i)
445  {
446  auto const& t = ts[i];
447  bool unsent = !m_transactionsSent.count(t.sha3());
448 
449  // Build list of peers to send transactions to
450  auto const peers = selectPeers([&](EthereumPeer const& _peer) {
451  return _peer.isWaitingForTransactions() ||
452  (unsent && !_peer.isTransactionKnown(t.sha3()));
453  });
454  for (auto const& p: peers)
455  peerTransactions[p].push_back(i);
456  }
457  for (auto const& t: ts)
458  m_transactionsSent.insert(t.sha3());
459  }
460 
461  // Send transactions to peers
462  for (auto& peer : m_peers)
463  {
464  bytes b;
465  unsigned n = 0;
466  for (auto const& i : peerTransactions[peer.first])
467  {
468  peer.second.markTransactionAsKnown(ts[i].sha3());
469  b += ts[i].rlp();
470  ++n;
471  }
472 
473  if (n || peer.second.isWaitingForTransactions())
474  {
475  RLPStream ts;
476  m_host->prep(peer.first, name(), ts, TransactionsPacket, n).appendRaw(b, n);
477  m_host->sealAndSend(peer.first, ts);
478  LOG(m_logger) << "Sent " << n << " transactions to " << peer.first;
479  }
480  peer.second.setWaitingForTransactions(false);
481  }
482 }
483 
484 vector<NodeID> EthereumCapability::selectPeers(
485  std::function<bool(EthereumPeer const&)> const& _predicate) const
486 {
487  vector<NodeID> allowed;
488  for (auto const& peer : m_peers)
489  {
490  if (_predicate(peer.second))
491  allowed.push_back(peer.first);
492  }
493  return allowed;
494 }
495 
496 std::pair<std::vector<NodeID>, std::vector<NodeID>> EthereumCapability::randomPartitionPeers(
497  std::vector<NodeID> const& _peers, std::size_t _number) const
498 {
499  vector<NodeID> part1(_peers);
500  vector<NodeID> part2;
501 
502  if (_number >= _peers.size())
503  return std::make_pair(part1, part2);
504 
505  std::shuffle(part1.begin(), part1.end(), m_urng);
506 
507  // Remove elements from the end of the shuffled part1 vector and move them to part2.
508  std::move(part1.begin() + _number, part1.end(), std::back_inserter(part2));
509  part1.erase(part1.begin() + _number, part1.end());
510  return std::make_pair(move(part1), move(part2));
511 }
512 
513 void EthereumCapability::maintainBlocks(h256 const& _currentHash)
514 {
515  // Send any new blocks.
516  auto detailsFrom = m_chain.details(m_latestBlockSent);
517  auto detailsTo = m_chain.details(_currentHash);
518  if (detailsFrom.totalDifficulty < detailsTo.totalDifficulty)
519  {
520  if (diff(detailsFrom.number, detailsTo.number) < 20)
521  {
522  // don't be sending more than 20 "new" blocks. if there are any more we were probably waaaay behind.
523  LOG(m_logger) << "Sending new blocks (current is " << _currentHash << ", was "
524  << m_latestBlockSent << ")";
525 
526  h256s blocks = get<0>(m_chain.treeRoute(m_latestBlockSent, _currentHash, false, false, true));
527 
528 
529  auto const peersWithoutBlock = selectPeers(
530  [&](EthereumPeer const& _peer) { return !_peer.isBlockKnown(_currentHash); });
531 
532  auto const peersToSendNumber =
533  std::max<std::size_t>(c_minBlockBroadcastPeers, std::sqrt(m_peers.size()));
534 
535  std::vector<NodeID> peersToSend;
536  std::vector<NodeID> peersToAnnounce;
537  std::tie(peersToSend, peersToAnnounce) =
538  randomPartitionPeers(peersWithoutBlock, peersToSendNumber);
539 
540  for (NodeID const& peerID : peersToSend)
541  for (auto const& b: blocks)
542  {
543  RLPStream ts;
544  m_host->prep(peerID, name(), ts, NewBlockPacket, 2)
545  .appendRaw(m_chain.block(b), 1)
546  .append(m_chain.details(b).totalDifficulty);
547 
548  auto itPeer = m_peers.find(peerID);
549  if (itPeer != m_peers.end())
550  {
551  m_host->sealAndSend(peerID, ts);
552  itPeer->second.clearKnownBlocks();
553  }
554  }
555  if (!peersToSend.empty())
556  LOG(m_logger) << "Sent " << blocks.size() << " block(s) to " << peersToSend.size()
557  << " peers";
558 
559  for (NodeID const& peerID : peersToAnnounce)
560  {
561  RLPStream ts;
562  m_host->prep(peerID, name(), ts, NewBlockHashesPacket, blocks.size());
563  for (auto const& b: blocks)
564  {
565  ts.appendList(2);
566  ts.append(b);
567  ts.append(m_chain.number(b));
568  }
569 
570  auto itPeer = m_peers.find(peerID);
571  if (itPeer != m_peers.end())
572  {
573  m_host->sealAndSend(peerID, ts);
574  itPeer->second.clearKnownBlocks();
575  }
576  }
577  if (!peersToAnnounce.empty())
578  LOG(m_logger) << "Announced " << blocks.size() << " block(s) to "
579  << peersToAnnounce.size() << " peers";
580  }
581  m_latestBlockSent = _currentHash;
582  }
583 }
584 
586 {
587  return m_sync->isSyncing();
588 }
589 
591 {
592  return m_sync->status();
593 }
594 
595 void EthereumCapability::onTransactionImported(
596  ImportResult _ir, h256 const& _h, h512 const& _nodeId)
597 {
598  m_host->postWork([this, _ir, _h, _nodeId]() {
599  auto itPeerStatus = m_peers.find(_nodeId);
600  if (itPeerStatus == m_peers.end())
601  return;
602 
603  auto& peer = itPeerStatus->second;
604 
606  switch (_ir)
607  {
609  m_host->updateRating(_nodeId, -100);
610  break;
612  // if we already had the transaction, then don't bother sending it on.
613  m_transactionsSent.insert(_h);
614  m_host->updateRating(_nodeId, 0);
615  break;
617  m_host->updateRating(_nodeId, 100);
618  break;
619  default:;
620  }
621  });
622 }
623 
624 void EthereumCapability::onConnect(NodeID const& _peerID, u256 const& _peerCapabilityVersion)
625 {
626  m_host->addNote(_peerID, "manners", m_host->isRude(_peerID, name()) ? "RUDE" : "nice");
627 
628  EthereumPeer peer{m_host, _peerID, _peerCapabilityVersion};
629  m_peers.emplace(_peerID, peer);
630  peer.requestStatus(m_networkId, m_chain.details().totalDifficulty, m_chain.currentHash(),
631  m_chain.genesisHash());
632 }
633 
635 {
636  // TODO lower peer's rating or mark as rude if it disconnects when being asked for something
637  m_peerObserver->onPeerAborting();
638 
639  m_peers.erase(_peerID);
640 }
641 
643  NodeID const& _peerID, unsigned _id, RLP const& _r)
644 {
645  auto& peer = m_peers[_peerID];
646  peer.setLastAsk(std::chrono::system_clock::to_time_t(chrono::system_clock::now()));
647 
648  try
649  {
650  switch (_id)
651  {
652  case StatusPacket:
653  {
654  auto const peerProtocolVersion = _r[0].toInt<unsigned>();
655  auto const networkId = _r[1].toInt<u256>();
656  auto const totalDifficulty = _r[2].toInt<u256>();
657  auto const latestHash = _r[3].toHash<h256>();
658  auto const genesisHash = _r[4].toHash<h256>();
659 
660  LOG(m_logger) << "Status: " << peerProtocolVersion << " / " << networkId << " / "
661  << genesisHash << ", TD: " << totalDifficulty << " = " << latestHash;
662 
663  peer.setStatus(
664  peerProtocolVersion, networkId, totalDifficulty, latestHash, genesisHash);
665  setIdle(_peerID);
666  m_peerObserver->onPeerStatus(peer);
667  break;
668  }
669  case TransactionsPacket:
670  {
671  m_peerObserver->onPeerTransactions(_peerID, _r);
672  break;
673  }
675  {
678  const auto blockId = _r[0];
679  const auto maxHeaders = _r[1].toInt<u256>();
680  const auto skip = _r[2].toInt<u256>();
681  const auto reverse = _r[3].toInt<bool>();
682 
683  auto numHeadersToSend = maxHeaders <= c_maxHeadersToSend ?
684  static_cast<unsigned>(maxHeaders) :
685  c_maxHeadersToSend;
686 
687  if (skip > std::numeric_limits<unsigned>::max() - 1)
688  {
689  cnetdetails << "Requested block skip is too big: " << skip;
690  break;
691  }
692 
693  pair<bytes, unsigned> const rlpAndItemCount =
694  m_hostData->blockHeaders(blockId, numHeadersToSend, skip, reverse);
695 
696  RLPStream s;
697  m_host->prep(_peerID, name(), s, BlockHeadersPacket, rlpAndItemCount.second)
698  .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
699  m_host->sealAndSend(_peerID, s);
700  m_host->updateRating(_peerID, 0);
701  break;
702  }
703  case BlockHeadersPacket:
704  {
706  LOG(m_loggerImpolite)
707  << "Peer giving us block headers when we didn't ask for them.";
708  else
709  {
710  setIdle(_peerID);
711  m_peerObserver->onPeerBlockHeaders(_peerID, _r);
712  }
713  break;
714  }
716  {
717  unsigned count = static_cast<unsigned>(_r.itemCount());
718  cnetlog << "GetBlockBodies (" << dec << count << " entries)";
719 
720  if (!count)
721  {
722  LOG(m_loggerImpolite) << "Zero-entry GetBlockBodies: Not replying.";
723  m_host->updateRating(_peerID, -10);
724  break;
725  }
726 
727  pair<bytes, unsigned> const rlpAndItemCount = m_hostData->blockBodies(_r);
728 
729  m_host->updateRating(_peerID, 0);
730  RLPStream s;
731  m_host->prep(_peerID, name(), s, BlockBodiesPacket, rlpAndItemCount.second)
732  .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
733  m_host->sealAndSend(_peerID, s);
734  break;
735  }
736  case BlockBodiesPacket:
737  {
739  LOG(m_loggerImpolite) << "Peer giving us block bodies when we didn't ask for them.";
740  else
741  {
742  setIdle(_peerID);
743  m_peerObserver->onPeerBlockBodies(_peerID, _r);
744  }
745  break;
746  }
747  case NewBlockPacket:
748  {
749  m_peerObserver->onPeerNewBlock(_peerID, _r);
750  break;
751  }
753  {
754  unsigned itemCount = _r.itemCount();
755 
756  cnetlog << "BlockHashes (" << dec << itemCount << " entries) "
757  << (itemCount ? "" : " : NoMoreHashes");
758 
759  if (itemCount > c_maxIncomingNewHashes)
760  {
761  disablePeer(_peerID, "Too many new hashes");
762  break;
763  }
764 
765  vector<pair<h256, u256>> hashes(itemCount);
766  for (unsigned i = 0; i < itemCount; ++i)
767  hashes[i] = std::make_pair(_r[i][0].toHash<h256>(), _r[i][1].toInt<u256>());
768 
769  m_peerObserver->onPeerNewHashes(_peerID, hashes);
770  break;
771  }
772  case GetNodeDataPacket:
773  {
774  unsigned count = static_cast<unsigned>(_r.itemCount());
775  if (!count)
776  {
777  LOG(m_loggerImpolite) << "Zero-entry GetNodeData: Not replying.";
778  m_host->updateRating(_peerID, -10);
779  break;
780  }
781  cnetlog << "GetNodeData (" << dec << count << " entries)";
782 
783  strings const data = m_hostData->nodeData(_r);
784 
785  m_host->updateRating(_peerID, 0);
786  RLPStream s;
787  m_host->prep(_peerID, name(), s, NodeDataPacket, data.size());
788  for (auto const& element : data)
789  s.append(element);
790  m_host->sealAndSend(_peerID, s);
791  break;
792  }
793  case GetReceiptsPacket:
794  {
795  unsigned count = static_cast<unsigned>(_r.itemCount());
796  if (!count)
797  {
798  LOG(m_loggerImpolite) << "Zero-entry GetReceipts: Not replying.";
799  m_host->updateRating(_peerID, -10);
800  break;
801  }
802  cnetlog << "GetReceipts (" << dec << count << " entries)";
803 
804  pair<bytes, unsigned> const rlpAndItemCount = m_hostData->receipts(_r);
805 
806  m_host->updateRating(_peerID, 0);
807  RLPStream s;
808  m_host->prep(_peerID, name(), s, ReceiptsPacket, rlpAndItemCount.second)
809  .appendRaw(rlpAndItemCount.first, rlpAndItemCount.second);
810  m_host->sealAndSend(_peerID, s);
811  break;
812  }
813  case NodeDataPacket:
814  {
815  if (peer.asking() != Asking::NodeData)
816  LOG(m_loggerImpolite) << "Peer giving us node data when we didn't ask for them.";
817  else
818  {
819  setIdle(_peerID);
820  m_peerObserver->onPeerNodeData(_peerID, _r);
821  }
822  break;
823  }
824  case ReceiptsPacket:
825  {
826  if (peer.asking() != Asking::Receipts)
827  LOG(m_loggerImpolite) << "Peer giving us receipts when we didn't ask for them.";
828  else
829  {
830  setIdle(_peerID);
831  m_peerObserver->onPeerReceipts(_peerID, _r);
832  }
833  break;
834  }
835  default:
836  return false;
837  }
838  }
839  catch (Exception const&)
840  {
841  cnetlog << "Peer causing an Exception: "
842  << boost::current_exception_diagnostic_information() << " " << _r;
843  }
844  catch (std::exception const& _e)
845  {
846  cnetlog << "Peer causing an exception: " << _e.what() << " " << _r;
847  }
848 
849  return true;
850 }
851 
853 {
854  ensureInitialised();
855  auto h = m_chain.currentHash();
856  // If we've finished our initial sync (including getting all the blocks into the chain so as to
857  // reduce invalid transactions), start trading transactions & blocks
858  if (!isSyncing() && m_chain.isKnown(m_latestBlockSent))
859  {
860  if (m_newTransactions)
861  {
862  m_newTransactions = false;
863  maintainTransactions();
864  }
865  if (m_newBlocks)
866  {
867  m_newBlocks = false;
868  maintainBlocks(h);
869  }
870  }
871 
872  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
873  if (now - m_lastTick >= 1)
874  {
875  m_lastTick = now;
876  for (auto const& peer : m_peers)
877  {
878  time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
879 
880  if (now - peer.second.lastAsk() > c_peerTimeoutSeconds && peer.second.isConversing())
881  // timeout
882  m_host->disconnect(peer.first, p2p::PingTimeout);
883  }
884  }
885 }
886 
887 void EthereumCapability::setIdle(NodeID const& _peerID)
888 {
889  setAsking(_peerID, Asking::Nothing);
890 }
891 
892 void EthereumCapability::setAsking(NodeID const& _peerID, Asking _a)
893 {
894  auto itPeerStatus = m_peers.find(_peerID);
895  if (itPeerStatus == m_peers.end())
896  return;
897 
898  auto& peerStatus = itPeerStatus->second;
899 
900  peerStatus.setAsking(_a);
901  peerStatus.setLastAsk(std::chrono::system_clock::to_time_t(chrono::system_clock::now()));
902 
903  m_host->addNote(_peerID, "ask", ::toString(_a));
904  m_host->addNote(_peerID, "sync",
905  string(isCriticalSyncing(_peerID) ? "ONGOING" : "holding") +
906  (needsSyncing(_peerID) ? " & needed" : ""));
907 }
908 
909 bool EthereumCapability::isCriticalSyncing(NodeID const& _peerID) const
910 {
911  auto itPeerStatus = m_peers.find(_peerID);
912  if (itPeerStatus == m_peers.end())
913  return false;
914 
915  auto const& peerStatus = itPeerStatus->second;
916 
917  auto const asking = peerStatus.asking();
918  return asking == Asking::BlockHeaders || asking == Asking::State;
919 }
920 
921 bool EthereumCapability::needsSyncing(NodeID const& _peerID) const
922 {
923  if (m_host->isRude(_peerID, name()))
924  return false;
925 
926  auto peerStatus = m_peers.find(_peerID);
927  return (peerStatus != m_peers.end() && peerStatus->second.latestHash());
928 }
929 
930 void EthereumCapability::disablePeer(NodeID const& _peerID, std::string const& _problem)
931 {
932  m_host->disableCapability(_peerID, name(), _problem);
933 }
934 
935 EthereumPeer const& EthereumCapability::peer(NodeID const& _peerID) const
936 {
937  return const_cast<EthereumCapability*>(this)->peer(_peerID);
938 }
939 
941 {
942  auto peer = m_peers.find(_peerID);
943  if (peer == m_peers.end())
944  BOOST_THROW_EXCEPTION(PeerDisconnected() << errinfo_nodeID(_peerID));
945 
946  return peer->second;
947 }
dev::eth::TransactionQueue::topTransactions
Transactions topTransactions(unsigned _limit, h256Hash const &_avoid=h256Hash()) const
Definition: TransactionQueue.cpp:102
dev::eth::BlockChain::currentHash
h256 currentHash() const
Get a given block (RLP format). Thread-safe.
Definition: BlockChain.h:228
dev::eth::BlockQueue
A queue of blocks. Sits between network or other I/O and the BlockChain. Sorts them ready for blockch...
Definition: BlockQueue.h:224
dev::eth::ImportResult::AlreadyKnown
@ AlreadyKnown
dev::eth::TransactionQueue::knownTransactions
h256Hash knownTransactions() const
Definition: TransactionQueue.cpp:112
dev::eth::EthereumCapability::completeSync
void completeSync()
Don't sync further - used only in test mode.
Definition: EthereumCapability.cpp:433
dev::eth::EthereumHostDataFace
Definition: EthereumCapability.h:64
BlockQueue.h
dev::eth::ImportResult
ImportResult
Definition: Common.h:97
dev::eth::BlockChain::treeRoute
std::tuple< h256s, h256, unsigned > treeRoute(h256 const &_from, h256 const &_to, bool _common=true, bool _pre=true, bool _post=true) const
Definition: BlockChain.cpp:1116
dev::eth::EthereumCapability::backgroundWorkInterval
std::chrono::milliseconds backgroundWorkInterval() const override
Definition: EthereumCapability.cpp:402
dev::eth::SyncStatus
Definition: CommonNet.h:96
dev::sha3
bool sha3(bytesConstRef _input, bytesRef o_output) noexcept
Definition: SHA3.cpp:28
dev::eth::BlockChain::genesisHash
h256 genesisHash() const
Get the hash of the genesis block. Thread-safe.
Definition: BlockChain.h:231
dev::eth::ReceiptsPacket
@ ReceiptsPacket
Definition: CommonNet.h:67
dev::eth::TransactionsPacket
@ TransactionsPacket
Definition: CommonNet.h:57
dev::eth::BlockChain::isKnown
bool isKnown(h256 const &_hash, bool _isCurrent=true) const
Returns true if the given block is known (though not necessarily a part of the canon chain).
Definition: BlockChain.cpp:1404
dev::h256
FixedHash< 32 > h256
Definition: FixedHash.h:356
dev::eth::EthereumCapability::doBackgroundWork
void doBackgroundWork() override
Definition: EthereumCapability.cpp:852
dev::eth::EthereumCapability::interpretCapabilityPacket
bool interpretCapabilityPacket(NodeID const &_peerID, unsigned _id, RLP const &_r) override
Definition: EthereumCapability.cpp:642
dev::RLPStream::out
bytes const & out() const
Read the byte stream.
Definition: RLP.h:419
dev::diff
N diff(N const &_a, N const &_b)
Definition: Common.h:193
Exceptions.h
dev::eth::TransactionQueue::onImport
Handler< ImportResult, h256 const &, h512 const & > onImport(T const &_t)
Register a handler that will be called once asynchronous verification is complete and transaction has ...
Definition: TransactionQueue.h:122
dev::eth::BlockChain::number
unsigned number(h256 const &_hash) const
Get a number for the given hash (or the most recent mined if none given). Thread-safe.
Definition: BlockChain.h:224
dev::eth
Definition: BasicAuthority.h:32
dev::eth::EthereumCapability::peer
EthereumPeer const & peer(NodeID const &_peerID) const
Definition: EthereumCapability.cpp:935
dev::toString
std::string toString(std::chrono::time_point< T > const &_e, std::string const &_format="%F %T")
Definition: CommonIO.h:86
dev::rlp
bytes rlp(_T _t)
Export a single item in RLP format, returning a byte array.
Definition: RLP.h:453
dev::eth::ImportResult::Malformed
@ Malformed
dev::FixedHash< 32 >
dev::eth::GetBlockHeadersPacket
@ GetBlockHeadersPacket
Definition: CommonNet.h:58
dev::eth::EthereumCapability::onDisconnect
void onDisconnect(NodeID const &_nodeID) override
Definition: EthereumCapability.cpp:634
dev::errinfo_nodeID
boost::error_info< struct tag_nodeID, h512 > errinfo_nodeID
Definition: Exceptions.h:97
dev::eth::EthereumPeerObserverFace
Definition: EthereumCapability.h:39
dev::eth::StatusPacket
@ StatusPacket
Definition: CommonNet.h:55
dev::Exception
Base class for all exceptions.
Definition: Exceptions.h:39
dev::eth::EthereumCapability::reset
void reset()
Definition: EthereumCapability.cpp:421
LOG
#define LOG
Definition: Log.h:63
dev::eth::BlockChain::details
BlockDetails details(h256 const &_hash) const
Get the familial details concerning a block (or the most recent mined if none given)....
Definition: BlockChain.h:157
dev::eth::EthereumPeer::setWaitingForTransactions
void setWaitingForTransactions(bool _value)
Definition: EthereumPeer.h:61
EthereumCapability.h
dev::RLP::size
size_t size() const
Definition: RLP.h:105
dev::eth::Asking::NodeData
@ NodeData
dev::eth::EthereumCapability::status
SyncStatus status() const
Definition: EthereumCapability.cpp:590
dev::eth::EthereumPeer
Definition: EthereumPeer.h:32
dev::h256s
std::vector< h256 > h256s
Definition: FixedHash.h:361
dev::Logger
boost::log::sources::severity_channel_logger<> Logger
Definition: Log.h:124
dev::eth::Asking::State
@ State
dev::eth::BlockDetails::totalDifficulty
u256 totalDifficulty
Definition: BlockDetails.h:52
dev::eth::EthereumPeer::isTransactionKnown
bool isTransactionKnown(h256 const &_hash) const
Definition: EthereumPeer.h:63
dev::eth::Asking::Receipts
@ Receipts
dev::eth::Asking
Asking
Definition: CommonNet.h:73
dev::eth::GetBlockBodiesPacket
@ GetBlockBodiesPacket
Definition: CommonNet.h:60
dev::eth::EthereumCapability::name
std::string name() const override
Definition: EthereumCapability.h:91
dev::bytes
std::vector< byte > bytes
Definition: Common.h:72
dev::createLogger
Logger createLogger(int _severity, std::string const &_channel)
Definition: Log.h:125
dev::eth::NodeID
p2p::NodeID NodeID
Definition: CommonNet.h:105
dev::eth::BlockChain
Implements the blockchain database. All data this gives is disk-backed. @threadsafe.
Definition: BlockChain.h:105
dev::eth::EthereumPeer::setLastAsk
void setLastAsk(time_t _lastAsk)
Definition: EthereumPeer.h:51
dev::eth::NewBlockPacket
@ NewBlockPacket
Definition: CommonNet.h:62
dev::OverlayDB
Definition: OverlayDB.h:34
BlockChain.h
dev::eth::TransactionQueue
A queue of Transactions, each stored as RLP. Maintains a transaction queue sorted by nonce diff and g...
Definition: TransactionQueue.h:45
dev::eth::NodeDataPacket
@ NodeDataPacket
Definition: CommonNet.h:65
dev::RLPStream
Class for writing to an RLP bytestream.
Definition: RLP.h:370
dev::eth::EthereumPeer::isWaitingForTransactions
bool isWaitingForTransactions() const
Definition: EthereumPeer.h:60
dev::eth::EthereumCapability::disablePeer
void disablePeer(NodeID const &_peerID, std::string const &_problem)
Definition: EthereumCapability.cpp:930
dev::eth::EthereumPeer::requestStatus
void requestStatus(u256 _hostNetworkId, u256 _chainTotalDifficulty, h256 _chainCurrentHash, h256 _chainGenesPeersh)
Definition: EthereumPeer.cpp:83
dev::VerbosityDebug
@ VerbosityDebug
Definition: Log.h:71
dev::eth::EthereumPeer::asking
Asking asking() const
Definition: EthereumPeer.h:53
dev::RLPStream::append
RLPStream & append(unsigned _s)
Append given datum to the byte stream.
Definition: RLP.h:381
dev::eth::EthereumPeer::setStatus
void setStatus(unsigned _protocolVersion, u256 const &_networkId, u256 const &_totalDifficulty, h256 const &_latestHash, h256 const &_genesisHash)
Definition: EthereumPeer.cpp:56
dev::eth::EthereumCapability::isSyncing
bool isSyncing() const
Definition: EthereumCapability.cpp:585
dev::strings
std::vector< std::string > strings
Definition: Common.h:143
dev::eth::EthereumPeer::markTransactionAsKnown
void markTransactionAsKnown(h256 const &_hash)
Definition: EthereumPeer.h:64
dev::bigint
boost::multiprecision::number< boost::multiprecision::cpp_int_backend<> > bigint
Definition: Common.h:118
dev::eth::EthereumPeer::isBlockKnown
bool isBlockKnown(h256 const &_hash) const
Definition: EthereumPeer.h:66
dev::FixedHash::size
@ size
Definition: FixedHash.h:53
dev::eth::BlockBodiesPacket
@ BlockBodiesPacket
Definition: CommonNet.h:61
dev::eth::EthereumCapability::networkId
u256 networkId() const
Definition: EthereumCapability.h:98
dev::eth::GetNodeDataPacket
@ GetNodeDataPacket
Definition: CommonNet.h:64
dev::RLP::itemCount
size_t itemCount() const
Definition: RLP.h:101
std
Definition: FixedHash.h:393
dev::eth::Success
@ Success
Definition: Common.h:223
BlockChainSync.h
dev::eth::BlockHeadersPacket
@ BlockHeadersPacket
Definition: CommonNet.h:59
dev::u256
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
Definition: Common.h:121
dev::RLP::toInt
_T toInt(int _flags=Strict) const
Converts to int of type given; if isData(), decodes as big-endian bytestream.
Definition: RLP.h:257
dev::eth::EthereumPeer::isConversing
bool isConversing() const
Definition: EthereumPeer.h:54
dev::eth::EthereumCapability
The EthereumCapability class.
Definition: EthereumCapability.h:85
dev::RLP::toHash
_N toHash(int _flags=Strict) const
Definition: RLP.h:288
dev::eth::BlockChain::block
bytes block(h256 const &_hash) const
Get a block (RLP format) for the given hash (or the most recent mined if none given)....
Definition: BlockChain.cpp:1423
dev::eth::EthereumCapability::onConnect
void onConnect(NodeID const &_nodeID, u256 const &_peerCapabilityVersion) override
Definition: EthereumCapability.cpp:624
dev
Definition: Address.cpp:21
dev::eth::NewBlockHashesPacket
@ NewBlockHashesPacket
Definition: CommonNet.h:56
dev::eth::GetReceiptsPacket
@ GetReceiptsPacket
Definition: CommonNet.h:66
dev::eth::Asking::BlockHeaders
@ BlockHeaders
cwarn
#define cwarn
dev::RLPStream::appendRaw
RLPStream & appendRaw(bytesConstRef _rlp, size_t _itemCount=1)
Appends raw (pre-serialised) RLP data. Use with caution.
Definition: RLP.cpp:222
dev::eth::EthereumPeer::lastAsk
time_t lastAsk() const
Definition: EthereumPeer.h:50
dev::eth::Asking::BlockBodies
@ BlockBodies
dev::RLP
Definition: RLP.h:48
dev::eth::Asking::Nothing
@ Nothing
TransactionQueue.h
dev::eth::BlockChainSync
Base BlockChain synchronization strategy class. Syncs to peers and keeps up to date....
Definition: BlockChainSync.h:32
dev::RLPStream::appendList
RLPStream & appendList(size_t _items)
Appends a list.
Definition: RLP.cpp:268