Ethereum  PoC-8
The C++ Implementation of Ethereum
TransactionQueue.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
22 #include "TransactionQueue.h"
23 
24 #include <libdevcore/Log.h>
25 #include <libethcore/Exceptions.h>
26 #include "Transaction.h"
27 using namespace std;
28 using namespace dev;
29 using namespace dev::eth;
30 
/// Upper bound on the number of entries held in m_unverified; enqueue() stops
/// accepting transactions once the queue has reached this size.
constexpr size_t c_maxVerificationQueueSize = 8192;
32 
33 TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit):
34  m_current(PriorityCompare { *this }),
35  m_limit(_limit),
36  m_futureLimit(_futureLimit)
37 {
38  unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
39  for (unsigned i = 0; i < verifierThreads; ++i)
40  m_verifiers.emplace_back([=](){
41  setThreadName("txcheck" + toString(i));
42  this->verifierBody();
43  });
44 }
45 
47 {
48  DEV_GUARDED(x_queue)
49  m_aborting = true;
50  m_queueReady.notify_all();
51  for (auto& i: m_verifiers)
52  i.join();
53 }
54 
56 {
57  try
58  {
60  return import(t, _ik);
61  }
62  catch (Exception const&)
63  {
65  }
66 }
67 
68 ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
69 {
70  if (m_known.count(_h))
72 
73  if (m_dropped.count(_h) && _ik == IfDropped::Ignore)
75 
76  return ImportResult::Success;
77 }
78 
80 {
81  if (_transaction.hasZeroSignature())
83  // Check if we already know this transaction.
84  h256 h = _transaction.sha3(WithSignature);
85 
86  ImportResult ret;
87  {
88  UpgradableGuard l(m_lock);
89  auto ir = check_WITH_LOCK(h, _ik);
90  if (ir != ImportResult::Success)
91  return ir;
92 
93  {
94  _transaction.safeSender(); // Perform EC recovery outside of the write lock
95  UpgradeGuard ul(l);
96  ret = manageImport_WITH_LOCK(h, _transaction);
97  }
98  }
99  return ret;
100 }
101 
102 Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid) const
103 {
104  ReadGuard l(m_lock);
105  Transactions ret;
106  for (auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
107  if (!_avoid.count(t->transaction.sha3()))
108  ret.push_back(t->transaction);
109  return ret;
110 }
111 
113 {
114  ReadGuard l(m_lock);
115  return m_known;
116 }
117 
118 ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction)
119 {
120  try
121  {
122  assert(_h == _transaction.sha3());
123  // Remove any prior transaction with the same nonce but a lower gas price.
124  // Bomb out if there's a prior transaction with higher gas price.
125  auto cs = m_currentByAddressAndNonce.find(_transaction.from());
126  if (cs != m_currentByAddressAndNonce.end())
127  {
128  auto t = cs->second.find(_transaction.nonce());
129  if (t != cs->second.end())
130  {
131  if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
133  else
134  {
135  h256 dropped = (*t->second).transaction.sha3();
136  remove_WITH_LOCK(dropped);
137  m_onReplaced(dropped);
138  }
139  }
140  }
141  auto fs = m_future.find(_transaction.from());
142  if (fs != m_future.end())
143  {
144  auto t = fs->second.find(_transaction.nonce());
145  if (t != fs->second.end())
146  {
147  if (_transaction.gasPrice() < t->second.transaction.gasPrice())
149  else
150  {
151  fs->second.erase(t);
152  --m_futureSize;
153  if (fs->second.empty())
154  m_future.erase(fs);
155  }
156  }
157  }
158  // If valid, append to transactions.
159  insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
160  LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;
161 
162  while (m_current.size() > m_limit)
163  {
164  LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
165  remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
166  }
167 
168  m_onReady();
169  }
170  catch (Exception const& _e)
171  {
172  LOG(m_loggerDetail) << "Ignoring invalid transaction: " << diagnostic_information(_e);
174  }
175  catch (std::exception const& _e)
176  {
177  LOG(m_loggerDetail) << "Ignoring invalid transaction: " << _e.what();
179  }
180 
181  return ImportResult::Success;
182 }
183 
185 {
186  ReadGuard l(m_lock);
187  return maxNonce_WITH_LOCK(_a);
188 }
189 
190 u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const
191 {
192  u256 ret = 0;
193  auto cs = m_currentByAddressAndNonce.find(_a);
194  if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
195  ret = cs->second.rbegin()->first + 1;
196  auto fs = m_future.find(_a);
197  if (fs != m_future.end() && !fs->second.empty())
198  ret = std::max(ret, fs->second.rbegin()->first + 1);
199  return ret;
200 }
201 
202 void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
203 {
204  if (m_currentByHash.count(_p.first))
205  {
206  cwarn << "Transaction hash" << _p.first << "already in current?!";
207  return;
208  }
209 
210  Transaction const& t = _p.second;
211  // Insert into current
212  auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
213  PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
214  inserted.first->second = handle;
215  m_currentByHash[_p.first] = handle;
216 
217  // Move following transactions from future to current
218  makeCurrent_WITH_LOCK(t);
219  m_known.insert(_p.first);
220 }
221 
222 bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash)
223 {
224  auto t = m_currentByHash.find(_txHash);
225  if (t == m_currentByHash.end())
226  return false;
227 
228  Address from = (*t->second).transaction.from();
229  auto it = m_currentByAddressAndNonce.find(from);
230  assert (it != m_currentByAddressAndNonce.end());
231  it->second.erase((*t->second).transaction.nonce());
232  m_current.erase(t->second);
233  m_currentByHash.erase(t);
234  if (it->second.empty())
235  m_currentByAddressAndNonce.erase(it);
236  m_known.erase(_txHash);
237  return true;
238 }
239 
240 unsigned TransactionQueue::waiting(Address const& _a) const
241 {
242  ReadGuard l(m_lock);
243  unsigned ret = 0;
244  auto cs = m_currentByAddressAndNonce.find(_a);
245  if (cs != m_currentByAddressAndNonce.end())
246  ret = cs->second.size();
247  auto fs = m_future.find(_a);
248  if (fs != m_future.end())
249  ret += fs->second.size();
250  return ret;
251 }
252 
253 void TransactionQueue::setFuture(h256 const& _txHash)
254 {
255  WriteGuard l(m_lock);
256  auto it = m_currentByHash.find(_txHash);
257  if (it == m_currentByHash.end())
258  return;
259 
260  VerifiedTransaction const& st = *(it->second);
261 
262  Address from = st.transaction.from();
263  auto& queue = m_currentByAddressAndNonce[from];
264  auto& target = m_future[from];
265  auto cutoff = queue.lower_bound(st.transaction.nonce());
266  for (auto m = cutoff; m != queue.end(); ++m)
267  {
268  VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
269  m_currentByHash.erase(t.transaction.sha3());
270  target.emplace(t.transaction.nonce(), move(t));
271  m_current.erase(m->second);
272  ++m_futureSize;
273  }
274  queue.erase(cutoff, queue.end());
275  if (queue.empty())
276  m_currentByAddressAndNonce.erase(from);
277 }
278 
279 void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t)
280 {
281  bool newCurrent = false;
282  auto fs = m_future.find(_t.from());
283  if (fs != m_future.end())
284  {
285  u256 nonce = _t.nonce() + 1;
286  auto fb = fs->second.find(nonce);
287  if (fb != fs->second.end())
288  {
289  auto ft = fb;
290  while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
291  {
292  auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
293  PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
294  inserted.first->second = handle;
295  m_currentByHash[(*handle).transaction.sha3()] = handle;
296  --m_futureSize;
297  ++ft;
298  ++nonce;
299  newCurrent = true;
300  }
301  fs->second.erase(fb, ft);
302  if (fs->second.empty())
303  m_future.erase(_t.from());
304  }
305  }
306 
307  while (m_futureSize > m_futureLimit)
308  {
309  // TODO: priority queue for future transactions
310  // For now just drop random chain end
311  --m_futureSize;
312  LOG(m_loggerDetail) << "Dropping out of bounds future transaction "
313  << m_future.begin()->second.rbegin()->second.transaction.sha3();
314  m_future.begin()->second.erase(--m_future.begin()->second.end());
315  if (m_future.begin()->second.empty())
316  m_future.erase(m_future.begin());
317  }
318 
319  if (newCurrent)
320  m_onReady();
321 }
322 
323 void TransactionQueue::drop(h256 const& _txHash)
324 {
325  UpgradableGuard l(m_lock);
326 
327  if (!m_known.count(_txHash))
328  return;
329 
330  UpgradeGuard ul(l);
331  m_dropped.insert(_txHash);
332  remove_WITH_LOCK(_txHash);
333 }
334 
336 {
337  WriteGuard l(m_lock);
338  makeCurrent_WITH_LOCK(_t);
339  if (!m_known.count(_t.sha3()))
340  return;
341  remove_WITH_LOCK(_t.sha3());
342 }
343 
345 {
346  WriteGuard l(m_lock);
347  m_known.clear();
348  m_current.clear();
349  m_dropped.clear();
350  m_currentByAddressAndNonce.clear();
351  m_currentByHash.clear();
352  m_future.clear();
353  m_futureSize = 0;
354 }
355 
356 void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
357 {
358  bool queued = false;
359  {
360  Guard l(x_queue);
361  unsigned itemCount = _data.itemCount();
362  for (unsigned i = 0; i < itemCount; ++i)
363  {
364  if (m_unverified.size() >= c_maxVerificationQueueSize)
365  {
366  LOG(m_logger) << "Transaction verification queue is full. Dropping "
367  << itemCount - i << " transactions";
368  break;
369  }
370  m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
371  queued = true;
372  }
373  }
374  if (queued)
375  m_queueReady.notify_all();
376 }
377 
378 void TransactionQueue::verifierBody()
379 {
380  while (!m_aborting)
381  {
382  UnverifiedTransaction work;
383 
384  {
385  unique_lock<Mutex> l(x_queue);
386  m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
387  if (m_aborting)
388  return;
389  work = move(m_unverified.front());
390  m_unverified.pop_front();
391  }
392 
393  try
394  {
395  Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
396  ImportResult ir = import(t);
397  m_onImport(ir, t.sha3(), work.nodeId);
398  }
399  catch (...)
400  {
401  // should not happen as exceptions are handled in import.
402  cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
403  }
404  }
405 }
dev::eth::TransactionQueue::topTransactions
Transactions topTransactions(unsigned _limit, h256Hash const &_avoid=h256Hash()) const
Definition: TransactionQueue.cpp:102
dev::eth::ImportResult::AlreadyKnown
@ AlreadyKnown
dev::eth::TransactionQueue::knownTransactions
h256Hash knownTransactions() const
Definition: TransactionQueue.cpp:112
dev::eth::ImportResult
ImportResult
Definition: Common.h:97
dev::eth::Transaction
Encodes a transaction, ready to be exported to or freshly imported from RLP.
Definition: Transaction.h:86
dev::vector_ref< byte const >
dev::UpgradableGuard
boost::upgrade_lock< boost::shared_mutex > UpgradableGuard
Definition: Guards.h:45
dev::eth::Transactions
std::vector< Transaction > Transactions
Nice name for vector of Transaction.
Definition: Transaction.h:122
dev::eth::TransactionQueue::enqueue
void enqueue(RLP const &_data, h512 const &_nodeId)
Definition: TransactionQueue.cpp:356
dev::eth::CheckTransaction::Cheap
@ Cheap
dev::Guard
std::lock_guard< std::mutex > Guard
Definition: Guards.h:41
dev::eth::CheckTransaction::Everything
@ Everything
Exceptions.h
dev::eth::TransactionBase::from
Address from() const
Synonym for safeSender().
Definition: TransactionBase.h:130
dev::eth
Definition: BasicAuthority.h:32
dev::eth::ImportResult::Malformed
@ Malformed
dev::FixedHash< 32 >
DEV_GUARDED
#define DEV_GUARDED(MUTEX)
Simple block guard. The expression/block following is guarded through the given mutex....
Definition: Guards.h:132
dev::eth::TransactionQueue::dropGood
void dropGood(Transaction const &_t)
Definition: TransactionQueue.cpp:335
dev::Exception
Base class for all exceptions.
Definition: Exceptions.h:39
LOG
#define LOG
Definition: Log.h:63
dev::WriteGuard
boost::unique_lock< boost::shared_mutex > WriteGuard
Definition: Guards.h:47
dev::eth::WithSignature
@ WithSignature
Do include a signature.
Definition: TransactionBase.h:38
dev::h256Hash
std::unordered_set< h256 > h256Hash
Definition: FixedHash.h:365
dev::eth::ImportResult::AlreadyInChain
@ AlreadyInChain
c_maxVerificationQueueSize
const size_t c_maxVerificationQueueSize
Definition: TransactionQueue.cpp:31
dev::eth::IfDropped
IfDropped
Import transaction policy.
Definition: Common.h:215
dev::eth::TransactionBase::hasZeroSignature
bool hasZeroSignature() const
Definition: TransactionBase.h:145
dev::eth::TransactionBase::safeSender
Address const & safeSender() const noexcept
Like sender() but will never throw.
Definition: TransactionBase.cpp:104
Transaction.h
dev::ReadGuard
boost::shared_lock< boost::shared_mutex > ReadGuard
Definition: Guards.h:44
dev::eth::TransactionQueue::~TransactionQueue
~TransactionQueue()
Definition: TransactionQueue.cpp:46
dev::eth::TransactionBase::gasPrice
u256 gasPrice() const
Definition: TransactionBase.h:118
dev::eth::TransactionBase::sha3
h256 sha3(IncludeSignature _sig=WithSignature) const
Definition: TransactionBase.cpp:212
dev::eth::TransactionQueue::setFuture
void setFuture(h256 const &_t)
Definition: TransactionQueue.cpp:253
dev::UpgradeGuard
boost::upgrade_to_unique_lock< boost::shared_mutex > UpgradeGuard
Definition: Guards.h:46
dev::eth::TransactionQueue::clear
void clear()
Clear the queue.
Definition: TransactionQueue.cpp:344
dev::eth::TransactionQueue::maxNonce
u256 maxNonce(Address const &_a) const
Definition: TransactionQueue.cpp:184
dev::RLP::itemCount
size_t itemCount() const
Definition: RLP.h:101
std
Definition: FixedHash.h:393
dev::eth::Success
@ Success
Definition: Common.h:223
dev::eth::ImportResult::OverbidGasPrice
@ OverbidGasPrice
dev::u256
boost::multiprecision::number< boost::multiprecision::cpp_int_backend< 256, 256, boost::multiprecision::unsigned_magnitude, boost::multiprecision::unchecked, void > > u256
Definition: Common.h:121
dev::eth::IfDropped::Ignore
@ Ignore
Don't import transaction that was previously dropped.
dev
Definition: Address.cpp:21
dev::eth::TransactionBase::nonce
u256 nonce() const
Definition: TransactionBase.h:136
dev::eth::ImportResult::ZeroSignature
@ ZeroSignature
dev::eth::TransactionQueue::waiting
unsigned waiting(Address const &_a) const
Definition: TransactionQueue.cpp:240
cwarn
#define cwarn
dev::eth::TransactionQueue::drop
void drop(h256 const &_txHash)
Definition: TransactionQueue.cpp:323
dev::RLP
Definition: RLP.h:48
dev::eth::TransactionQueue::import
ImportResult import(bytes const &_tx, IfDropped _ik=IfDropped::Ignore)
Definition: TransactionQueue.h:64
Log.h
TransactionQueue.h