33 TransactionQueue::TransactionQueue(
unsigned _limit,
unsigned _futureLimit):
34 m_current(PriorityCompare { *
this }),
36 m_futureLimit(_futureLimit)
38 unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
39 for (
unsigned i = 0; i < verifierThreads; ++i)
40 m_verifiers.emplace_back([=](){
41 setThreadName(
"txcheck" + toString(i));
50 m_queueReady.notify_all();
51 for (
auto& i: m_verifiers)
60 return import(t, _ik);
70 if (m_known.count(_h))
89 auto ir = check_WITH_LOCK(h, _ik);
96 ret = manageImport_WITH_LOCK(h, _transaction);
106 for (
auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
107 if (!_avoid.count(t->transaction.sha3()))
108 ret.push_back(t->transaction);
122 assert(_h == _transaction.
sha3());
125 auto cs = m_currentByAddressAndNonce.find(_transaction.
from());
126 if (cs != m_currentByAddressAndNonce.end())
128 auto t = cs->second.find(_transaction.
nonce());
129 if (t != cs->second.end())
131 if (_transaction.
gasPrice() < (*t->second).transaction.gasPrice())
135 h256 dropped = (*t->second).transaction.sha3();
136 remove_WITH_LOCK(dropped);
137 m_onReplaced(dropped);
141 auto fs = m_future.find(_transaction.
from());
142 if (fs != m_future.end())
144 auto t = fs->second.find(_transaction.
nonce());
145 if (t != fs->second.end())
147 if (_transaction.
gasPrice() < t->second.transaction.gasPrice())
153 if (fs->second.empty())
159 insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
160 LOG(m_loggerDetail) <<
"Queued vaguely legit-looking transaction " << _h;
162 while (m_current.size() > m_limit)
164 LOG(m_loggerDetail) <<
"Dropping out of bounds transaction " << _h;
165 remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
172 LOG(m_loggerDetail) <<
"Ignoring invalid transaction: " << diagnostic_information(_e);
175 catch (std::exception
const& _e)
177 LOG(m_loggerDetail) <<
"Ignoring invalid transaction: " << _e.what();
187 return maxNonce_WITH_LOCK(_a);
190 u256 TransactionQueue::maxNonce_WITH_LOCK(
Address const& _a)
const
193 auto cs = m_currentByAddressAndNonce.find(_a);
194 if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
195 ret = cs->second.rbegin()->first + 1;
196 auto fs = m_future.find(_a);
197 if (fs != m_future.end() && !fs->second.empty())
198 ret = std::max(ret, fs->second.rbegin()->first + 1);
202 void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction>
const& _p)
204 if (m_currentByHash.count(_p.first))
206 cwarn <<
"Transaction hash" << _p.first <<
"already in current?!";
212 auto inserted = m_currentByAddressAndNonce[t.
from()].insert(std::make_pair(t.
nonce(), PriorityQueue::iterator()));
213 PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
214 inserted.first->second = handle;
215 m_currentByHash[_p.first] = handle;
218 makeCurrent_WITH_LOCK(t);
219 m_known.insert(_p.first);
222 bool TransactionQueue::remove_WITH_LOCK(
h256 const& _txHash)
224 auto t = m_currentByHash.find(_txHash);
225 if (t == m_currentByHash.end())
228 Address from = (*t->second).transaction.from();
229 auto it = m_currentByAddressAndNonce.find(from);
230 assert (it != m_currentByAddressAndNonce.end());
231 it->second.erase((*t->second).transaction.nonce());
232 m_current.erase(t->second);
233 m_currentByHash.erase(t);
234 if (it->second.empty())
235 m_currentByAddressAndNonce.erase(it);
236 m_known.erase(_txHash);
244 auto cs = m_currentByAddressAndNonce.find(_a);
245 if (cs != m_currentByAddressAndNonce.end())
246 ret = cs->second.size();
247 auto fs = m_future.find(_a);
248 if (fs != m_future.end())
249 ret += fs->second.size();
256 auto it = m_currentByHash.find(_txHash);
257 if (it == m_currentByHash.end())
260 VerifiedTransaction
const& st = *(it->second);
262 Address from = st.transaction.from();
263 auto& queue = m_currentByAddressAndNonce[from];
264 auto& target = m_future[from];
265 auto cutoff = queue.lower_bound(st.transaction.nonce());
266 for (
auto m = cutoff; m != queue.end(); ++m)
268 VerifiedTransaction& t =
const_cast<VerifiedTransaction&
>(*(m->second));
269 m_currentByHash.erase(t.transaction.sha3());
270 target.emplace(t.transaction.nonce(), move(t));
271 m_current.erase(m->second);
274 queue.erase(cutoff, queue.end());
276 m_currentByAddressAndNonce.erase(from);
279 void TransactionQueue::makeCurrent_WITH_LOCK(
Transaction const& _t)
281 bool newCurrent =
false;
282 auto fs = m_future.find(_t.
from());
283 if (fs != m_future.end())
286 auto fb = fs->second.find(nonce);
287 if (fb != fs->second.end())
290 while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
292 auto inserted = m_currentByAddressAndNonce[_t.
from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
293 PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
294 inserted.first->second = handle;
295 m_currentByHash[(*handle).transaction.sha3()] = handle;
301 fs->second.erase(fb, ft);
302 if (fs->second.empty())
303 m_future.erase(_t.
from());
307 while (m_futureSize > m_futureLimit)
312 LOG(m_loggerDetail) <<
"Dropping out of bounds future transaction "
313 << m_future.begin()->second.rbegin()->second.transaction.sha3();
314 m_future.begin()->second.erase(--m_future.begin()->second.end());
315 if (m_future.begin()->second.empty())
316 m_future.erase(m_future.begin());
327 if (!m_known.count(_txHash))
331 m_dropped.insert(_txHash);
332 remove_WITH_LOCK(_txHash);
338 makeCurrent_WITH_LOCK(_t);
339 if (!m_known.count(_t.
sha3()))
341 remove_WITH_LOCK(_t.
sha3());
350 m_currentByAddressAndNonce.clear();
351 m_currentByHash.clear();
362 for (
unsigned i = 0; i < itemCount; ++i)
366 LOG(m_logger) <<
"Transaction verification queue is full. Dropping "
367 << itemCount - i <<
" transactions";
370 m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
375 m_queueReady.notify_all();
378 void TransactionQueue::verifierBody()
382 UnverifiedTransaction work;
385 unique_lock<Mutex> l(x_queue);
386 m_queueReady.wait(l, [&](){
return !m_unverified.empty() || m_aborting; });
389 work = move(m_unverified.front());
390 m_unverified.pop_front();
397 m_onImport(ir, t.
sha3(), work.nodeId);
402 cwarn <<
"Bad transaction:" << boost::current_exception_diagnostic_information();