Bitcoin ABC  0.26.3
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/threadnames.h>
11 
12 #include <algorithm>
13 #include <iterator>
14 #include <vector>
15 
16 template <typename T> class CCheckQueueControl;
17 
28 template <typename T> class CCheckQueue {
29 private:
32 
34  std::condition_variable m_worker_cv;
35 
37  std::condition_variable m_master_cv;
38 
41  std::vector<T> queue GUARDED_BY(m_mutex);
42 
44  int nIdle GUARDED_BY(m_mutex){0};
45 
47  int nTotal GUARDED_BY(m_mutex){0};
48 
50  bool fAllOk GUARDED_BY(m_mutex){true};
51 
57  unsigned int nTodo GUARDED_BY(m_mutex){0};
58 
60  const unsigned int nBatchSize;
61 
62  std::vector<std::thread> m_worker_threads;
63  bool m_request_stop GUARDED_BY(m_mutex){false};
64 
66  bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
67  std::condition_variable &cond = fMaster ? m_master_cv : m_worker_cv;
68  std::vector<T> vChecks;
69  vChecks.reserve(nBatchSize);
70  unsigned int nNow = 0;
71  bool fOk = true;
72  do {
73  {
74  WAIT_LOCK(m_mutex, lock);
75  // first do the clean-up of the previous loop run (allowing us
76  // to do it in the same critsect)
77  if (nNow) {
78  fAllOk &= fOk;
79  nTodo -= nNow;
80  if (nTodo == 0 && !fMaster) {
81  // We processed the last element; inform the master it
82  // can exit and return the result
83  m_master_cv.notify_one();
84  }
85  } else {
86  // first iteration
87  nTotal++;
88  }
89  // logically, the do loop starts here
90  while (queue.empty() && !m_request_stop) {
91  if (fMaster && nTodo == 0) {
92  nTotal--;
93  bool fRet = fAllOk;
94  // reset the status for new work later
95  fAllOk = true;
96  // return the current status
97  return fRet;
98  }
99  nIdle++;
100  cond.wait(lock); // wait
101  nIdle--;
102  }
103  if (m_request_stop) {
104  return false;
105  }
106 
107  // Decide how many work units to process now.
108  // * Do not try to do everything at once, but aim for
109  // increasingly smaller batches so all workers finish
110  // approximately simultaneously.
111  // * Try to account for idle jobs which will instantly start
112  // helping.
113  // * Don't do batches smaller than 1 (duh), or larger than
114  // nBatchSize.
115  nNow = std::max(
116  1U, std::min(nBatchSize, (unsigned int)queue.size() /
117  (nTotal + nIdle + 1)));
118  auto start_it = queue.end() - nNow;
119  vChecks.assign(std::make_move_iterator(start_it),
120  std::make_move_iterator(queue.end()));
121  queue.erase(start_it, queue.end());
122  // Check whether we need to do work at all
123  fOk = fAllOk;
124  }
125  // execute work
126  for (T &check : vChecks) {
127  if (fOk) {
128  fOk = check();
129  }
130  }
131  vChecks.clear();
132  } while (true);
133  }
134 
135 public:
138 
140  explicit CCheckQueue(unsigned int nBatchSizeIn)
141  : nBatchSize(nBatchSizeIn) {}
142 
144  void StartWorkerThreads(const int threads_num)
146  {
147  LOCK(m_mutex);
148  nIdle = 0;
149  nTotal = 0;
150  fAllOk = true;
151  }
152  assert(m_worker_threads.empty());
153  for (int n = 0; n < threads_num; ++n) {
154  m_worker_threads.emplace_back([this, n]() {
155  util::ThreadRename(strprintf("scriptch.%i", n));
156  Loop(false /* worker thread */);
157  });
158  }
159  }
160 
164  return Loop(true /* master thread */);
165  }
166 
168  void Add(std::vector<T> &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
169  LOCK(m_mutex);
170  queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()),
171  std::make_move_iterator(vChecks.end()));
172  nTodo += vChecks.size();
173  if (vChecks.size() == 1) {
174  m_worker_cv.notify_one();
175  } else if (vChecks.size() > 1) {
176  m_worker_cv.notify_all();
177  }
178  }
179 
182  WITH_LOCK(m_mutex, m_request_stop = true);
183  m_worker_cv.notify_all();
184  for (std::thread &t : m_worker_threads) {
185  t.join();
186  }
187  m_worker_threads.clear();
188  WITH_LOCK(m_mutex, m_request_stop = false);
189  }
190 
192 };
193 
198 template <typename T> class CCheckQueueControl {
199 private:
201  bool fDone;
202 
203 public:
204  CCheckQueueControl() = delete;
207  explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
208  : pqueue(pqueueIn), fDone(false) {
209  // passed queue is supposed to be unused, or nullptr
210  if (pqueue != nullptr) {
211  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
212  }
213  }
214 
215  bool Wait() {
216  if (pqueue == nullptr) {
217  return true;
218  }
219  bool fRet = pqueue->Wait();
220  fDone = true;
221  return fRet;
222  }
223 
224  void Add(std::vector<T> &&vChecks) {
225  if (pqueue != nullptr) {
226  pqueue->Add(std::move(vChecks));
227  }
228  }
229 
231  if (!fDone) {
232  Wait();
233  }
234  if (pqueue != nullptr) {
235  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
236  }
237  }
238 };
239 
240 #endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:198
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:200
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:207
CCheckQueueControl()=delete
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:224
CCheckQueueControl(const CCheckQueueControl &)=delete
Queue for verifications that have to be performed.
Definition: checkqueue.h:28
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:57
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:37
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:50
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:63
bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:66
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:47
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:62
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:137
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:34
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:31
bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:163
void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Create a pool of new worker threads.
Definition: checkqueue.h:144
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:140
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:168
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:44
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:57
void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all of the worker threads.
Definition: checkqueue.h:181
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:48
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:320
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:326
#define LOCK(cs)
Definition: sync.h:306
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:357
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:56
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1202
assert(!tx.IsCoinBase())