9 #include <boost/fiber/all.hpp>
// Namespace-scope definition of the constexpr static member
// WarpCapability::c_backgroundWorkInterval.
17 std::chrono::milliseconds constexpr WarpCapability::c_backgroundWorkInterval;
// Capacity handed to the m_freePeers buffered channel in the WarpPeerObserver constructor.
21 static size_t const c_freePeerBufferSize = 32;
// Checks the outer shape of a snapshot manifest RLP: it must be a list holding exactly one
// item. That inner item is taken as the manifest and its field 0 is read as the version.
// NOTE(review): the return statements are on lines not visible in this excerpt, so the
// accepted version value cannot be stated here.
23 bool validateManifest(RLP
const& _manifestRlp)
25 if (!_manifestRlp.isList() || _manifestRlp.itemCount() != 1)
28 RLP
const manifest = _manifestRlp[0];
30 u256 const version = manifest[0].toInt<
u256>();
// Extracts the snapshot block hash from a manifest RLP: unwraps the single inner item and
// returns its field 5 as an h256 (the same field downloadChunks() logs as " block hash ").
// No shape checks are visible here; validatePeer() calls validateManifest() on the same RLP
// before calling this function.
34 h256 snapshotBlockHash(RLP
const& _manifestRlp)
36 RLP
const manifest = _manifestRlp[0];
37 return manifest[5].toHash<
h256>();
// Peer observer that drives the snapshot download. It implements WarpPeerObserverFace and
// runs per-peer validation (validatePeer) and the chunk download (downloadChunks) on
// boost fibers.
40 class WarpPeerObserver :
public WarpPeerObserverFace
// Caches values read from the host and the block chain at construction time: the host's
// protocol version and network id, the chain's genesis hash and its DAO hard-fork block
// number. The free-peer channel is sized with c_freePeerBufferSize and _snapshotPath is
// stored as the directory that downloaded files are written to.
// NOTE(review): the initializer for m_host is on a line not visible in this excerpt.
43 WarpPeerObserver(WarpCapability& _host, BlockChain
const& _blockChain,
44 boost::filesystem::path
const& _snapshotPath)
46 m_hostProtocolVersion(_host.version()),
47 m_hostNetworkId(_host.networkId()),
48 m_hostGenesisHash(_blockChain.genesisHash()),
49 m_daoForkBlock(_blockChain.sealEngine()->chainParams().daoHardforkBlock),
50 m_freePeers(c_freePeerBufferSize),
51 m_snapshotDir(_snapshotPath)
// Waits for the download fiber to finish (presumably inside the destructor; its header is
// not visible in this excerpt).
// NOTE(review): m_downloadFiber is a unique_ptr that is only assigned in onPeerStatus();
// confirm that a null check guards this call on a line not visible here.
56 m_downloadFiber->join();
// Invoked by WarpCapability after it has parsed a peer's status packet.
// Starts a detached fiber that runs validatePeer() for this peer, creates the fiber that
// runs downloadChunks(), and finally yields so that other fibers can run.
// NOTE(review): whatever condition guards the m_downloadFiber.reset() call is on lines not
// visible in this excerpt.
59 void onPeerStatus(
NodeID const& _peerID)
override
// validatePeer takes the NodeID by value, so the fiber works on its own copy of the id.
61 boost::fibers::fiber checkPeerFiber(&WarpPeerObserver::validatePeer,
this, _peerID);
62 checkPeerFiber.detach();
66 m_downloadFiber.reset(
67 new boost::fibers::fiber(&WarpPeerObserver::downloadChunks,
this));
69 boost::this_fiber::yield();
// Invoked with a peer's manifest packet. Fulfils the promise kept for this peer in
// m_manifests with the packet's raw bytes, which is what waitForManifestResponse() waits
// on, then yields to let other fibers run.
// Note: std::map::operator[] default-constructs a promise if the peer has no entry yet.
72 void onPeerManifest(
NodeID const& _peerID, RLP
const& _r)
override
74 m_manifests[_peerID].set_value(_r.data().toBytes());
75 boost::this_fiber::yield();
// Invoked with a peer's block-headers packet. Fulfils the promise kept for this peer in
// m_daoForkHeaders with the packet's raw bytes, which is what
// waitForDaoForkBlockResponse() waits on, then yields to let other fibers run.
// Note: std::map::operator[] default-constructs a promise if the peer has no entry yet.
78 void onPeerBlockHeaders(
NodeID const& _peerID, RLP
const& _r)
override
80 m_daoForkHeaders[_peerID].set_value(_r.data().toBytes());
81 boost::this_fiber::yield();
// Invoked with a peer's chunk-data packet. The packet must be a list with exactly one
// item; that item is hashed and compared with the chunk hash that was requested from this
// peer. A matching chunk is written to the snapshot directory under the hex form of its
// hash. The peer is then pushed back onto the free-peer channel and the fiber yields.
// NOTE(review): the statements taken when the shape check or the lookup fails, and the
// `else` that presumably guards the re-queue below, are on lines not visible here.
84 void onPeerData(
NodeID const& _peerID, RLP
const& _r)
override
86 if (!_r.isList() || _r.itemCount() != 1)
89 RLP
const data = _r[0];
91 h256 const hash =
sha3(data.toBytesConstRef());
// Look up which chunk was requested from this peer.
93 auto it = m_requestedChunks.find(_peerID);
94 if (it == m_requestedChunks.end())
// Copy the hash out before erasing: the iterator is invalid after erase().
97 h256 const askedHash = it->second;
98 m_requestedChunks.erase(it);
100 if (hash == askedHash)
103 writeFile((boost::filesystem::path(m_snapshotDir) /
toHex(hash)).
string(),
104 data.toBytesConstRef());
106 LOG(m_logger) <<
"Saved chunk " << hash <<
" Chunks left: " << m_neededChunks.size()
107 <<
" Requested chunks: " << m_requestedChunks.size();
// Nothing left to request and nothing in flight.
108 if (m_neededChunks.empty() && m_requestedChunks.empty())
109 LOG(m_logger) <<
"Snapshot download complete!";
// Put the requested hash back at the end of the queue so it is asked for again
// (presumably only when the received hash did not match).
112 m_neededChunks.push_back(askedHash);
114 m_freePeers.push(_peerID);
115 boost::this_fiber::yield();
// Invoked when a peer goes away. Any fiber still waiting on this peer is released by
// failing its promise, and a chunk that was in flight to this peer is re-queued.
// NOTE(review): the three lookups below each declare `it`; the scopes (or conditions on
// _asking) that separate them are on lines not visible in this excerpt.
118 void onPeerDisconnect(
NodeID const& _peerID,
Asking _asking)
override
// Pending manifest request: make waitForManifestResponse() observe a failure.
122 auto it = m_manifests.find(_peerID);
123 if (it != m_manifests.end())
124 it->second.set_exception(std::make_exception_ptr(FailedToDownloadManifest()));
// Pending DAO fork header request: make waitForDaoForkBlockResponse() observe a failure.
128 auto it = m_daoForkHeaders.find(_peerID);
129 if (it != m_daoForkHeaders.end())
130 it->second.set_exception(
131 std::make_exception_ptr(FailedToDownloadDaoForkBlockHeader()));
// Chunk in flight: move its hash back to the queue of needed chunks.
135 auto it = m_requestedChunks.find(_peerID);
136 if (it != m_requestedChunks.end())
138 m_neededChunks.push_back(it->second);
139 m_requestedChunks.erase(it);
142 boost::this_fiber::yield();
// Fiber body started by onPeerStatus() for each peer. Visible steps, in order:
//   1. validate the peer's status against this host's genesis hash, protocol version and
//      network id;
//   2. request the peer's snapshot manifest and wait for it;
//   3. check the manifest's shape and disable the peer if it is invalid;
//   4. if a snapshot is already being synced, accept the peer only when its snapshot hash
//      matches, otherwise disable it;
//   5. otherwise request the DAO fork block header, verify it, record the snapshot hash
//      being synced, publish the manifest to downloadChunks() and mark the peer free.
// NOTE(review): the early returns and the `else` branches are on lines not visible here.
146 void validatePeer(
NodeID _peerID)
148 if (!m_host.validateStatus(
149 _peerID, m_hostGenesisHash, {m_hostProtocolVersion}, m_hostNetworkId))
152 m_host.requestManifest(_peerID);
// Suspends this fiber until onPeerManifest() or onPeerDisconnect() settles the promise.
154 bytes const manifestBytes = waitForManifestResponse(_peerID);
155 if (manifestBytes.empty())
158 RLP manifestRlp(manifestBytes);
159 if (!validateManifest(manifestRlp))
162 m_host.disablePeer(_peerID,
"Invalid snapshot manifest.");
// NOTE(review): snapshotBlockHash() returns h256 and m_syncingSnapshotHash is an h256, but
// the value is held in a u256 here, so it is converted on the way in and again when it is
// compared or assigned below. Consider declaring this local as h256.
166 u256 const snapshotHash = snapshotBlockHash(manifestRlp);
// A non-zero m_syncingSnapshotHash means a snapshot has already been chosen.
167 if (m_syncingSnapshotHash)
169 if (snapshotHash == m_syncingSnapshotHash)
170 m_freePeers.push(_peerID);
172 m_host.disablePeer(_peerID,
"Another snapshot.");
// Ask for exactly one header, starting at the DAO fork block, no skip, forward order.
178 m_host.requestBlockHeaders(_peerID, m_daoForkBlock, 1, 0,
false);
180 bytes const headerBytes = waitForDaoForkBlockResponse(_peerID);
181 if (headerBytes.empty())
184 RLP headerRlp(headerBytes);
185 if (!verifyDaoChallengeResponse(headerRlp))
187 m_host.disablePeer(_peerID,
"Peer from another fork.");
// This peer's snapshot becomes the one being synced; m_manifest.set_value() is what
// downloadChunks() is blocked on.
192 m_syncingSnapshotHash = snapshotHash;
193 m_manifest.set_value(manifestBytes);
194 m_freePeers.push(_peerID);
// Blocks the calling fiber on the manifest promise stored for _peerID and removes the
// peer's entry from m_manifests afterwards, both on success and when an Exception is
// caught (onPeerDisconnect() stores FailedToDownloadManifest in the promise).
// NOTE(review): the return statements are on lines not visible here; validatePeer() treats
// an empty result specially, so the catch path presumably returns empty bytes.
198 bytes waitForManifestResponse(
NodeID const& _peerID)
// operator[] creates the promise on first use so that onPeerManifest() can fulfil it.
202 bytes const result = m_manifests[_peerID].get_future().get();
203 m_manifests.erase(_peerID);
206 catch (Exception
const&)
208 m_manifests.erase(_peerID);
// Blocks the calling fiber on the DAO-fork-header promise stored for _peerID and removes
// the peer's entry from m_daoForkHeaders afterwards, both on success and when an Exception
// is caught (onPeerDisconnect() stores FailedToDownloadDaoForkBlockHeader in the promise).
// NOTE(review): the return statements are on lines not visible here; validatePeer() treats
// an empty result specially, so the catch path presumably returns empty bytes.
213 bytes waitForDaoForkBlockResponse(
NodeID const& _peerID)
// operator[] creates the promise on first use so that onPeerBlockHeaders() can fulfil it.
217 bytes const result = m_daoForkHeaders[_peerID].get_future().get();
218 m_daoForkHeaders.erase(_peerID);
221 catch (Exception
const&)
223 m_daoForkHeaders.erase(_peerID);
// Checks a block-headers response used as the DAO fork challenge: the response is expected
// to hold exactly one item, and the header must have the DAO fork block number and the
// expected extra data.
// NOTE(review): the statement taken when the item count is wrong and the declaration of
// `info` are on lines not visible in this excerpt.
228 bool verifyDaoChallengeResponse(RLP
const& _r)
230 if (_r.itemCount() != 1)
// The hex literal is the ASCII text "dao-hard-fork".
234 return info.number() == m_daoForkBlock &&
235 info.extraData() ==
fromHex(
"0x64616f2d686172642d666f726b");
// Fiber body of the download fiber created in onPeerStatus(). Waits until validatePeer()
// publishes a manifest, logs and stores it, then requests every state and block chunk
// listed in it, one request per free peer.
238 void downloadChunks()
// Suspends this fiber until m_manifest.set_value() is called in validatePeer().
240 bytes const manifestBytes = m_manifest.get_future().get();
242 RLP manifestRlp(manifestBytes);
243 RLP manifest(manifestRlp[0]);
// Manifest fields by index: 0 version, 1 state chunk hashes, 2 block chunk hashes,
// 3 state root, 4 block number, 5 block hash.
245 u256 const version = manifest[0].toInt<
u256>();
246 h256s const stateHashes = manifest[1].toVector<
h256>();
247 h256s const blockHashes = manifest[2].toVector<
h256>();
248 h256 const stateRoot = manifest[3].toHash<
h256>();
249 u256 const blockNumber = manifest[4].toInt<
u256>();
250 h256 const blockHash = manifest[5].toHash<
h256>();
252 LOG(m_logger) <<
"MANIFEST: "
253 <<
"version " << version <<
" state root " << stateRoot <<
" block number "
254 << blockNumber <<
" block hash " << blockHash;
// Store the inner manifest item in a file named MANIFEST in the snapshot directory.
257 writeFile((boost::filesystem::path(m_snapshotDir) /
"MANIFEST").
string(), manifest.data());
// Queue state chunks first, followed by block chunks.
259 m_neededChunks.assign(stateHashes.begin(), stateHashes.end());
260 m_neededChunks.insert(m_neededChunks.end(), blockHashes.begin(), blockHashes.end());
262 while (!m_neededChunks.empty())
264 h256 const chunkHash(m_neededChunks.front());
// Keep taking peers from the free-peer channel until one accepts the request.
// The declaration of peerID and the opening `do` are on lines not visible here.
269 peerID = m_freePeers.value_pop();
270 }
while (!m_host.requestData(peerID, chunkHash));
272 LOG(m_logger) <<
"Requested chunk " << chunkHash;
// Record the request as in flight so that onPeerData() / onPeerDisconnect() can match or
// re-queue it, then drop it from the queue.
274 m_requestedChunks[peerID] = chunkHash;
275 m_neededChunks.pop_front();
// Capability that owns the peers; used to send requests and to disable peers.
279 WarpCapability& m_host;
// Values captured from the host / block chain in the constructor.
280 unsigned const m_hostProtocolVersion;
281 u256 const m_hostNetworkId;
282 h256 const m_hostGenesisHash;
283 unsigned const m_daoForkBlock;
// Set by validatePeer() once a peer passes all checks; awaited by downloadChunks().
284 boost::fibers::promise<bytes> m_manifest;
// Block hash of the snapshot being synced; tested for non-zero in validatePeer().
285 h256 m_syncingSnapshotHash;
// Chunk hashes that still have to be requested.
286 std::deque<h256> m_neededChunks;
// Peers that are validated and currently have no chunk request outstanding.
287 boost::fibers::buffered_channel<NodeID> m_freePeers;
// Directory the MANIFEST file and the chunk files are written to.
288 boost::filesystem::path
const m_snapshotDir;
// Per-peer promises for the manifest and DAO fork header responses.
289 std::map<NodeID, boost::fibers::promise<bytes>> m_manifests;
290 std::map<NodeID, boost::fibers::promise<bytes>> m_daoForkHeaders;
// Chunk hash currently requested from each peer (one entry per peer).
291 std::map<NodeID, h256> m_requestedChunks;
// Fiber running downloadChunks(); created in onPeerStatus().
293 std::unique_ptr<boost::fibers::fiber> m_downloadFiber;
// Tail of a constructor's parameter list and its member initializers (presumably the
// WarpCapability constructor; the start of the header is not visible in this excerpt).
303 boost::filesystem::path
const& _snapshotDownloadPath,
304 std::shared_ptr<SnapshotStorageFace> _snapshotStorage)
305 : m_host(
std::move(_host)),
306 m_blockChain(_blockChain),
307 m_networkId(_networkId),
308 m_snapshot(_snapshotStorage),
// A peer observer is only created when a snapshot download path was given.
// NOTE(review): with an empty path this yields nullptr (presumably for m_peerObserver, whose
// name is on a line not visible here). The visible calls through m_peerObserver elsewhere
// in this file show no null check; confirm one exists.
311 _snapshotDownloadPath.empty() ? nullptr : createPeerObserver(_snapshotDownloadPath))
// Returns the fixed background-work interval constant (the enclosing function's header is
// not visible in this excerpt).
317 return c_backgroundWorkInterval;
// Builds the WarpPeerObserver for this capability. The observer receives a reference to
// this object, the block chain and the directory to download the snapshot into, and is
// returned through the WarpPeerObserverFace interface.
320 std::shared_ptr<WarpPeerObserverFace> WarpCapability::createPeerObserver(
321 boost::filesystem::path
const& _snapshotDownloadPath)
323 return std::make_shared<WarpPeerObserver>(*
this, m_blockChain, _snapshotDownloadPath);
// Fragment of a function whose header is not visible in this excerpt. It prepares the
// local snapshot's block number and hash and passes them, after the chain's genesis hash,
// as the trailing arguments of a call whose beginning is not visible.
// Both values stay default-initialised unless assigned on lines not visible here.
330 u256 snapshotBlockNumber;
331 h256 snapshotBlockHash;
// Read the locally stored manifest; the throw below presumably follows a failed
// validateManifest() check that is on a line not visible here.
334 bytes const snapshotManifest(m_snapshot->readManifest());
335 RLP manifest(snapshotManifest);
337 BOOST_THROW_EXCEPTION(InvalidSnapshotManifest());
344 m_blockChain.
genesisHash(), snapshotBlockHash, snapshotBlockNumber);
// Fragment of the incoming-packet handler (its header, the dispatch on packet type and the
// return statements are not visible in this excerpt). Visible behaviour: refresh the
// peer's last-activity timestamp, parse status packets into the peer's status record,
// answer snapshot requests from m_snapshot, forward responses to the peer observer, and
// log exceptions raised while handling the packet.
// Note: m_peers[_peerID] creates a status entry if the peer has none yet.
349 auto& peerStatus = m_peers[_peerID];
350 peerStatus.m_lastAsk = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// Thrown for a malformed status packet; the condition is on a line not visible here.
359 BOOST_THROW_EXCEPTION(InvalidWarpStatusPacket());
// Status fields by index: 0 protocol version, 1 network id, 2 total difficulty,
// 3 latest hash, 4 genesis hash, 5 snapshot hash, 6 snapshot number.
365 peerStatus.m_protocolVersion = _r[0].
toInt<
unsigned>();
366 peerStatus.m_networkId = _r[1].
toInt<
u256>();
367 peerStatus.m_totalDifficulty = _r[2].
toInt<
u256>();
368 peerStatus.m_latestHash = _r[3].
toHash<
h256>();
369 peerStatus.m_genesisHash = _r[4].
toHash<
h256>();
370 peerStatus.m_snapshotHash = _r[5].
toHash<
h256>();
371 peerStatus.m_snapshotNumber = _r[6].
toInt<
u256>();
373 cnetlog <<
"Status: "
374 <<
" protocol version " << peerStatus.m_protocolVersion <<
" networkId "
375 << peerStatus.m_networkId <<
" genesis hash " << peerStatus.m_genesisHash
376 <<
" total difficulty " << peerStatus.m_totalDifficulty <<
" latest hash "
377 << peerStatus.m_latestHash <<
" snapshot hash " << peerStatus.m_snapshotHash
378 <<
" snapshot number " << peerStatus.m_snapshotNumber;
// Hand the peer to the observer, which starts validating it.
// NOTE(review): no null check on m_peerObserver is visible before this call or the three
// observer calls further down; confirm one exists on the lines not shown.
380 m_peerObserver->onPeerStatus(_peerID);
// Reply with the locally stored manifest, appended as raw RLP.
390 .appendRaw(m_snapshot->readManifest());
391 m_host->sealAndSend(_peerID, s);
// Reply with the compressed chunk identified by chunkHash.
403 .append(m_snapshot->readCompressedChunk(chunkHash));
404 m_host->sealAndSend(_peerID, s);
412 m_host->sealAndSend(_peerID, s);
// Responses are forwarded unchanged to the peer observer.
418 m_peerObserver->onPeerBlockHeaders(_peerID, _r);
424 m_peerObserver->onPeerManifest(_peerID, _r);
430 m_peerObserver->onPeerData(_peerID, _r);
// Exceptions raised while handling the packet are logged together with the packet.
439 cnetlog <<
"Warp Peer causing an Exception: "
440 << boost::current_exception_diagnostic_information() <<
" " << _r;
442 catch (std::exception
const& _e)
444 cnetlog <<
"Warp Peer causing an exception: " << _e.what() <<
" " << _r;
// Fragment of the disconnect handler (header not visible in this excerpt): tells the
// observer which request, if any, was outstanding for the peer and then forgets the peer.
// Note: m_peers[_peerID] inserts a default entry if the peer is unknown; the entry is
// erased on the next line either way.
452 m_peerObserver->onPeerDisconnect(_peerID, m_peers[_peerID].m_asking);
453 m_peers.erase(_peerID);
// Fragment of a periodic sweep over all known peers (function header and the timeout
// condition are not visible in this excerpt). Peers that fail the hidden condition are
// disconnected with the PingTimeout reason.
458 for (
auto const& peer : m_peers)
// NOTE(review): `now` does not depend on the peer, so it is recomputed on every iteration;
// it could be taken once before the loop.
460 time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
461 auto const& status = peer.second;
465 m_host->disconnect(peer.first, p2p::PingTimeout);
// Tail of the status-sending function (the start of its header and the packet preparation
// are not visible in this excerpt). The fields are streamed in the same order in which the
// packet handler in this file reads them back: protocol version, network id, total
// difficulty, current hash, genesis hash, snapshot block hash, snapshot block number.
471 u256 const& _hostNetworkId,
u256 const& _chainTotalDifficulty,
h256 const& _chainCurrentHash,
472 h256 const& _chainGenesisHash,
h256 const& _snapshotBlockHash,
u256 const& _snapshotBlockNumber)
476 << _hostProtocolVersion << _hostNetworkId << _chainTotalDifficulty << _chainCurrentHash
477 << _chainGenesisHash << _snapshotBlockHash << _snapshotBlockNumber;
478 m_host->sealAndSend(_peerID, s);
// Block-header request (the first line of the header is not visible in this excerpt; the
// parameter list matches the m_host.requestBlockHeaders(...) call in validatePeer()).
// Sends start number, count, skip and a 1/0 reverse flag to the peer.
483 NodeID const& _peerID,
unsigned _startNumber,
unsigned _count,
unsigned _skip,
bool _reverse)
// Only peers that have a status entry are addressed; the statement taken for an unknown
// peer is on a line not visible here.
485 auto itPeerStatus = m_peers.find(_peerID);
486 if (itPeerStatus == m_peers.end())
493 << _startNumber << _count << _skip << (_reverse ? 1 : 0);
494 m_host->sealAndSend(_peerID, s);
// Fragments of two further request functions whose headers and packet construction are not
// visible in this excerpt (presumably requestManifest and requestData, which
// WarpPeerObserver calls on its host). Each looks the peer up in m_peers first and sends
// the prepared packet with sealAndSend().
499 auto itPeerStatus = m_peers.find(_peerID);
500 if (itPeerStatus == m_peers.end())
507 m_host->sealAndSend(_peerID, s);
512 auto itPeerStatus = m_peers.find(_peerID);
513 if (itPeerStatus == m_peers.end())
521 m_host->sealAndSend(_peerID, s);
// Records what is currently being asked of a peer and stamps the time of the request.
// The lookup uses find(), so an unknown peer does not get an entry created; the statement
// taken for an unknown peer is on a line not visible in this excerpt.
525 void WarpCapability::setAsking(
NodeID const& _peerID,
Asking _a)
527 auto itPeerStatus = m_peers.find(_peerID);
528 if (itPeerStatus == m_peers.end())
531 auto& peerStatus = itPeerStatus->second;
533 peerStatus.m_asking = _a;
// m_lastAsk is stored as a time_t taken from the system clock.
534 peerStatus.m_lastAsk = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// Status validation (the first line of the header is not visible in this excerpt; the
// parameters match the m_host.validateStatus(...) call in validatePeer()). Compares the
// peer's stored status with the expected genesis hash, the accepted protocol versions and
// the expected network id.
// NOTE(review): the return statements and the actions taken on a genesis-hash or
// protocol-version mismatch are on lines not visible here.
539 std::vector<unsigned>
const& _protocolVersions,
u256 const& _networkId)
541 auto itPeerStatus = m_peers.find(_peerID);
542 if (itPeerStatus == m_peers.end())
545 auto const& peerStatus = itPeerStatus->second;
547 if (peerStatus.m_genesisHash != _genesisHash)
// The peer's protocol version must be one of the accepted versions.
552 if (find(_protocolVersions.begin(), _protocolVersions.end(), peerStatus.m_protocolVersion) ==
553 _protocolVersions.end())
558 if (peerStatus.m_networkId != _networkId)
560 disablePeer(_peerID,
"Invalid network identifier.");
// The condition guarding this call is on a line not visible here.
565 disablePeer(_peerID,
"Peer banned for unexpected status message.");
// Disables this capability (identified by name()) for the peer, passing _problem as the
// reason. The enclosing function's header is not visible in this excerpt.
574 m_host->disableCapability(_peerID,
name(), _problem);