Bitcoin ABC  0.24.7
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
7 #include <zmq/zmqutil.h>
8 
9 #include <zmq.h>
10 
11 #include <primitives/block.h>
12 #include <util/system.h>
13 
15 
17  Shutdown();
18 }
19 
20 std::list<const CZMQAbstractNotifier *>
22  std::list<const CZMQAbstractNotifier *> result;
23  for (const auto &n : notifiers) {
24  result.push_back(n.get());
25  }
26  return result;
27 }
28 
30  std::map<std::string, CZMQNotifierFactory> factories;
31  factories["pubhashblock"] =
32  CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
33  factories["pubhashtx"] =
34  CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
35  factories["pubrawblock"] =
36  CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
37  factories["pubrawtx"] =
38  CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
39  factories["pubsequence"] =
40  CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
41 
42  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
43  for (const auto &entry : factories) {
44  std::string arg("-zmq" + entry.first);
45  const auto &factory = entry.second;
46  for (const std::string &address : gArgs.GetArgs(arg)) {
47  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
48  notifier->SetType(entry.first);
49  notifier->SetAddress(address);
50  notifier->SetOutboundMessageHighWaterMark(
51  static_cast<int>(gArgs.GetArg(
53  notifiers.push_back(std::move(notifier));
54  }
55  }
56 
57  if (!notifiers.empty()) {
58  std::unique_ptr<CZMQNotificationInterface> notificationInterface(
60  notificationInterface->notifiers = std::move(notifiers);
61 
62  if (notificationInterface->Initialize()) {
63  return notificationInterface.release();
64  }
65  }
66 
67  return nullptr;
68 }
69 
70 // Called at startup to conditionally set up ZMQ socket(s)
72  int major = 0, minor = 0, patch = 0;
73  zmq_version(&major, &minor, &patch);
74  LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
75 
76  LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
77  assert(!pcontext);
78 
79  pcontext = zmq_ctx_new();
80 
81  if (!pcontext) {
82  zmqError("Unable to initialize context");
83  return false;
84  }
85 
86  for (auto &notifier : notifiers) {
87  if (notifier->Initialize(pcontext)) {
88  LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n",
89  notifier->GetType(), notifier->GetAddress());
90  } else {
91  LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n",
92  notifier->GetType(), notifier->GetAddress());
93  return false;
94  }
95  }
96 
97  return true;
98 }
99 
100 // Called during shutdown sequence
102  LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
103  if (pcontext) {
104  for (auto &notifier : notifiers) {
105  LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n",
106  notifier->GetType(), notifier->GetAddress());
107  notifier->Shutdown();
108  }
109  zmq_ctx_term(pcontext);
110 
111  pcontext = nullptr;
112  }
113 }
114 
115 namespace {
116 
117 template <typename Function>
118 void TryForEachAndRemoveFailed(
119  std::list<std::unique_ptr<CZMQAbstractNotifier>> &notifiers,
120  const Function &func) {
121  for (auto i = notifiers.begin(); i != notifiers.end();) {
122  CZMQAbstractNotifier *notifier = i->get();
123  if (func(notifier)) {
124  ++i;
125  } else {
126  notifier->Shutdown();
127  i = notifiers.erase(i);
128  }
129  }
130 }
131 
132 } // anonymous namespace
133 
135  const CBlockIndex *pindexFork,
136  bool fInitialDownload) {
137  // In IBD or blocks were disconnected without any new ones
138  if (fInitialDownload || pindexNew == pindexFork) {
139  return;
140  }
141 
142  TryForEachAndRemoveFailed(notifiers,
143  [pindexNew](CZMQAbstractNotifier *notifier) {
144  return notifier->NotifyBlock(pindexNew);
145  });
146 }
147 
149  const CTransactionRef &ptx, uint64_t mempool_sequence) {
150  const CTransaction &tx = *ptx;
151 
152  TryForEachAndRemoveFailed(
153  notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
154  return notifier->NotifyTransaction(tx) &&
155  notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
156  });
157 }
158 
160  const CTransactionRef &ptx, MemPoolRemovalReason reason,
161  uint64_t mempool_sequence) {
162  // Called for all non-block inclusion reasons
163  const CTransaction &tx = *ptx;
164 
165  TryForEachAndRemoveFailed(
166  notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
167  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
168  });
169 }
170 
172  const std::shared_ptr<const CBlock> &pblock,
173  const CBlockIndex *pindexConnected) {
174  for (const CTransactionRef &ptx : pblock->vtx) {
175  const CTransaction &tx = *ptx;
176  TryForEachAndRemoveFailed(notifiers,
177  [&tx](CZMQAbstractNotifier *notifier) {
178  return notifier->NotifyTransaction(tx);
179  });
180  }
181 
182  // Next we notify BlockConnect listeners for *all* blocks
183  TryForEachAndRemoveFailed(
184  notifiers, [pindexConnected](CZMQAbstractNotifier *notifier) {
185  return notifier->NotifyBlockConnect(pindexConnected);
186  });
187 }
188 
190  const std::shared_ptr<const CBlock> &pblock,
191  const CBlockIndex *pindexDisconnected) {
192  for (const CTransactionRef &ptx : pblock->vtx) {
193  const CTransaction &tx = *ptx;
194  TryForEachAndRemoveFailed(notifiers,
195  [&tx](CZMQAbstractNotifier *notifier) {
196  return notifier->NotifyTransaction(tx);
197  });
198  }
199 
200  // Next we notify BlockDisconnect listeners for *all* blocks
201  TryForEachAndRemoveFailed(
202  notifiers, [pindexDisconnected](CZMQAbstractNotifier *notifier) {
203  return notifier->NotifyBlockDisconnect(pindexDisconnected);
204  });
205 }
206 
block.h
CZMQNotificationInterface::CZMQNotificationInterface
CZMQNotificationInterface()
Definition: zmqnotificationinterface.cpp:14
BCLog::ZMQ
@ ZMQ
Definition: logging.h:43
CZMQNotificationInterface::pcontext
void * pcontext
Definition: zmqnotificationinterface.h:45
CZMQNotificationInterface::Create
static CZMQNotificationInterface * Create()
Definition: zmqnotificationinterface.cpp:29
CZMQNotificationInterface::BlockConnected
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
Definition: zmqnotificationinterface.cpp:171
CZMQAbstractNotifier::NotifyBlockDisconnect
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
Definition: zmqabstractnotifier.cpp:29
zmqError
void zmqError(const char *str)
Definition: zmqutil.cpp:11
CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM
static const int DEFAULT_ZMQ_SNDHWM
Definition: zmqabstractnotifier.h:19
CZMQAbstractNotifier::NotifyTransactionAcceptance
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
Definition: zmqabstractnotifier.cpp:34
CTransaction
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:194
CZMQAbstractNotifier
Definition: zmqabstractnotifier.h:17
zmqutil.h
CZMQAbstractNotifier::Shutdown
virtual void Shutdown()=0
CZMQNotificationInterface::Shutdown
void Shutdown()
Definition: zmqnotificationinterface.cpp:101
CZMQNotificationInterface::BlockDisconnected
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
Definition: zmqnotificationinterface.cpp:189
CZMQNotificationInterface::TransactionRemovedFromMempool
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
Definition: zmqnotificationinterface.cpp:159
g_zmq_notification_interface
CZMQNotificationInterface * g_zmq_notification_interface
Definition: zmqnotificationinterface.cpp:207
ArgsManager::GetArg
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: system.cpp:498
CZMQNotificationInterface::TransactionAddedToMempool
void TransactionAddedToMempool(const CTransactionRef &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
Definition: zmqnotificationinterface.cpp:148
LogPrint
#define LogPrint(category,...)
Definition: logging.h:193
zmqnotificationinterface.h
zmqpublishnotifier.h
CZMQNotificationInterface::GetActiveNotifiers
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
Definition: zmqnotificationinterface.cpp:21
CZMQAbstractNotifier::NotifyTransactionRemoval
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
Definition: zmqabstractnotifier.cpp:39
system.h
CZMQAbstractNotifier::NotifyTransaction
virtual bool NotifyTransaction(const CTransaction &transaction)
Definition: zmqabstractnotifier.cpp:19
MemPoolRemovalReason
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
Definition: txmempool.h:400
gArgs
ArgsManager gArgs
Definition: system.cpp:75
CZMQNotificationInterface::UpdatedBlockTip
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
Definition: zmqnotificationinterface.cpp:134
CTransactionRef
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:319
CZMQNotificationInterface::notifiers
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
Definition: zmqnotificationinterface.h:46
CZMQAbstractNotifier::NotifyBlockConnect
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
Definition: zmqabstractnotifier.cpp:24
CZMQAbstractNotifier::NotifyBlock
virtual bool NotifyBlock(const CBlockIndex *pindex)
Definition: zmqabstractnotifier.cpp:15
ArgsManager::GetArgs
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: system.cpp:391
CBlockIndex
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: blockindex.h:23
CZMQNotificationInterface
Definition: zmqnotificationinterface.h:16
CZMQNotificationInterface::Initialize
bool Initialize()
Definition: zmqnotificationinterface.cpp:71
CZMQNotificationInterface::~CZMQNotificationInterface
virtual ~CZMQNotificationInterface()
Definition: zmqnotificationinterface.cpp:16