Bitcoin Core  24.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <logging.h>
8 #include <primitives/block.h>
10 #include <util/system.h>
11 #include <validationinterface.h>
13 #include <zmq/zmqpublishnotifier.h>
14 #include <zmq/zmqutil.h>
15 
16 #include <zmq.h>
17 
18 #include <cassert>
19 #include <map>
20 #include <string>
21 #include <utility>
22 #include <vector>
23 
25 {
26 }
27 
29 {
30  Shutdown();
31 }
32 
33 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
34 {
35  std::list<const CZMQAbstractNotifier*> result;
36  for (const auto& n : notifiers) {
37  result.push_back(n.get());
38  }
39  return result;
40 }
41 
43 {
44  std::map<std::string, CZMQNotifierFactory> factories;
45  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
46  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
47  factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
48  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
49  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
50 
51  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
52  for (const auto& entry : factories)
53  {
54  std::string arg("-zmq" + entry.first);
55  const auto& factory = entry.second;
56  for (const std::string& address : gArgs.GetArgs(arg)) {
57  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
58  notifier->SetType(entry.first);
59  notifier->SetAddress(address);
60  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
61  notifiers.push_back(std::move(notifier));
62  }
63  }
64 
65  if (!notifiers.empty())
66  {
67  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
68  notificationInterface->notifiers = std::move(notifiers);
69 
70  if (notificationInterface->Initialize()) {
71  return notificationInterface.release();
72  }
73  }
74 
75  return nullptr;
76 }
77 
78 // Called at startup to conditionally set up ZMQ socket(s)
80 {
81  int major = 0, minor = 0, patch = 0;
82  zmq_version(&major, &minor, &patch);
83  LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
84 
85  LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
86  assert(!pcontext);
87 
88  pcontext = zmq_ctx_new();
89 
90  if (!pcontext)
91  {
92  zmqError("Unable to initialize context");
93  return false;
94  }
95 
96  for (auto& notifier : notifiers) {
97  if (notifier->Initialize(pcontext)) {
98  LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
99  } else {
100  LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
101  return false;
102  }
103  }
104 
105  return true;
106 }
107 
108 // Called during shutdown sequence
110 {
111  LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
112  if (pcontext)
113  {
114  for (auto& notifier : notifiers) {
115  LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
116  notifier->Shutdown();
117  }
118  zmq_ctx_term(pcontext);
119 
120  pcontext = nullptr;
121  }
122 }
123 
124 namespace {
125 
126 template <typename Function>
127 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
128 {
129  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
130  CZMQAbstractNotifier* notifier = i->get();
131  if (func(notifier)) {
132  ++i;
133  } else {
134  notifier->Shutdown();
135  i = notifiers.erase(i);
136  }
137  }
138 }
139 
140 } // anonymous namespace
141 
142 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
143 {
144  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
145  return;
146 
147  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
148  return notifier->NotifyBlock(pindexNew);
149  });
150 }
151 
153 {
154  const CTransaction& tx = *ptx;
155 
156  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
157  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
158  });
159 }
160 
162 {
163  // Called for all non-block inclusion reasons
164  const CTransaction& tx = *ptx;
165 
166  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
167  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
168  });
169 }
170 
171 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
172 {
173  for (const CTransactionRef& ptx : pblock->vtx) {
174  const CTransaction& tx = *ptx;
175  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
176  return notifier->NotifyTransaction(tx);
177  });
178  }
179 
180  // Next we notify BlockConnect listeners for *all* blocks
181  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
182  return notifier->NotifyBlockConnect(pindexConnected);
183  });
184 }
185 
186 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
187 {
188  for (const CTransactionRef& ptx : pblock->vtx) {
189  const CTransaction& tx = *ptx;
190  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
191  return notifier->NotifyTransaction(tx);
192  });
193  }
194 
195  // Next we notify BlockDisconnect listeners for *all* blocks
196  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
197  return notifier->NotifyBlockDisconnect(pindexDisconnected);
198  });
199 }
200 
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: system.cpp:478
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: system.cpp:616
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:151
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:295
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
static CZMQNotificationInterface * Create()
void TransactionAddedToMempool(const CTransactionRef &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
#define LogPrint(category,...)
Definition: logging.h:245
@ ZMQ
Definition: logging.h:45
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:421
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
Definition: txmempool.h:231
ArgsManager gArgs
Definition: system.cpp:73
assert(!tx.IsCoinBase())
CZMQNotificationInterface * g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13