Bitcoin Core  27.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <common/args.h>
8 #include <kernel/chain.h>
9 #include <kernel/mempool_entry.h>
10 #include <logging.h>
11 #include <netbase.h>
12 #include <primitives/block.h>
13 #include <primitives/transaction.h>
14 #include <validationinterface.h>
16 #include <zmq/zmqpublishnotifier.h>
17 #include <zmq/zmqutil.h>
18 
19 #include <zmq.h>
20 
21 #include <cassert>
22 #include <map>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
28 
30 {
31  Shutdown();
32 }
33 
34 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
35 {
36  std::list<const CZMQAbstractNotifier*> result;
37  for (const auto& n : notifiers) {
38  result.push_back(n.get());
39  }
40  return result;
41 }
42 
43 std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<uint8_t>&, const CBlockIndex&)> get_block_by_index)
44 {
45  std::map<std::string, CZMQNotifierFactory> factories;
46  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
47  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
48  factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
49  return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
50  };
51  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
52  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
53 
54  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
55  for (const auto& entry : factories)
56  {
57  std::string arg("-zmq" + entry.first);
58  const auto& factory = entry.second;
59  for (std::string& address : gArgs.GetArgs(arg)) {
60  // libzmq uses prefix "ipc://" for UNIX domain sockets
61  if (address.substr(0, ADDR_PREFIX_UNIX.length()) == ADDR_PREFIX_UNIX) {
62  address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
63  }
64 
65  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
66  notifier->SetType(entry.first);
67  notifier->SetAddress(address);
68  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
69  notifiers.push_back(std::move(notifier));
70  }
71  }
72 
73  if (!notifiers.empty())
74  {
75  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
76  notificationInterface->notifiers = std::move(notifiers);
77 
78  if (notificationInterface->Initialize()) {
79  return notificationInterface;
80  }
81  }
82 
83  return nullptr;
84 }
85 
86 // Called at startup to conditionally set up ZMQ socket(s)
88 {
89  int major = 0, minor = 0, patch = 0;
90  zmq_version(&major, &minor, &patch);
91  LogPrint(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
92 
93  LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
94  assert(!pcontext);
95 
96  pcontext = zmq_ctx_new();
97 
98  if (!pcontext)
99  {
100  zmqError("Unable to initialize context");
101  return false;
102  }
103 
104  for (auto& notifier : notifiers) {
105  if (notifier->Initialize(pcontext)) {
106  LogPrint(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
107  } else {
108  LogPrint(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
109  return false;
110  }
111  }
112 
113  return true;
114 }
115 
116 // Called during shutdown sequence
118 {
119  LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
120  if (pcontext)
121  {
122  for (auto& notifier : notifiers) {
123  LogPrint(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
124  notifier->Shutdown();
125  }
126  zmq_ctx_term(pcontext);
127 
128  pcontext = nullptr;
129  }
130 }
131 
132 namespace {
133 
134 template <typename Function>
135 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
136 {
137  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
138  CZMQAbstractNotifier* notifier = i->get();
139  if (func(notifier)) {
140  ++i;
141  } else {
142  notifier->Shutdown();
143  i = notifiers.erase(i);
144  }
145  }
146 }
147 
148 } // anonymous namespace
149 
150 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
151 {
152  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
153  return;
154 
155  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
156  return notifier->NotifyBlock(pindexNew);
157  });
158 }
159 
161 {
162  const CTransaction& tx = *(ptx.info.m_tx);
163 
164  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
165  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
166  });
167 }
168 
170 {
171  // Called for all non-block inclusion reasons
172  const CTransaction& tx = *ptx;
173 
174  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
175  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
176  });
177 }
178 
179 void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
180 {
181  if (role == ChainstateRole::BACKGROUND) {
182  return;
183  }
184  for (const CTransactionRef& ptx : pblock->vtx) {
185  const CTransaction& tx = *ptx;
186  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
187  return notifier->NotifyTransaction(tx);
188  });
189  }
190 
191  // Next we notify BlockConnect listeners for *all* blocks
192  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
193  return notifier->NotifyBlockConnect(pindexConnected);
194  });
195 }
196 
197 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
198 {
199  for (const CTransactionRef& ptx : pblock->vtx) {
200  const CTransaction& tx = *ptx;
201  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
202  return notifier->NotifyTransaction(tx);
203  });
204  }
205 
206  // Next we notify BlockDisconnect listeners for *all* blocks
207  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
208  return notifier->NotifyBlockDisconnect(pindexDisconnected);
209  });
210 }
211 
212 std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:41
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:360
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:480
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:141
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:296
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(std::vector< uint8_t > &, const CBlockIndex &)> get_block_by_index)
void TransactionAddedToMempool(const NewMempoolTransactionInfo &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected Provides the block that was disconnected.
void BlockConnected(ChainstateRole role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
ChainstateRole
This enum describes the various roles a specific Chainstate instance can take.
Definition: chain.h:25
#define LogPrint(category,...)
Definition: logging.h:263
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
@ ZMQ
Definition: logging.h:46
const std::string ADDR_PREFIX_UNIX
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: netbase.h:31
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:423
const CTransactionRef m_tx
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13
const std::string ADDR_PREFIX_IPC
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: zmqutil.h:13