Bitcoin Core  27.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <common/args.h>
8 #include <kernel/chain.h>
9 #include <kernel/mempool_entry.h>
10 #include <logging.h>
11 #include <netbase.h>
12 #include <primitives/block.h>
13 #include <primitives/transaction.h>
14 #include <validationinterface.h>
16 #include <zmq/zmqpublishnotifier.h>
17 #include <zmq/zmqutil.h>
18 
19 #include <zmq.h>
20 
21 #include <cassert>
22 #include <map>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
28 {
29 }
30 
32 {
33  Shutdown();
34 }
35 
36 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
37 {
38  std::list<const CZMQAbstractNotifier*> result;
39  for (const auto& n : notifiers) {
40  result.push_back(n.get());
41  }
42  return result;
43 }
44 
45 std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<uint8_t>&, const CBlockIndex&)> get_block_by_index)
46 {
47  std::map<std::string, CZMQNotifierFactory> factories;
48  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
49  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
50  factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
51  return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
52  };
53  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
54  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
55 
56  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
57  for (const auto& entry : factories)
58  {
59  std::string arg("-zmq" + entry.first);
60  const auto& factory = entry.second;
61  for (std::string& address : gArgs.GetArgs(arg)) {
62  // libzmq uses prefix "ipc://" for UNIX domain sockets
63  if (address.substr(0, ADDR_PREFIX_UNIX.length()) == ADDR_PREFIX_UNIX) {
64  address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
65  }
66 
67  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
68  notifier->SetType(entry.first);
69  notifier->SetAddress(address);
70  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
71  notifiers.push_back(std::move(notifier));
72  }
73  }
74 
75  if (!notifiers.empty())
76  {
77  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
78  notificationInterface->notifiers = std::move(notifiers);
79 
80  if (notificationInterface->Initialize()) {
81  return notificationInterface;
82  }
83  }
84 
85  return nullptr;
86 }
87 
88 // Called at startup to conditionally set up ZMQ socket(s)
90 {
91  int major = 0, minor = 0, patch = 0;
92  zmq_version(&major, &minor, &patch);
93  LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
94 
95  LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
96  assert(!pcontext);
97 
98  pcontext = zmq_ctx_new();
99 
100  if (!pcontext)
101  {
102  zmqError("Unable to initialize context");
103  return false;
104  }
105 
106  for (auto& notifier : notifiers) {
107  if (notifier->Initialize(pcontext)) {
108  LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
109  } else {
110  LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
111  return false;
112  }
113  }
114 
115  return true;
116 }
117 
118 // Called during shutdown sequence
120 {
121  LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
122  if (pcontext)
123  {
124  for (auto& notifier : notifiers) {
125  LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
126  notifier->Shutdown();
127  }
128  zmq_ctx_term(pcontext);
129 
130  pcontext = nullptr;
131  }
132 }
133 
134 namespace {
135 
136 template <typename Function>
137 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
138 {
139  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
140  CZMQAbstractNotifier* notifier = i->get();
141  if (func(notifier)) {
142  ++i;
143  } else {
144  notifier->Shutdown();
145  i = notifiers.erase(i);
146  }
147  }
148 }
149 
150 } // anonymous namespace
151 
152 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
153 {
154  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
155  return;
156 
157  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
158  return notifier->NotifyBlock(pindexNew);
159  });
160 }
161 
163 {
164  const CTransaction& tx = *(ptx.info.m_tx);
165 
166  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
167  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
168  });
169 }
170 
172 {
173  // Called for all non-block inclusion reasons
174  const CTransaction& tx = *ptx;
175 
176  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
177  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
178  });
179 }
180 
181 void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
182 {
183  if (role == ChainstateRole::BACKGROUND) {
184  return;
185  }
186  for (const CTransactionRef& ptx : pblock->vtx) {
187  const CTransaction& tx = *ptx;
188  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
189  return notifier->NotifyTransaction(tx);
190  });
191  }
192 
193  // Next we notify BlockConnect listeners for *all* blocks
194  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
195  return notifier->NotifyBlockConnect(pindexConnected);
196  });
197 }
198 
199 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
200 {
201  for (const CTransactionRef& ptx : pblock->vtx) {
202  const CTransaction& tx = *ptx;
203  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
204  return notifier->NotifyTransaction(tx);
205  });
206  }
207 
208  // Next we notify BlockDisconnect listeners for *all* blocks
209  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
210  return notifier->NotifyBlockDisconnect(pindexDisconnected);
211  });
212 }
213 
// Definition of the global owning pointer to the ZMQ notification interface.
// It is default-constructed here (so it starts out null) and is not assigned
// anywhere in the visible part of this file.
// NOTE(review): presumably populated by startup code from Create(), which may
// itself return nullptr — confirm against the caller.
214 std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:41
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:360
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:480
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:141
The basic transaction that is broadcast on the network and contained in blocks.
Definition: transaction.h:296
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(std::vector< uint8_t > &, const CBlockIndex &)> get_block_by_index)
void TransactionAddedToMempool(const NewMempoolTransactionInfo &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected. Provides the block that was disconnected.
void BlockConnected(ChainstateRole role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
ChainstateRole
This enum describes the various roles a specific Chainstate instance can take.
Definition: chain.h:25
#define LogPrint(category,...)
Definition: logging.h:263
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
@ ZMQ
Definition: logging.h:46
const std::string ADDR_PREFIX_UNIX
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: netbase.h:31
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:423
const CTransactionRef m_tx
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13
const std::string ADDR_PREFIX_IPC
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: zmqutil.h:13