Bitcoin ABC  0.26.3
P2P Digital Currency
rcu.cpp
Go to the documentation of this file.
1 // Copyright (c) 2018-2019 The Bitcoin developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <rcu.h>
6 
7 #include <sync.h>
8 
9 #include <algorithm>
10 #include <chrono>
11 #include <condition_variable>
12 
13 std::atomic<uint64_t> RCUInfos::revision{0};
14 thread_local RCUInfos RCUInfos::infos{};
15 
19 static constexpr int RCU_ACTIVE_LOOP_COUNT = 10;
20 
104 static std::atomic<RCUInfos *> threadInfos{nullptr};
106 
107 RCUInfos::RCUInfos() : state(0), next(nullptr) {
108  RCUInfos *head = threadInfos.load();
109  do {
110  next.store(head);
111  } while (!threadInfos.compare_exchange_weak(head, this));
112 
113  // Release the lock.
114  readFree();
115 }
116 
122  runCleanups();
123  while (cleanups.size() > 0) {
124  synchronize();
125  }
126 
127  while (true) {
129 
130  std::atomic<RCUInfos *> *ptr;
131 
132  {
133  RCULock lock(this);
134  ptr = &threadInfos;
135  while (true) {
136  RCUInfos *current = ptr->load();
137  if (current == this) {
138  break;
139  }
140 
141  assert(current != nullptr);
142  ptr = &current->next;
143  }
144  }
145 
153  RCUInfos *current = this;
154  if (!ptr->compare_exchange_strong(current, next.load())) {
155  continue;
156  }
157 
164  synchronize();
165  break;
166  }
167 }
168 
170  uint64_t syncRev = ++revision;
171 
172  // Loop a few time lock free.
173  for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) {
174  runCleanups();
175  if (cleanups.empty() && hasSyncedTo(syncRev)) {
176  return;
177  }
178  }
179 
180  // It seems like we have some contention. Let's try to not starve the
181  // system. Let's make sure threads that land here proceed one by one.
182  // XXX: The best option long term is most likely to use a futex on one of
183  // the thread causing synchronization delay so this thread can be waked up
184  // at an appropriate time.
185  static std::condition_variable cond;
186  static Mutex cs;
187  WAIT_LOCK(cs, lock);
188 
189  do {
190  runCleanups();
191  cond.notify_one();
192  } while (!cond.wait_for(lock, std::chrono::microseconds(1), [&] {
193  return cleanups.empty() && hasSyncedTo(syncRev);
194  }));
195 }
196 
199 
200 public:
201  explicit RCUCleanupGuard(RCUInfos *infosIn) : infos(infosIn) {
202  infos->isCleaningUp = true;
203  }
204 
206 };
207 
209  if (isCleaningUp || cleanups.empty()) {
210  // We don't want to run cleanups within cleanups.
211  return;
212  }
213 
214  RCUCleanupGuard guard(this);
215 
216  // By the time we run a set of cleanups, we may have more cleanups
217  // available so we loop until there is nothing available for cleanup.
218  while (!cleanups.empty()) {
219  auto it = cleanups.begin();
220  uint64_t syncedTo = hasSyncedTo(it->first);
221  if (it->first > syncedTo) {
222  // We have nothing more ready to be cleaned up.
223  return;
224  }
225 
226  while (it != cleanups.end() && it->first <= syncedTo) {
227  // Run the cleanup and remove it from the map.
228  auto fun = std::move(it->second);
229  cleanups.erase(it++);
230  fun();
231  }
232  }
233 }
234 
235 uint64_t RCUInfos::hasSyncedTo(uint64_t cutoff) {
236  uint64_t syncedTo = revision.load();
237 
238  // Go over the list and check all threads are past the synchronization
239  // point.
240  RCULock lock(this);
241  RCUInfos *current = threadInfos.load();
242  while (current != nullptr) {
243  syncedTo = std::min(syncedTo, current->state.load());
244  if (syncedTo < cutoff) {
245  return 0;
246  }
247 
248  current = current->next.load();
249  }
250 
251  return syncedTo;
252 }
RCUCleanupGuard(RCUInfos *infosIn)
Definition: rcu.cpp:201
Definition: rcu.h:20
uint64_t hasSyncedTo(uint64_t cutoff=UNLOCKED)
Definition: rcu.cpp:235
std::atomic< RCUInfos * > next
Definition: rcu.h:22
void runCleanups()
Definition: rcu.cpp:208
~RCUInfos()
Definition: rcu.cpp:117
static thread_local RCUInfos infos
Definition: rcu.h:58
std::map< uint64_t, std::function< void()> > cleanups
Definition: rcu.h:25
std::atomic< uint64_t > state
Definition: rcu.h:21
void readFree()
Definition: rcu.h:40
static std::atomic< uint64_t > revision
Definition: rcu.h:57
void synchronize()
Definition: rcu.cpp:169
bool isCleaningUp
Definition: rcu.h:24
RCUInfos()
Definition: rcu.cpp:107
Definition: rcu.h:61
static void pool cs
static std::atomic< RCUInfos * > threadInfos
We maintain a linked list of all the RCUInfos for each active thread.
Definition: rcu.cpp:104
static RecursiveMutex csThreadInfosDelete
Definition: rcu.cpp:105
static constexpr int RCU_ACTIVE_LOOP_COUNT
How many time a busy loop runs before yelding.
Definition: rcu.cpp:19
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define LOCK(cs)
Definition: sync.h:306
assert(!tx.IsCoinBase())