Bitcoin ABC  0.24.7
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 
9 #include <cassert>
10 #include <utility>
11 
13 
15  assert(nThreadsServicingQueue == 0);
16  if (stopWhenEmpty) {
17  assert(taskQueue.empty());
18  }
19 }
20 
22  WAIT_LOCK(newTaskMutex, lock);
23  ++nThreadsServicingQueue;
24 
25  // newTaskMutex is locked throughout this loop EXCEPT when the thread is
26  // waiting or when the user's function is called.
27  while (!shouldStop()) {
28  try {
29  while (!shouldStop() && taskQueue.empty()) {
30  // Wait until there is something to do.
31  newTaskScheduled.wait(lock);
32  }
33 
34  // Wait until either there is a new task, or until
35  // the time of the first item on the queue:
36 
37  while (!shouldStop() && !taskQueue.empty()) {
38  std::chrono::system_clock::time_point timeToWaitFor =
39  taskQueue.begin()->first;
40  if (newTaskScheduled.wait_until(lock, timeToWaitFor) ==
41  std::cv_status::timeout) {
42  // Exit loop after timeout, it means we reached the time of
43  // the event
44  break;
45  }
46  }
47 
48  // If there are multiple threads, the queue can empty while we're
49  // waiting (another thread may service the task we were waiting on).
50  if (shouldStop() || taskQueue.empty()) {
51  continue;
52  }
53 
54  Function f = taskQueue.begin()->second;
55  taskQueue.erase(taskQueue.begin());
56 
57  {
58  // Unlock before calling f, so it can reschedule itself or
59  // another task without deadlocking:
60  REVERSE_LOCK(lock);
61  f();
62  }
63  } catch (...) {
64  --nThreadsServicingQueue;
65  throw;
66  }
67  }
68  --nThreadsServicingQueue;
69  newTaskScheduled.notify_one();
70 }
71 
73  std::chrono::system_clock::time_point t) {
74  {
76  taskQueue.insert(std::make_pair(t, f));
77  }
78  newTaskScheduled.notify_one();
79 }
80 
// Mock a jump forward in time: move the scheduled time of every queued task
// earlier by delta_seconds, then signal newTaskScheduled so the queue is
// re-examined.
//
// delta_seconds must be strictly positive and shorter than one hour; the
// assert below enforces both bounds.
81 void CScheduler::MockForward(std::chrono::seconds delta_seconds) {
82  assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
83 
84  {
// NOTE(review): listing line 85 is absent from this extract. The enclosing
// scope suggests a lock is acquired here - confirm against the original file.
86 
87  // use temp_queue to maintain updated schedule
88  std::multimap<std::chrono::system_clock::time_point, Function>
89  temp_queue;
90 
// Every key is shifted by the same amount, so the relative order of the
// entries is unchanged and each new element belongs at the end of
// temp_queue; hence the cend() hint.
91  for (const auto &element : taskQueue) {
92  temp_queue.emplace_hint(temp_queue.cend(),
93  element.first - delta_seconds,
94  element.second);
95  }
96 
97  // replace the contents of taskQueue with the shifted schedule
98  taskQueue = std::move(temp_queue);
99  }
100 
101  // notify that the taskQueue needs to be processed
102  newTaskScheduled.notify_one();
103 }
104 
106  std::chrono::milliseconds delta) {
107  if (p()) {
108  s.scheduleFromNow([=, &s] { Repeat(s, p, delta); }, delta);
109  }
110 }
111 
113  std::chrono::milliseconds delta) {
114  scheduleFromNow([=] { Repeat(*this, p, delta); }, delta);
115 }
116 
// Report the number of tasks currently in taskQueue.
//
// When the queue is non-empty, `first` receives the scheduled time of the
// earliest entry and `last` the scheduled time of the latest entry. When the
// queue is empty, neither output parameter is written.
//
// Returns taskQueue.size().
117 size_t
118 CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
119  std::chrono::system_clock::time_point &last) const {
// NOTE(review): listing line 120 is absent from this extract - presumably the
// lock acquisition; confirm against the original file.
121  size_t result = taskQueue.size();
122  if (!taskQueue.empty()) {
// taskQueue is ordered by time, so begin() is the earliest entry and
// rbegin() the latest.
123  first = taskQueue.begin()->first;
124  last = taskQueue.rbegin()->first;
125  }
126  return result;
127 }
128 
131  return nThreadsServicingQueue;
132 }
133 
135  {
137  // Try to avoid scheduling too many copies here, but if we
138  // accidentally have two ProcessQueue's scheduled at once it's
139  // not a big deal.
140  if (m_are_callbacks_running) {
141  return;
142  }
143  if (m_callbacks_pending.empty()) {
144  return;
145  }
146  }
149  std::chrono::system_clock::now());
150 }
151 
153  std::function<void()> callback;
154  {
156  if (m_are_callbacks_running) {
157  return;
158  }
159  if (m_callbacks_pending.empty()) {
160  return;
161  }
162  m_are_callbacks_running = true;
163 
164  callback = std::move(m_callbacks_pending.front());
165  m_callbacks_pending.pop_front();
166  }
167 
168  // Use RAII to reset m_are_callbacks_running and to call
169  // MaybeScheduleProcessQueue, so that both happen safely even if callback()
170  // throws.
171  struct RAIICallbacksRunning {
173  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance)
174  : instance(_instance) {}
175  ~RAIICallbacksRunning() {
176  {
177  LOCK(instance->m_cs_callbacks_pending);
178  instance->m_are_callbacks_running = false;
179  }
180  instance->MaybeScheduleProcessQueue();
181  }
182  } raiicallbacksrunning(this);
183 
184  callback();
185 }
186 
188  std::function<void()> func) {
189  assert(m_pscheduler);
190 
191  {
193  m_callbacks_pending.emplace_back(std::move(func));
194  }
196 }
197 
200  bool should_continue = true;
201  while (should_continue) {
202  ProcessQueue();
204  should_continue = !m_callbacks_pending.empty();
205  }
206 }
207 
210  return m_callbacks_pending.size();
211 }
CScheduler::newTaskScheduled
std::condition_variable newTaskScheduled
Definition: scheduler.h:103
CScheduler
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:35
CScheduler::MockForward
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:81
SingleThreadedSchedulerClient::m_pscheduler
CScheduler * m_pscheduler
Definition: scheduler.h:126
CScheduler::newTaskMutex
Mutex newTaskMutex
Definition: scheduler.h:102
REVERSE_LOCK
#define REVERSE_LOCK(g)
Definition: sync.h:233
CScheduler::CScheduler
CScheduler()
Definition: scheduler.cpp:12
SingleThreadedSchedulerClient::CallbacksPending
size_t CallbacksPending()
Definition: scheduler.cpp:208
SingleThreadedSchedulerClient::ProcessQueue
void ProcessQueue()
Definition: scheduler.cpp:152
CScheduler::~CScheduler
~CScheduler()
Definition: scheduler.cpp:14
scheduler.h
CScheduler::Predicate
std::function< bool()> Predicate
Definition: scheduler.h:41
CScheduler::serviceQueue
void serviceQueue()
Services the queue 'forever'.
Definition: scheduler.cpp:21
CScheduler::scheduleEvery
void scheduleEvery(Predicate p, std::chrono::milliseconds delta)
Repeat p until it returns false.
Definition: scheduler.cpp:112
CScheduler::getQueueInfo
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:118
random.h
CScheduler::Function
std::function< void()> Function
Definition: scheduler.h:40
SingleThreadedSchedulerClient::m_cs_callbacks_pending
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:128
Repeat
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
Definition: scheduler.cpp:105
SingleThreadedSchedulerClient::EmptyQueue
void EmptyQueue()
Processes all remaining queue members on the calling thread, blocking until queue is empty.
Definition: scheduler.cpp:198
SingleThreadedSchedulerClient::AddToProcessQueue
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:187
LOCK
#define LOCK(cs)
Definition: sync.h:241
CScheduler::shouldStop
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:109
SingleThreadedSchedulerClient::MaybeScheduleProcessQueue
void MaybeScheduleProcessQueue()
Definition: scheduler.cpp:134
CScheduler::scheduleFromNow
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:47
SingleThreadedSchedulerClient
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:124
CScheduler::AreThreadsServicingQueue
bool AreThreadsServicingQueue() const
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:129
CScheduler::schedule
void schedule(Function f, std::chrono::system_clock::time_point t)
Call func at/after time t.
Definition: scheduler.cpp:72
WAIT_LOCK
#define WAIT_LOCK(cs, name)
Definition: sync.h:249