Bitcoin ABC  0.25.11
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/threadnames.h>
11 
12 #include <algorithm>
13 #include <vector>
14 
15 template <typename T> class CCheckQueueControl;
16 
27 template <typename T> class CCheckQueue {
28 private:
31 
33  std::condition_variable m_worker_cv;
34 
36  std::condition_variable m_master_cv;
37 
40  std::vector<T> queue GUARDED_BY(m_mutex);
41 
43  int nIdle GUARDED_BY(m_mutex){0};
44 
46  int nTotal GUARDED_BY(m_mutex){0};
47 
49  bool fAllOk GUARDED_BY(m_mutex){true};
50 
56  unsigned int nTodo GUARDED_BY(m_mutex){0};
57 
59  const unsigned int nBatchSize;
60 
61  std::vector<std::thread> m_worker_threads;
62  bool m_request_stop GUARDED_BY(m_mutex){false};
63 
65  bool Loop(bool fMaster) {
66  std::condition_variable &cond = fMaster ? m_master_cv : m_worker_cv;
67  std::vector<T> vChecks;
68  vChecks.reserve(nBatchSize);
69  unsigned int nNow = 0;
70  bool fOk = true;
71  do {
72  {
73  WAIT_LOCK(m_mutex, lock);
74  // first do the clean-up of the previous loop run (allowing us
75  // to do it in the same critsect)
76  if (nNow) {
77  fAllOk &= fOk;
78  nTodo -= nNow;
79  if (nTodo == 0 && !fMaster) {
80  // We processed the last element; inform the master it
81  // can exit and return the result
82  m_master_cv.notify_one();
83  }
84  } else {
85  // first iteration
86  nTotal++;
87  }
88  // logically, the do loop starts here
89  while (queue.empty() && !m_request_stop) {
90  if (fMaster && nTodo == 0) {
91  nTotal--;
92  bool fRet = fAllOk;
93  // reset the status for new work later
94  fAllOk = true;
95  // return the current status
96  return fRet;
97  }
98  nIdle++;
99  cond.wait(lock); // wait
100  nIdle--;
101  }
102  if (m_request_stop) {
103  return false;
104  }
105 
106  // Decide how many work units to process now.
107  // * Do not try to do everything at once, but aim for
108  // increasingly smaller batches so all workers finish
109  // approximately simultaneously.
110  // * Try to account for idle jobs which will instantly start
111  // helping.
112  // * Don't do batches smaller than 1 (duh), or larger than
113  // nBatchSize.
114  nNow = std::max(
115  1U, std::min(nBatchSize, (unsigned int)queue.size() /
116  (nTotal + nIdle + 1)));
117  vChecks.resize(nNow);
118  for (unsigned int i = 0; i < nNow; i++) {
119  // We want the lock on the m_mutex to be as short as
120  // possible, so swap jobs from the global queue to the local
121  // batch vector instead of copying.
122  vChecks[i].swap(queue.back());
123  queue.pop_back();
124  }
125  // Check whether we need to do work at all
126  fOk = fAllOk;
127  }
128  // execute work
129  for (T &check : vChecks) {
130  if (fOk) {
131  fOk = check();
132  }
133  }
134  vChecks.clear();
135  } while (true);
136  }
137 
138 public:
141 
143  explicit CCheckQueue(unsigned int nBatchSizeIn)
144  : nBatchSize(nBatchSizeIn) {}
145 
147  void StartWorkerThreads(const int threads_num) {
148  {
149  LOCK(m_mutex);
150  nIdle = 0;
151  nTotal = 0;
152  fAllOk = true;
153  }
154  assert(m_worker_threads.empty());
155  for (int n = 0; n < threads_num; ++n) {
156  m_worker_threads.emplace_back([this, n]() {
157  util::ThreadRename(strprintf("scriptch.%i", n));
158  Loop(false /* worker thread */);
159  });
160  }
161  }
162 
165  bool Wait() { return Loop(true /* master thread */); }
166 
168  void Add(std::vector<T> &vChecks) {
169  LOCK(m_mutex);
170  for (T &check : vChecks) {
171  queue.push_back(T());
172  check.swap(queue.back());
173  }
174  nTodo += vChecks.size();
175  if (vChecks.size() == 1) {
176  m_worker_cv.notify_one();
177  } else if (vChecks.size() > 1) {
178  m_worker_cv.notify_all();
179  }
180  }
181 
184  WITH_LOCK(m_mutex, m_request_stop = true);
185  m_worker_cv.notify_all();
186  for (std::thread &t : m_worker_threads) {
187  t.join();
188  }
189  m_worker_threads.clear();
190  WITH_LOCK(m_mutex, m_request_stop = false);
191  }
192 
194 };
195 
200 template <typename T> class CCheckQueueControl {
201 private:
203  bool fDone;
204 
205 public:
206  CCheckQueueControl() = delete;
207  CCheckQueueControl(const CCheckQueueControl &) = delete;
209  explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
210  : pqueue(pqueueIn), fDone(false) {
211  // passed queue is supposed to be unused, or nullptr
212  if (pqueue != nullptr) {
213  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
214  }
215  }
216 
217  bool Wait() {
218  if (pqueue == nullptr) {
219  return true;
220  }
221  bool fRet = pqueue->Wait();
222  fDone = true;
223  return fRet;
224  }
225 
226  void Add(std::vector<T> &vChecks) {
227  if (pqueue != nullptr) {
228  pqueue->Add(vChecks);
229  }
230  }
231 
233  if (!fDone) {
234  Wait();
235  }
236  if (pqueue != nullptr) {
237  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
238  }
239  }
240 };
241 
242 #endif // BITCOIN_CHECKQUEUE_H
assert
assert(!tx.IsCoinBase())
CCheckQueue::StopWorkerThreads
void StopWorkerThreads()
Stop all of the worker threads.
Definition: checkqueue.h:183
CCheckQueue::nBatchSize
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:56
CCheckQueue::~CCheckQueue
~CCheckQueue()
Definition: checkqueue.h:193
CCheckQueue::GUARDED_BY
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:46
sync.h
CCheckQueue::Loop
bool Loop(bool fMaster)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:65
CCheckQueue::StartWorkerThreads
void StartWorkerThreads(const int threads_num)
Create a pool of new worker threads.
Definition: checkqueue.h:147
CCheckQueue::GUARDED_BY
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
CCheckQueueControl::operator=
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
WITH_LOCK
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:274
CCheckQueue::CCheckQueue
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:143
AnnotatedMixin< std::mutex >
tinyformat.h
CCheckQueueControl::Add
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:226
ENTER_CRITICAL_SECTION
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:252
CCheckQueue::m_mutex
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:30
CCheckQueueControl::pqueue
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:202
CCheckQueue::GUARDED_BY
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:49
CCheckQueue
Queue for verifications that have to be performed.
Definition: checkqueue.h:27
CCheckQueue::GUARDED_BY
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:43
util::ThreadRename
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:48
CCheckQueueControl
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:15
CCheckQueue::GUARDED_BY
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:62
CCheckQueue::m_master_cv
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:36
CCheckQueueControl::CCheckQueueControl
CCheckQueueControl()=delete
strprintf
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1201
CCheckQueue::m_control_mutex
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:140
CCheckQueueControl::Wait
bool Wait()
Definition: checkqueue.h:217
LOCK
#define LOCK(cs)
Definition: sync.h:241
CCheckQueue::m_worker_threads
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:61
LEAVE_CRITICAL_SECTION
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:258
CCheckQueueControl::fDone
bool fDone
Definition: checkqueue.h:203
CCheckQueueControl::CCheckQueueControl
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:209
CCheckQueue::Add
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:168
threadnames.h
WAIT_LOCK
#define WAIT_LOCK(cs, name)
Definition: sync.h:249
CCheckQueue::m_worker_cv
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:33
CCheckQueueControl::~CCheckQueueControl
~CCheckQueueControl()
Definition: checkqueue.h:232
CCheckQueue::Wait
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:165