Bitcoin Core  24.99.0
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2022 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <crypto/common.h>
10 #include <kernel/cs_main.h>
11 #include <logging.h>
12 #include <netaddress.h>
13 #include <netbase.h>
14 #include <node/blockstorage.h>
15 #include <primitives/block.h>
16 #include <primitives/transaction.h>
17 #include <rpc/server.h>
18 #include <serialize.h>
19 #include <streams.h>
20 #include <sync.h>
21 #include <uint256.h>
22 #include <version.h>
23 #include <zmq/zmqutil.h>
24 
25 #include <zmq.h>
26 
27 #include <cassert>
28 #include <cstdarg>
29 #include <cstddef>
30 #include <cstdint>
31 #include <cstring>
32 #include <map>
33 #include <optional>
34 #include <string>
35 #include <utility>
36 #include <vector>
37 
38 namespace Consensus {
39 struct Params;
40 }
41 
43 
44 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
45 
46 static const char *MSG_HASHBLOCK = "hashblock";
47 static const char *MSG_HASHTX = "hashtx";
48 static const char *MSG_RAWBLOCK = "rawblock";
49 static const char *MSG_RAWTX = "rawtx";
50 static const char *MSG_SEQUENCE = "sequence";
51 
52 // Internal function to send multipart message
53 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
54 {
55  va_list args;
56  va_start(args, size);
57 
58  while (1)
59  {
60  zmq_msg_t msg;
61 
62  int rc = zmq_msg_init_size(&msg, size);
63  if (rc != 0)
64  {
65  zmqError("Unable to initialize ZMQ msg");
66  va_end(args);
67  return -1;
68  }
69 
70  void *buf = zmq_msg_data(&msg);
71  memcpy(buf, data, size);
72 
73  data = va_arg(args, const void*);
74 
75  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
76  if (rc == -1)
77  {
78  zmqError("Unable to send ZMQ msg");
79  zmq_msg_close(&msg);
80  va_end(args);
81  return -1;
82  }
83 
84  zmq_msg_close(&msg);
85 
86  if (!data)
87  break;
88 
89  size = va_arg(args, size_t);
90  }
91  va_end(args);
92  return 0;
93 }
94 
95 static bool IsZMQAddressIPV6(const std::string &zmq_address)
96 {
97  const std::string tcp_prefix = "tcp://";
98  const size_t tcp_index = zmq_address.rfind(tcp_prefix);
99  const size_t colon_index = zmq_address.rfind(":");
100  if (tcp_index == 0 && colon_index != std::string::npos) {
101  const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length());
102  CNetAddr addr;
103  LookupHost(ip, addr, false);
104  if (addr.IsIPv6()) return true;
105  }
106  return false;
107 }
108 
110 {
111  assert(!psocket);
112 
113  // check if address is being used by other publish notifier
114  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
115 
116  if (i==mapPublishNotifiers.end())
117  {
118  psocket = zmq_socket(pcontext, ZMQ_PUB);
119  if (!psocket)
120  {
121  zmqError("Failed to create socket");
122  return false;
123  }
124 
125  LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
126 
127  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
128  if (rc != 0)
129  {
130  zmqError("Failed to set outbound message high water mark");
131  zmq_close(psocket);
132  return false;
133  }
134 
135  const int so_keepalive_option {1};
136  rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
137  if (rc != 0) {
138  zmqError("Failed to set SO_KEEPALIVE");
139  zmq_close(psocket);
140  return false;
141  }
142 
143  // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6
144  const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0};
145  rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6));
146  if (rc != 0) {
147  zmqError("Failed to set ZMQ_IPV6");
148  zmq_close(psocket);
149  return false;
150  }
151 
152  rc = zmq_bind(psocket, address.c_str());
153  if (rc != 0)
154  {
155  zmqError("Failed to bind address");
156  zmq_close(psocket);
157  return false;
158  }
159 
160  // register this notifier for the address, so it can be reused for other publish notifier
161  mapPublishNotifiers.insert(std::make_pair(address, this));
162  return true;
163  }
164  else
165  {
166  LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address);
167  LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
168 
169  psocket = i->second->psocket;
170  mapPublishNotifiers.insert(std::make_pair(address, this));
171 
172  return true;
173  }
174 }
175 
177 {
178  // Early return if Initialize was not called
179  if (!psocket) return;
180 
181  int count = mapPublishNotifiers.count(address);
182 
183  // remove this notifier from the list of publishers using this address
184  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
185  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
186 
187  for (iterator it = iterpair.first; it != iterpair.second; ++it)
188  {
189  if (it->second==this)
190  {
191  mapPublishNotifiers.erase(it);
192  break;
193  }
194  }
195 
196  if (count == 1)
197  {
198  LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address);
199  int linger = 0;
200  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
201  zmq_close(psocket);
202  }
203 
204  psocket = nullptr;
205 }
206 
207 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
208 {
209  assert(psocket);
210 
211  /* send three parts, command & data & a LE 4byte sequence number */
212  unsigned char msgseq[sizeof(uint32_t)];
213  WriteLE32(msgseq, nSequence);
214  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
215  if (rc == -1)
216  return false;
217 
218  /* increment memory only sequence number after sending */
219  nSequence++;
220 
221  return true;
222 }
223 
225 {
226  uint256 hash = pindex->GetBlockHash();
227  LogPrint(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address);
228  uint8_t data[32];
229  for (unsigned int i = 0; i < 32; i++) {
230  data[31 - i] = hash.begin()[i];
231  }
232  return SendZmqMessage(MSG_HASHBLOCK, data, 32);
233 }
234 
236 {
237  uint256 hash = transaction.GetHash();
238  LogPrint(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address);
239  uint8_t data[32];
240  for (unsigned int i = 0; i < 32; i++) {
241  data[31 - i] = hash.begin()[i];
242  }
243  return SendZmqMessage(MSG_HASHTX, data, 32);
244 }
245 
247 {
248  LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
249 
250  const Consensus::Params& consensusParams = Params().GetConsensus();
252  CBlock block;
253  if (!ReadBlockFromDisk(block, pindex, consensusParams)) {
254  zmqError("Can't read block from disk");
255  return false;
256  }
257 
258  ss << block;
259 
260  return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
261 }
262 
264 {
265  uint256 hash = transaction.GetHash();
266  LogPrint(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address);
268  ss << transaction;
269  return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
270 }
271 
272 // Helper function to send a 'sequence' topic message with the following structure:
273 // <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
274 static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
275 {
276  unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
277  for (unsigned int i = 0; i < sizeof(hash); ++i) {
278  data[sizeof(hash) - 1 - i] = hash.begin()[i];
279  }
280  data[sizeof(hash)] = label;
281  if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
282  return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
283 }
284 
286 {
287  uint256 hash = pindex->GetBlockHash();
288  LogPrint(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
289  return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
290 }
291 
293 {
294  uint256 hash = pindex->GetBlockHash();
295  LogPrint(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
296  return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
297 }
298 
299 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
300 {
301  uint256 hash = transaction.GetHash();
302  LogPrint(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
303  return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
304 }
305 
306 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
307 {
308  uint256 hash = transaction.GetHash();
309  LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
310  return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
311 }
const auto command
const CChainParams & Params()
Return the currently selected parameters.
Definition: block.h:69
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:151
uint256 GetBlockHash() const
Definition: chain.h:259
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:82
Network address.
Definition: netaddress.h:120
bool IsIPv6() const
Definition: netaddress.cpp:314
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:295
const uint256 & GetHash() const
Definition: transaction.h:337
bool SendZmqMessage(const char *command, const void *data, size_t size)
uint32_t nSequence
upcounting per message sequence number
bool Initialize(void *pcontext) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyBlockConnect(const CBlockIndex *pindex) override
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
size_type size() const
Definition: streams.h:220
const_iterator begin() const
Definition: streams.h:216
std::string GetHex() const
Definition: uint256.cpp:20
unsigned char * begin()
Definition: uint256.h:61
256-bit opaque blob.
Definition: uint256.h:119
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:44
static void WriteLE64(unsigned char *ptr, uint64_t x)
Definition: common.h:50
static CService ip(uint32_t i)
#define LogPrint(category,...)
Definition: logging.h:245
@ ZMQ
Definition: logging.h:45
Transaction validation functions.
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
bool LookupHost(const std::string &name, std::vector< CNetAddr > &vIP, unsigned int nMaxSolutions, bool fAllowLookup, DNSLookupFn dns_lookup_function)
Resolve a host string to its corresponding network addresses.
Definition: netbase.cpp:170
ArgsManager args
@ SER_NETWORK
Definition: serialize.h:131
int RPCSerializationFlags()
Definition: server.cpp:564
Parameters that influence chain consensus.
Definition: params.h:73
static int count
Definition: tests.c:34
assert(!tx.IsCoinBase())
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:12
static const char * MSG_HASHBLOCK
static const char * MSG_SEQUENCE
static const char * MSG_RAWBLOCK
static bool SendSequenceMsg(CZMQAbstractPublishNotifier &notifier, uint256 hash, char label, std::optional< uint64_t > sequence={})
static bool IsZMQAddressIPV6(const std::string &zmq_address)
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static const char * MSG_RAWTX
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
static const char * MSG_HASHTX
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13