Bitcoin Core  24.99.0
P2P Digital Currency
sock.cpp
Go to the documentation of this file.
1 // Copyright (c) 2020-2021 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <compat/compat.h>
6 #include <logging.h>
7 #include <threadinterrupt.h>
8 #include <tinyformat.h>
9 #include <util/sock.h>
10 #include <util/syserror.h>
11 #include <util/system.h>
12 #include <util/time.h>
13 
14 #include <memory>
15 #include <stdexcept>
16 #include <string>
17 
18 #ifdef WIN32
19 #include <codecvt>
20 #include <locale>
21 #endif
22 
23 #ifdef USE_POLL
24 #include <poll.h>
25 #endif
26 
27 static inline bool IOErrorIsPermanent(int err)
28 {
29  return err != WSAEAGAIN && err != WSAEINTR && err != WSAEWOULDBLOCK && err != WSAEINPROGRESS;
30 }
31 
32 Sock::Sock() : m_socket(INVALID_SOCKET) {}
33 
34 Sock::Sock(SOCKET s) : m_socket(s) {}
35 
36 Sock::Sock(Sock&& other)
37 {
38  m_socket = other.m_socket;
39  other.m_socket = INVALID_SOCKET;
40 }
41 
43 
45 {
46  Close();
47  m_socket = other.m_socket;
48  other.m_socket = INVALID_SOCKET;
49  return *this;
50 }
51 
52 SOCKET Sock::Get() const { return m_socket; }
53 
54 ssize_t Sock::Send(const void* data, size_t len, int flags) const
55 {
56  return send(m_socket, static_cast<const char*>(data), len, flags);
57 }
58 
59 ssize_t Sock::Recv(void* buf, size_t len, int flags) const
60 {
61  return recv(m_socket, static_cast<char*>(buf), len, flags);
62 }
63 
64 int Sock::Connect(const sockaddr* addr, socklen_t addr_len) const
65 {
66  return connect(m_socket, addr, addr_len);
67 }
68 
69 int Sock::Bind(const sockaddr* addr, socklen_t addr_len) const
70 {
71  return bind(m_socket, addr, addr_len);
72 }
73 
74 int Sock::Listen(int backlog) const
75 {
76  return listen(m_socket, backlog);
77 }
78 
79 std::unique_ptr<Sock> Sock::Accept(sockaddr* addr, socklen_t* addr_len) const
80 {
81 #ifdef WIN32
82  static constexpr auto ERR = INVALID_SOCKET;
83 #else
84  static constexpr auto ERR = SOCKET_ERROR;
85 #endif
86 
87  std::unique_ptr<Sock> sock;
88 
89  const auto socket = accept(m_socket, addr, addr_len);
90  if (socket != ERR) {
91  try {
92  sock = std::make_unique<Sock>(socket);
93  } catch (const std::exception&) {
94 #ifdef WIN32
95  closesocket(socket);
96 #else
97  close(socket);
98 #endif
99  }
100  }
101 
102  return sock;
103 }
104 
105 int Sock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
106 {
107  return getsockopt(m_socket, level, opt_name, static_cast<char*>(opt_val), opt_len);
108 }
109 
110 int Sock::SetSockOpt(int level, int opt_name, const void* opt_val, socklen_t opt_len) const
111 {
112  return setsockopt(m_socket, level, opt_name, static_cast<const char*>(opt_val), opt_len);
113 }
114 
115 int Sock::GetSockName(sockaddr* name, socklen_t* name_len) const
116 {
117  return getsockname(m_socket, name, name_len);
118 }
119 
120 bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
121 {
122  // We need a `shared_ptr` owning `this` for `WaitMany()`, but don't want
123  // `this` to be destroyed when the `shared_ptr` goes out of scope at the
124  // end of this function. Create it with a custom noop deleter.
125  std::shared_ptr<const Sock> shared{this, [](const Sock*) {}};
126 
127  EventsPerSock events_per_sock{std::make_pair(shared, Events{requested})};
128 
129  if (!WaitMany(timeout, events_per_sock)) {
130  return false;
131  }
132 
133  if (occurred != nullptr) {
134  *occurred = events_per_sock.begin()->second.occurred;
135  }
136 
137  return true;
138 }
139 
140 bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
141 {
142 #ifdef USE_POLL
143  std::vector<pollfd> pfds;
144  for (const auto& [sock, events] : events_per_sock) {
145  pfds.emplace_back();
146  auto& pfd = pfds.back();
147  pfd.fd = sock->m_socket;
148  if (events.requested & RECV) {
149  pfd.events |= POLLIN;
150  }
151  if (events.requested & SEND) {
152  pfd.events |= POLLOUT;
153  }
154  }
155 
156  if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) {
157  return false;
158  }
159 
160  assert(pfds.size() == events_per_sock.size());
161  size_t i{0};
162  for (auto& [sock, events] : events_per_sock) {
163  assert(sock->m_socket == static_cast<SOCKET>(pfds[i].fd));
164  events.occurred = 0;
165  if (pfds[i].revents & POLLIN) {
166  events.occurred |= RECV;
167  }
168  if (pfds[i].revents & POLLOUT) {
169  events.occurred |= SEND;
170  }
171  if (pfds[i].revents & (POLLERR | POLLHUP)) {
172  events.occurred |= ERR;
173  }
174  ++i;
175  }
176 
177  return true;
178 #else
179  fd_set recv;
180  fd_set send;
181  fd_set err;
182  FD_ZERO(&recv);
183  FD_ZERO(&send);
184  FD_ZERO(&err);
185  SOCKET socket_max{0};
186 
187  for (const auto& [sock, events] : events_per_sock) {
188  const auto& s = sock->m_socket;
189  if (!IsSelectableSocket(s)) {
190  return false;
191  }
192  if (events.requested & RECV) {
193  FD_SET(s, &recv);
194  }
195  if (events.requested & SEND) {
196  FD_SET(s, &send);
197  }
198  FD_SET(s, &err);
199  socket_max = std::max(socket_max, s);
200  }
201 
202  timeval tv = MillisToTimeval(timeout);
203 
204  if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) {
205  return false;
206  }
207 
208  for (auto& [sock, events] : events_per_sock) {
209  const auto& s = sock->m_socket;
210  events.occurred = 0;
211  if (FD_ISSET(s, &recv)) {
212  events.occurred |= RECV;
213  }
214  if (FD_ISSET(s, &send)) {
215  events.occurred |= SEND;
216  }
217  if (FD_ISSET(s, &err)) {
218  events.occurred |= ERR;
219  }
220  }
221 
222  return true;
223 #endif /* USE_POLL */
224 }
225 
226 void Sock::SendComplete(const std::string& data,
227  std::chrono::milliseconds timeout,
228  CThreadInterrupt& interrupt) const
229 {
230  const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
231  size_t sent{0};
232 
233  for (;;) {
234  const ssize_t ret{Send(data.data() + sent, data.size() - sent, MSG_NOSIGNAL)};
235 
236  if (ret > 0) {
237  sent += static_cast<size_t>(ret);
238  if (sent == data.size()) {
239  break;
240  }
241  } else {
242  const int err{WSAGetLastError()};
243  if (IOErrorIsPermanent(err)) {
244  throw std::runtime_error(strprintf("send(): %s", NetworkErrorString(err)));
245  }
246  }
247 
248  const auto now = GetTime<std::chrono::milliseconds>();
249 
250  if (now >= deadline) {
251  throw std::runtime_error(strprintf(
252  "Send timeout (sent only %u of %u bytes before that)", sent, data.size()));
253  }
254 
255  if (interrupt) {
256  throw std::runtime_error(strprintf(
257  "Send interrupted (sent only %u of %u bytes before that)", sent, data.size()));
258  }
259 
260  // Wait for a short while (or the socket to become ready for sending) before retrying
261  // if nothing was sent.
262  const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO});
263  (void)Wait(wait_time, SEND);
264  }
265 }
266 
267 std::string Sock::RecvUntilTerminator(uint8_t terminator,
268  std::chrono::milliseconds timeout,
269  CThreadInterrupt& interrupt,
270  size_t max_data) const
271 {
272  const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
273  std::string data;
274  bool terminator_found{false};
275 
276  // We must not consume any bytes past the terminator from the socket.
277  // One option is to read one byte at a time and check if we have read a terminator.
278  // However that is very slow. Instead, we peek at what is in the socket and only read
279  // as many bytes as possible without crossing the terminator.
280  // Reading 64 MiB of random data with 262526 terminator chars takes 37 seconds to read
281  // one byte at a time VS 0.71 seconds with the "peek" solution below. Reading one byte
282  // at a time is about 50 times slower.
283 
284  for (;;) {
285  if (data.size() >= max_data) {
286  throw std::runtime_error(
287  strprintf("Received too many bytes without a terminator (%u)", data.size()));
288  }
289 
290  char buf[512];
291 
292  const ssize_t peek_ret{Recv(buf, std::min(sizeof(buf), max_data - data.size()), MSG_PEEK)};
293 
294  switch (peek_ret) {
295  case -1: {
296  const int err{WSAGetLastError()};
297  if (IOErrorIsPermanent(err)) {
298  throw std::runtime_error(strprintf("recv(): %s", NetworkErrorString(err)));
299  }
300  break;
301  }
302  case 0:
303  throw std::runtime_error("Connection unexpectedly closed by peer");
304  default:
305  auto end = buf + peek_ret;
306  auto terminator_pos = std::find(buf, end, terminator);
307  terminator_found = terminator_pos != end;
308 
309  const size_t try_len{terminator_found ? terminator_pos - buf + 1 :
310  static_cast<size_t>(peek_ret)};
311 
312  const ssize_t read_ret{Recv(buf, try_len, 0)};
313 
314  if (read_ret < 0 || static_cast<size_t>(read_ret) != try_len) {
315  throw std::runtime_error(
316  strprintf("recv() returned %u bytes on attempt to read %u bytes but previous "
317  "peek claimed %u bytes are available",
318  read_ret, try_len, peek_ret));
319  }
320 
321  // Don't include the terminator in the output.
322  const size_t append_len{terminator_found ? try_len - 1 : try_len};
323 
324  data.append(buf, buf + append_len);
325 
326  if (terminator_found) {
327  return data;
328  }
329  }
330 
331  const auto now = GetTime<std::chrono::milliseconds>();
332 
333  if (now >= deadline) {
334  throw std::runtime_error(strprintf(
335  "Receive timeout (received %u bytes without terminator before that)", data.size()));
336  }
337 
338  if (interrupt) {
339  throw std::runtime_error(strprintf(
340  "Receive interrupted (received %u bytes without terminator before that)",
341  data.size()));
342  }
343 
344  // Wait for a short while (or the socket to become ready for reading) before retrying.
345  const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO});
346  (void)Wait(wait_time, RECV);
347  }
348 }
349 
350 bool Sock::IsConnected(std::string& errmsg) const
351 {
352  if (m_socket == INVALID_SOCKET) {
353  errmsg = "not connected";
354  return false;
355  }
356 
357  char c;
358  switch (Recv(&c, sizeof(c), MSG_PEEK)) {
359  case -1: {
360  const int err = WSAGetLastError();
361  if (IOErrorIsPermanent(err)) {
362  errmsg = NetworkErrorString(err);
363  return false;
364  }
365  return true;
366  }
367  case 0:
368  errmsg = "closed";
369  return false;
370  default:
371  return true;
372  }
373 }
374 
376 {
377  if (m_socket == INVALID_SOCKET) {
378  return;
379  }
380 #ifdef WIN32
381  int ret = closesocket(m_socket);
382 #else
383  int ret = close(m_socket);
384 #endif
385  if (ret) {
386  LogPrintf("Error closing socket %d: %s\n", m_socket, NetworkErrorString(WSAGetLastError()));
387  }
389 }
390 
391 #ifdef WIN32
392 std::string NetworkErrorString(int err)
393 {
394  wchar_t buf[256];
395  buf[0] = 0;
396  if(FormatMessageW(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | FORMAT_MESSAGE_MAX_WIDTH_MASK,
397  nullptr, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
398  buf, ARRAYSIZE(buf), nullptr))
399  {
400  return strprintf("%s (%d)", std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>,wchar_t>().to_bytes(buf), err);
401  }
402  else
403  {
404  return strprintf("Unknown error (%d)", err);
405  }
406 }
407 #else
408 std::string NetworkErrorString(int err)
409 {
410  // On BSD sockets implementations, NetworkErrorString is the same as SysErrorString.
411  return SysErrorString(err);
412 }
413 #endif
int ret
int flags
Definition: bitcoin-tx.cpp:525
RAII helper class that manages a socket.
Definition: sock.h:28
virtual std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const
accept(2) wrapper.
Definition: sock.cpp:79
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
Definition: sock.cpp:54
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:146
SOCKET m_socket
Contained socket.
Definition: sock.h:261
virtual void SendComplete(const std::string &data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
Definition: sock.cpp:226
virtual int Bind(const sockaddr *addr, socklen_t addr_len) const
bind(2) wrapper.
Definition: sock.cpp:69
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
Definition: sock.cpp:120
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
Definition: sock.cpp:42
uint8_t Event
Definition: sock.h:136
virtual int GetSockName(sockaddr *name, socklen_t *name_len) const
getsockname(2) wrapper.
Definition: sock.cpp:115
void Close()
Close m_socket if it is not INVALID_SOCKET.
Definition: sock.cpp:375
Sock()
Default constructor, creates an empty object that does nothing when destroyed.
Definition: sock.cpp:32
virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const
Same as Wait(), but wait on many sockets within the same timeout.
Definition: sock.cpp:140
static constexpr Event ERR
Ignored if passed to Wait(), but could be set in the occurred events if an exceptional condition has ...
Definition: sock.h:152
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
Definition: sock.cpp:350
virtual int SetSockOpt(int level, int opt_name, const void *opt_val, socklen_t opt_len) const
setsockopt(2) wrapper.
Definition: sock.cpp:110
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:141
virtual SOCKET Get() const
Get the value of the contained socket.
Definition: sock.cpp:52
virtual int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const
getsockopt(2) wrapper.
Definition: sock.cpp:105
virtual int Connect(const sockaddr *addr, socklen_t addr_len) const
connect(2) wrapper.
Definition: sock.cpp:64
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
Definition: sock.cpp:59
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt, size_t max_data) const
Read from socket until a terminator character is encountered.
Definition: sock.cpp:267
virtual int Listen(int backlog) const
listen(2) wrapper.
Definition: sock.cpp:74
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
Definition: sock.h:206
#define INVALID_SOCKET
Definition: compat.h:54
#define WSAEWOULDBLOCK
Definition: compat.h:48
#define SOCKET_ERROR
Definition: compat.h:55
#define WSAGetLastError()
Definition: compat.h:46
static bool IsSelectableSocket(const SOCKET &s)
Definition: compat.h:112
#define MSG_NOSIGNAL
Definition: compat.h:122
unsigned int SOCKET
Definition: compat.h:44
#define WSAEINPROGRESS
Definition: compat.h:52
#define WSAEINTR
Definition: compat.h:51
#define WSAEAGAIN
Definition: compat.h:49
#define LogPrintf(...)
Definition: logging.h:234
RPCHelpMan send()
Definition: spend.cpp:1110
const char * name
Definition: rest.cpp:46
static bool IOErrorIsPermanent(int err)
Definition: sock.cpp:27
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
Definition: sock.cpp:408
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.
Definition: sock.h:21
Auxiliary requested/occurred events to wait for in WaitMany().
Definition: sock.h:171
std::string SysErrorString(int err)
Return system error string from errno value.
Definition: syserror.cpp:15
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval for e.g.
Definition: time.cpp:159
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)
Definition: time.h:55
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1164
assert(!tx.IsCoinBase())