Bitcoin ABC  0.26.3
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <common/system.h>
10 #include <config.h>
11 #include <logging.h>
12 #include <node/blockstorage.h>
13 #include <primitives/blockhash.h>
14 #include <primitives/txid.h>
15 #include <rpc/server.h>
16 #include <streams.h>
17 #include <zmq/zmqutil.h>
18 
19 #include <zmq.h>
20 
21 #include <cstdarg>
22 #include <cstddef>
23 #include <map>
24 #include <string>
25 #include <utility>
26 
27 static std::multimap<std::string, CZMQAbstractPublishNotifier *>
29 
// ZMQ topic names. Each one is passed as the `command` argument of
// SendZmqMessage(), which sends it as the first part of the multipart message.
// Both the characters and the pointers are const, so a topic name cannot be
// accidentally re-pointed at runtime.
static const char *const MSG_HASHBLOCK = "hashblock";
static const char *const MSG_HASHTX = "hashtx";
static const char *const MSG_RAWBLOCK = "rawblock";
static const char *const MSG_RAWTX = "rawtx";
static const char *const MSG_SEQUENCE = "sequence";
35 
36 // Internal function to send multipart message
37 static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) {
38  va_list args;
39  va_start(args, size);
40 
41  while (1) {
42  zmq_msg_t msg;
43 
44  int rc = zmq_msg_init_size(&msg, size);
45  if (rc != 0) {
46  zmqError("Unable to initialize ZMQ msg");
47  va_end(args);
48  return -1;
49  }
50 
51  void *buf = zmq_msg_data(&msg);
52  memcpy(buf, data, size);
53 
54  data = va_arg(args, const void *);
55 
56  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
57  if (rc == -1) {
58  zmqError("Unable to send ZMQ msg");
59  zmq_msg_close(&msg);
60  va_end(args);
61  return -1;
62  }
63 
64  zmq_msg_close(&msg);
65 
66  if (!data) {
67  break;
68  }
69 
70  size = va_arg(args, size_t);
71  }
72  va_end(args);
73  return 0;
74 }
75 
77  assert(!psocket);
78 
79  // check if address is being used by other publish notifier
80  std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator i =
82 
83  if (i == mapPublishNotifiers.end()) {
84  psocket = zmq_socket(pcontext, ZMQ_PUB);
85  if (!psocket) {
86  zmqError("Failed to create socket");
87  return false;
88  }
89 
91  "zmq: Outbound message high water mark for %s at %s is %d\n",
93 
94  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM,
97  if (rc != 0) {
98  zmqError("Failed to set outbound message high water mark");
99  zmq_close(psocket);
100  return false;
101  }
102 
103  const int so_keepalive_option{1};
104  rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option,
105  sizeof(so_keepalive_option));
106  if (rc != 0) {
107  zmqError("Failed to set SO_KEEPALIVE");
108  zmq_close(psocket);
109  return false;
110  }
111 
112  rc = zmq_bind(psocket, address.c_str());
113  if (rc != 0) {
114  zmqError("Failed to bind address");
115  zmq_close(psocket);
116  return false;
117  }
118 
119  // register this notifier for the address, so it can be reused for other
120  // publish notifier
121  mapPublishNotifiers.insert(std::make_pair(address, this));
122  return true;
123  } else {
124  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
126  "zmq: Outbound message high water mark for %s at %s is %d\n",
128 
129  psocket = i->second->psocket;
130  mapPublishNotifiers.insert(std::make_pair(address, this));
131 
132  return true;
133  }
134 }
135 
137  // Early return if Initialize was not called
138  if (!psocket) {
139  return;
140  }
141 
142  int count = mapPublishNotifiers.count(address);
143 
144  // remove this notifier from the list of publishers using this address
145  typedef std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator
146  iterator;
147  std::pair<iterator, iterator> iterpair =
148  mapPublishNotifiers.equal_range(address);
149 
150  for (iterator it = iterpair.first; it != iterpair.second; ++it) {
151  if (it->second == this) {
152  mapPublishNotifiers.erase(it);
153  break;
154  }
155  }
156 
157  if (count == 1) {
158  LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
159  int linger = 0;
160  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
161  zmq_close(psocket);
162  }
163 
164  psocket = nullptr;
165 }
166 
168  const void *data,
169  size_t size) {
170  assert(psocket);
171 
172  /* send three parts, command & data & a LE 4byte sequence number */
173  uint8_t msgseq[sizeof(uint32_t)];
174  WriteLE32(msgseq, nSequence);
175  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size,
176  msgseq, (size_t)sizeof(uint32_t), nullptr);
177  if (rc == -1) {
178  return false;
179  }
180 
181  /* increment memory only sequence number after sending */
182  nSequence++;
183 
184  return true;
185 }
186 
188  BlockHash hash = pindex->GetBlockHash();
189  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(),
190  this->address);
191  uint8_t data[32];
192  for (unsigned int i = 0; i < 32; i++) {
193  data[31 - i] = hash.begin()[i];
194  }
195  return SendZmqMessage(MSG_HASHBLOCK, data, 32);
196 }
197 
199  const CTransaction &transaction) {
200  TxId txid = transaction.GetId();
201  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", txid.GetHex(),
202  this->address);
203  uint8_t data[32];
204  for (unsigned int i = 0; i < 32; i++) {
205  data[31 - i] = txid.begin()[i];
206  }
207  return SendZmqMessage(MSG_HASHTX, data, 32);
208 }
209 
211  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n",
212  pindex->GetBlockHash().GetHex(), this->address);
213 
215  CBlock block;
216  if (!m_get_block_by_index(block, *pindex)) {
217  zmqError("Can't read block from disk");
218  return false;
219  }
220 
221  ss << block;
222 
223  return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
224 }
225 
227  const CTransaction &transaction) {
228  TxId txid = transaction.GetId();
229  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", txid.GetHex(),
230  this->address);
232  ss << transaction;
233  return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
234 }
235 
236 // TODO: Dedup this code to take label char, log string
238  const CBlockIndex *pindex) {
239  BlockHash hash = pindex->GetBlockHash();
240  LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n",
241  hash.GetHex(), this->address);
242  char data[sizeof(BlockHash) + 1];
243  for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
244  data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
245  }
246  // Block (C)onnect
247  data[sizeof(data) - 1] = 'C';
248  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
249 }
250 
252  const CBlockIndex *pindex) {
253  BlockHash hash = pindex->GetBlockHash();
254  LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n",
255  hash.GetHex(), this->address);
256  char data[sizeof(BlockHash) + 1];
257  for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
258  data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
259  }
260  // Block (D)isconnect
261  data[sizeof(data) - 1] = 'D';
262  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
263 }
264 
266  const CTransaction &transaction, uint64_t mempool_sequence) {
267  TxId txid = transaction.GetId();
268  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n",
269  txid.GetHex(), this->address);
270  uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
271  for (unsigned int i = 0; i < sizeof(TxId); i++) {
272  data[sizeof(TxId) - 1 - i] = txid.begin()[i];
273  }
274  // Mempool (A)cceptance
275  data[sizeof(TxId)] = 'A';
276  WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
277  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
278 }
279 
281  const CTransaction &transaction, uint64_t mempool_sequence) {
282  TxId txid = transaction.GetId();
283  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n",
284  txid.GetHex(), this->address);
285  uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
286  for (unsigned int i = 0; i < sizeof(TxId); i++) {
287  data[sizeof(TxId) - 1 - i] = txid.begin()[i];
288  }
289  // Mempool (R)emoval
290  data[sizeof(TxId)] = 'R';
291  WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
292  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
293 }
Definition: block.h:60
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: blockindex.h:25
BlockHash GetBlockHash() const
Definition: blockindex.h:146
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:177
const_iterator begin() const
Definition: streams.h:219
size_type size() const
Definition: streams.h:223
The basic transaction that is broadcast on the network and contained in blocks.
Definition: transaction.h:192
const TxId GetId() const
Definition: transaction.h:240
bool SendZmqMessage(const char *command, const void *data, size_t size)
uint32_t nSequence
upcounting per message sequence number
bool Initialize(void *pcontext) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
const std::function< bool(CBlock &, const CBlockIndex &)> m_get_block_by_index
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyBlockConnect(const CBlockIndex *pindex) override
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
uint8_t * begin()
Definition: uint256.h:85
std::string GetHex() const
Definition: uint256.cpp:16
static void WriteLE32(uint8_t *ptr, uint32_t x)
Definition: common.h:40
static void WriteLE64(uint8_t *ptr, uint64_t x)
Definition: common.h:45
#define LogPrint(category,...)
Definition: logging.h:211
@ ZMQ
Definition: logging.h:45
@ SER_NETWORK
Definition: serialize.h:152
int RPCSerializationFlags()
Retrieves any serialization flags requested in command line argument.
Definition: server.cpp:679
A BlockHash is a unique identifier for a block.
Definition: blockhash.h:13
A TxId is the identifier of a transaction.
Definition: txid.h:14
static int count
Definition: tests.c:31
assert(!tx.IsCoinBase())
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:11
static const char * MSG_HASHBLOCK
static const char * MSG_SEQUENCE
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static const char * MSG_RAWBLOCK
static const char * MSG_RAWTX
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
static const char * MSG_HASHTX
void zmqError(const char *str)
Definition: zmqutil.cpp:11