Bitcoin ABC  0.26.3
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <config.h>
10 #include <node/blockstorage.h>
11 #include <primitives/blockhash.h>
12 #include <primitives/txid.h>
13 #include <rpc/server.h>
14 #include <streams.h>
15 #include <util/system.h>
16 #include <zmq/zmqutil.h>
17 
18 #include <zmq.h>
19 
20 #include <cstdarg>
21 #include <cstddef>
22 #include <map>
23 #include <string>
24 #include <utility>
25 
27 
28 static std::multimap<std::string, CZMQAbstractPublishNotifier *>
30 
31 static const char *MSG_HASHBLOCK = "hashblock";
32 static const char *MSG_HASHTX = "hashtx";
33 static const char *MSG_RAWBLOCK = "rawblock";
34 static const char *MSG_RAWTX = "rawtx";
35 static const char *MSG_SEQUENCE = "sequence";
36 
37 // Internal function to send multipart message
38 static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) {
39  va_list args;
40  va_start(args, size);
41 
42  while (1) {
43  zmq_msg_t msg;
44 
45  int rc = zmq_msg_init_size(&msg, size);
46  if (rc != 0) {
47  zmqError("Unable to initialize ZMQ msg");
48  va_end(args);
49  return -1;
50  }
51 
52  void *buf = zmq_msg_data(&msg);
53  memcpy(buf, data, size);
54 
55  data = va_arg(args, const void *);
56 
57  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
58  if (rc == -1) {
59  zmqError("Unable to send ZMQ msg");
60  zmq_msg_close(&msg);
61  va_end(args);
62  return -1;
63  }
64 
65  zmq_msg_close(&msg);
66 
67  if (!data) {
68  break;
69  }
70 
71  size = va_arg(args, size_t);
72  }
73  va_end(args);
74  return 0;
75 }
76 
78  assert(!psocket);
79 
80  // check if address is being used by other publish notifier
81  std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator i =
83 
84  if (i == mapPublishNotifiers.end()) {
85  psocket = zmq_socket(pcontext, ZMQ_PUB);
86  if (!psocket) {
87  zmqError("Failed to create socket");
88  return false;
89  }
90 
92  "zmq: Outbound message high water mark for %s at %s is %d\n",
94 
95  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM,
98  if (rc != 0) {
99  zmqError("Failed to set outbound message high water mark");
100  zmq_close(psocket);
101  return false;
102  }
103 
104  const int so_keepalive_option{1};
105  rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option,
106  sizeof(so_keepalive_option));
107  if (rc != 0) {
108  zmqError("Failed to set SO_KEEPALIVE");
109  zmq_close(psocket);
110  return false;
111  }
112 
113  rc = zmq_bind(psocket, address.c_str());
114  if (rc != 0) {
115  zmqError("Failed to bind address");
116  zmq_close(psocket);
117  return false;
118  }
119 
120  // register this notifier for the address, so it can be reused for other
121  // publish notifier
122  mapPublishNotifiers.insert(std::make_pair(address, this));
123  return true;
124  } else {
125  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
127  "zmq: Outbound message high water mark for %s at %s is %d\n",
129 
130  psocket = i->second->psocket;
131  mapPublishNotifiers.insert(std::make_pair(address, this));
132 
133  return true;
134  }
135 }
136 
138  // Early return if Initialize was not called
139  if (!psocket) {
140  return;
141  }
142 
143  int count = mapPublishNotifiers.count(address);
144 
145  // remove this notifier from the list of publishers using this address
146  typedef std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator
147  iterator;
148  std::pair<iterator, iterator> iterpair =
149  mapPublishNotifiers.equal_range(address);
150 
151  for (iterator it = iterpair.first; it != iterpair.second; ++it) {
152  if (it->second == this) {
153  mapPublishNotifiers.erase(it);
154  break;
155  }
156  }
157 
158  if (count == 1) {
159  LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
160  int linger = 0;
161  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
162  zmq_close(psocket);
163  }
164 
165  psocket = nullptr;
166 }
167 
169  const void *data,
170  size_t size) {
171  assert(psocket);
172 
173  /* send three parts, command & data & a LE 4byte sequence number */
174  uint8_t msgseq[sizeof(uint32_t)];
175  WriteLE32(msgseq, nSequence);
176  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size,
177  msgseq, (size_t)sizeof(uint32_t), nullptr);
178  if (rc == -1) {
179  return false;
180  }
181 
182  /* increment memory only sequence number after sending */
183  nSequence++;
184 
185  return true;
186 }
187 
189  BlockHash hash = pindex->GetBlockHash();
190  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(),
191  this->address);
192  uint8_t data[32];
193  for (unsigned int i = 0; i < 32; i++) {
194  data[31 - i] = hash.begin()[i];
195  }
196  return SendZmqMessage(MSG_HASHBLOCK, data, 32);
197 }
198 
200  const CTransaction &transaction) {
201  TxId txid = transaction.GetId();
202  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", txid.GetHex(),
203  this->address);
204  uint8_t data[32];
205  for (unsigned int i = 0; i < 32; i++) {
206  data[31 - i] = txid.begin()[i];
207  }
208  return SendZmqMessage(MSG_HASHTX, data, 32);
209 }
210 
212  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n",
213  pindex->GetBlockHash().GetHex(), this->address);
214 
215  const Config &config = GetConfig();
217  CBlock block;
218  if (!ReadBlockFromDisk(block, pindex,
219  config.GetChainParams().GetConsensus())) {
220  zmqError("Can't read block from disk");
221  return false;
222  }
223 
224  ss << block;
225 
226  return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
227 }
228 
230  const CTransaction &transaction) {
231  TxId txid = transaction.GetId();
232  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", txid.GetHex(),
233  this->address);
235  ss << transaction;
236  return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
237 }
238 
239 // TODO: Dedup this code to take label char, log string
241  const CBlockIndex *pindex) {
242  BlockHash hash = pindex->GetBlockHash();
243  LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n",
244  hash.GetHex(), this->address);
245  char data[sizeof(BlockHash) + 1];
246  for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
247  data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
248  }
249  // Block (C)onnect
250  data[sizeof(data) - 1] = 'C';
251  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
252 }
253 
255  const CBlockIndex *pindex) {
256  BlockHash hash = pindex->GetBlockHash();
257  LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n",
258  hash.GetHex(), this->address);
259  char data[sizeof(BlockHash) + 1];
260  for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
261  data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
262  }
263  // Block (D)isconnect
264  data[sizeof(data) - 1] = 'D';
265  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
266 }
267 
269  const CTransaction &transaction, uint64_t mempool_sequence) {
270  TxId txid = transaction.GetId();
271  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n",
272  txid.GetHex(), this->address);
273  uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
274  for (unsigned int i = 0; i < sizeof(TxId); i++) {
275  data[sizeof(TxId) - 1 - i] = txid.begin()[i];
276  }
277  // Mempool (A)cceptance
278  data[sizeof(TxId)] = 'A';
279  WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
280  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
281 }
282 
284  const CTransaction &transaction, uint64_t mempool_sequence) {
285  TxId txid = transaction.GetId();
286  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n",
287  txid.GetHex(), this->address);
288  uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
289  for (unsigned int i = 0; i < sizeof(TxId); i++) {
290  data[sizeof(TxId) - 1 - i] = txid.begin()[i];
291  }
292  // Mempool (R)emoval
293  data[sizeof(TxId)] = 'R';
294  WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
295  return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
296 }
Definition: block.h:60
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: blockindex.h:26
BlockHash GetBlockHash() const
Definition: blockindex.h:147
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:86
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:199
const_iterator begin() const
Definition: streams.h:242
size_type size() const
Definition: streams.h:246
The basic transaction that is broadcast on the network and contained in blocks.
Definition: transaction.h:192
const TxId GetId() const
Definition: transaction.h:240
bool SendZmqMessage(const char *command, const void *data, size_t size)
uint32_t nSequence
upcounting per message sequence number
bool Initialize(void *pcontext) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyBlockConnect(const CBlockIndex *pindex) override
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
Definition: config.h:17
virtual const CChainParams & GetChainParams() const =0
uint8_t * begin()
Definition: uint256.h:83
std::string GetHex() const
Definition: uint256.cpp:16
const Config & GetConfig()
Definition: config.cpp:34
static void WriteLE32(uint8_t *ptr, uint32_t x)
Definition: common.h:40
static void WriteLE64(uint8_t *ptr, uint64_t x)
Definition: common.h:45
#define LogPrint(category,...)
Definition: logging.h:210
@ ZMQ
Definition: logging.h:45
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos, const Consensus::Params &params)
Functions for disk access for blocks.
@ SER_NETWORK
Definition: serialize.h:166
int RPCSerializationFlags()
Retrieves any serialization flags requested in command line argument.
Definition: server.cpp:606
A BlockHash is a unique identifier for a block.
Definition: blockhash.h:13
A TxId is the identifier of a transaction.
Definition: txid.h:14
static int count
Definition: tests.c:31
assert(!tx.IsCoinBase())
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:11
static const char * MSG_HASHBLOCK
static const char * MSG_SEQUENCE
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static const char * MSG_RAWBLOCK
static const char * MSG_RAWTX
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
static const char * MSG_HASHTX
void zmqError(const char *str)
Definition: zmqutil.cpp:11