Bitcoin ABC  0.26.3
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/threadnames.h>
11 
12 #include <algorithm>
13 #include <vector>
14 
15 template <typename T> class CCheckQueueControl;
16 
27 template <typename T> class CCheckQueue {
28 private:
31 
33  std::condition_variable m_worker_cv;
34 
36  std::condition_variable m_master_cv;
37 
40  std::vector<T> queue GUARDED_BY(m_mutex);
41 
43  int nIdle GUARDED_BY(m_mutex){0};
44 
46  int nTotal GUARDED_BY(m_mutex){0};
47 
49  bool fAllOk GUARDED_BY(m_mutex){true};
50 
56  unsigned int nTodo GUARDED_BY(m_mutex){0};
57 
59  const unsigned int nBatchSize;
60 
61  std::vector<std::thread> m_worker_threads;
62  bool m_request_stop GUARDED_BY(m_mutex){false};
63 
65  bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
66  std::condition_variable &cond = fMaster ? m_master_cv : m_worker_cv;
67  std::vector<T> vChecks;
68  vChecks.reserve(nBatchSize);
69  unsigned int nNow = 0;
70  bool fOk = true;
71  do {
72  {
73  WAIT_LOCK(m_mutex, lock);
74  // first do the clean-up of the previous loop run (allowing us
75  // to do it in the same critsect)
76  if (nNow) {
77  fAllOk &= fOk;
78  nTodo -= nNow;
79  if (nTodo == 0 && !fMaster) {
80  // We processed the last element; inform the master it
81  // can exit and return the result
82  m_master_cv.notify_one();
83  }
84  } else {
85  // first iteration
86  nTotal++;
87  }
88  // logically, the do loop starts here
89  while (queue.empty() && !m_request_stop) {
90  if (fMaster && nTodo == 0) {
91  nTotal--;
92  bool fRet = fAllOk;
93  // reset the status for new work later
94  fAllOk = true;
95  // return the current status
96  return fRet;
97  }
98  nIdle++;
99  cond.wait(lock); // wait
100  nIdle--;
101  }
102  if (m_request_stop) {
103  return false;
104  }
105 
106  // Decide how many work units to process now.
107  // * Do not try to do everything at once, but aim for
108  // increasingly smaller batches so all workers finish
109  // approximately simultaneously.
110  // * Try to account for idle jobs which will instantly start
111  // helping.
112  // * Don't do batches smaller than 1 (duh), or larger than
113  // nBatchSize.
114  nNow = std::max(
115  1U, std::min(nBatchSize, (unsigned int)queue.size() /
116  (nTotal + nIdle + 1)));
117  vChecks.resize(nNow);
118  for (unsigned int i = 0; i < nNow; i++) {
119  // We want the lock on the m_mutex to be as short as
120  // possible, so swap jobs from the global queue to the local
121  // batch vector instead of copying.
122  vChecks[i].swap(queue.back());
123  queue.pop_back();
124  }
125  // Check whether we need to do work at all
126  fOk = fAllOk;
127  }
128  // execute work
129  for (T &check : vChecks) {
130  if (fOk) {
131  fOk = check();
132  }
133  }
134  vChecks.clear();
135  } while (true);
136  }
137 
138 public:
141 
143  explicit CCheckQueue(unsigned int nBatchSizeIn)
144  : nBatchSize(nBatchSizeIn) {}
145 
147  void StartWorkerThreads(const int threads_num)
149  {
150  LOCK(m_mutex);
151  nIdle = 0;
152  nTotal = 0;
153  fAllOk = true;
154  }
155  assert(m_worker_threads.empty());
156  for (int n = 0; n < threads_num; ++n) {
157  m_worker_threads.emplace_back([this, n]() {
158  util::ThreadRename(strprintf("scriptch.%i", n));
159  Loop(false /* worker thread */);
160  });
161  }
162  }
163 
167  return Loop(true /* master thread */);
168  }
169 
171  void Add(std::vector<T> &vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
172  LOCK(m_mutex);
173  for (T &check : vChecks) {
174  queue.push_back(T());
175  check.swap(queue.back());
176  }
177  nTodo += vChecks.size();
178  if (vChecks.size() == 1) {
179  m_worker_cv.notify_one();
180  } else if (vChecks.size() > 1) {
181  m_worker_cv.notify_all();
182  }
183  }
184 
187  WITH_LOCK(m_mutex, m_request_stop = true);
188  m_worker_cv.notify_all();
189  for (std::thread &t : m_worker_threads) {
190  t.join();
191  }
192  m_worker_threads.clear();
193  WITH_LOCK(m_mutex, m_request_stop = false);
194  }
195 
197 };
198 
203 template <typename T> class CCheckQueueControl {
204 private:
206  bool fDone;
207 
208 public:
209  CCheckQueueControl() = delete;
212  explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
213  : pqueue(pqueueIn), fDone(false) {
214  // passed queue is supposed to be unused, or nullptr
215  if (pqueue != nullptr) {
216  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
217  }
218  }
219 
220  bool Wait() {
221  if (pqueue == nullptr) {
222  return true;
223  }
224  bool fRet = pqueue->Wait();
225  fDone = true;
226  return fRet;
227  }
228 
229  void Add(std::vector<T> &vChecks) {
230  if (pqueue != nullptr) {
231  pqueue->Add(vChecks);
232  }
233  }
234 
236  if (!fDone) {
237  Wait();
238  }
239  if (pqueue != nullptr) {
240  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
241  }
242  }
243 };
244 
245 #endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:203
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:205
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:212
CCheckQueueControl()=delete
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:229
CCheckQueueControl(const CCheckQueueControl &)=delete
Queue for verifications that have to be performed.
Definition: checkqueue.h:27
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:56
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:36
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:49
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:62
bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:65
void Add(std::vector< T > &vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:171
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:46
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:61
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:140
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:33
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:30
bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:166
void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Create a pool of new worker threads.
Definition: checkqueue.h:147
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:143
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:43
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:56
void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all of the worker threads.
Definition: checkqueue.h:186
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:48
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:320
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:326
#define LOCK(cs)
Definition: sync.h:306
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:357
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:56
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1202
assert(!tx.IsCoinBase())