187 std::unique_ptr<PartiallyDownloadedBlock> partialBlock;
221 std::atomic<ServiceFlags> m_their_services{
NODE_NONE};
224 Mutex m_misbehavior_mutex;
226 int m_misbehavior_score
GUARDED_BY(m_misbehavior_mutex){0};
228 bool m_should_discourage
GUARDED_BY(m_misbehavior_mutex){
false};
231 Mutex m_block_inv_mutex;
235 std::vector<uint256> m_blocks_for_inv_relay
GUARDED_BY(m_block_inv_mutex);
239 std::vector<uint256> m_blocks_for_headers_relay
GUARDED_BY(m_block_inv_mutex);
247 std::atomic<int> m_starting_height{-1};
250 std::atomic<uint64_t> m_ping_nonce_sent{0};
252 std::atomic<std::chrono::microseconds> m_ping_start{0us};
254 std::atomic<bool> m_ping_queued{
false};
257 std::atomic<bool> m_wtxid_relay{
false};
269 bool m_relay_txs
GUARDED_BY(m_bloom_filter_mutex){
false};
271 std::unique_ptr<CBloomFilter> m_bloom_filter
PT_GUARDED_BY(m_bloom_filter_mutex)
GUARDED_BY(m_bloom_filter_mutex){
nullptr};
282 std::set<uint256> m_tx_inventory_to_send
GUARDED_BY(m_tx_inventory_mutex);
286 bool m_send_mempool
GUARDED_BY(m_tx_inventory_mutex){
false};
289 std::chrono::microseconds m_next_inv_send_time
GUARDED_BY(m_tx_inventory_mutex){0};
295 std::atomic<CAmount> m_fee_filter_received{0};
301 LOCK(m_tx_relay_mutex);
303 m_tx_relay = std::make_unique<Peer::TxRelay>();
304 return m_tx_relay.get();
309 return WITH_LOCK(m_tx_relay_mutex,
return m_tx_relay.get());
338 std::atomic_bool m_addr_relay_enabled{
false};
342 mutable Mutex m_addr_send_times_mutex;
344 std::chrono::microseconds m_next_addr_send
GUARDED_BY(m_addr_send_times_mutex){0};
346 std::chrono::microseconds m_next_local_addr_send
GUARDED_BY(m_addr_send_times_mutex){0};
349 std::atomic_bool m_wants_addrv2{
false};
358 std::atomic<uint64_t> m_addr_rate_limited{0};
360 std::atomic<uint64_t> m_addr_processed{0};
366 Mutex m_getdata_requests_mutex;
368 std::deque<CInv> m_getdata_requests
GUARDED_BY(m_getdata_requests_mutex);
374 Mutex m_headers_sync_mutex;
377 std::unique_ptr<HeadersSyncState> m_headers_sync
PT_GUARDED_BY(m_headers_sync_mutex)
GUARDED_BY(m_headers_sync_mutex) {};
380 std::atomic<bool> m_sent_sendheaders{
false};
393 , m_our_services{our_services}
397 mutable Mutex m_tx_relay_mutex;
400 std::unique_ptr<TxRelay> m_tx_relay
GUARDED_BY(m_tx_relay_mutex);
403 using PeerRef = std::shared_ptr<Peer>;
415 uint256 hashLastUnknownBlock{};
421 bool fSyncStarted{
false};
423 std::chrono::microseconds m_stalling_since{0us};
424 std::list<QueuedBlock> vBlocksInFlight;
426 std::chrono::microseconds m_downloading_since{0us};
428 bool fPreferredDownload{
false};
430 bool m_requested_hb_cmpctblocks{
false};
432 bool m_provides_cmpctblocks{
false};
458 struct ChainSyncTimeoutState {
460 std::chrono::seconds m_timeout{0s};
464 bool m_sent_getheaders{
false};
466 bool m_protect{
false};
469 ChainSyncTimeoutState m_chain_sync;
472 int64_t m_last_block_announcement{0};
475 const bool m_is_inbound;
477 CNodeState(
bool is_inbound) : m_is_inbound(is_inbound) {}
503 EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
505 EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex);
516 void SetBestHeight(
int height)
override { m_best_height = height; };
519 const std::chrono::microseconds time_received,
const std::atomic<bool>& interruptMsgProc)
override
520 EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
545 void Misbehaving(Peer& peer,
int howmuch,
const std::string& message);
558 bool via_compact_block,
const std::string& message =
"")
575 bool MaybeDiscourageAndDisconnect(
CNode& pnode, Peer& peer);
588 bool ProcessOrphanTx(Peer& peer)
598 void ProcessHeadersMessage(
CNode& pfrom, Peer& peer,
600 bool via_compact_block)
612 bool CheckHeadersAreContinuous(const
std::vector<
CBlockHeader>& headers) const;
631 bool IsContinuationOfLowWorkHeadersSync(Peer& peer,
CNode& pfrom,
645 bool TryLowWorkHeadersSync(Peer& peer,
CNode& pfrom,
660 void HeadersDirectFetchBlocks(
CNode& pfrom, const Peer& peer, const
CBlockIndex& last_header);
662 void UpdatePeerStateForReceivedHeaders(
CNode& pfrom, Peer& peer, const
CBlockIndex& last_header,
bool received_new_header,
bool may_have_more_headers)
670 void AddTxAnnouncement(const
CNode&
node, const
GenTxid& gtxid,
std::chrono::microseconds current_time)
675 template <
typename... Args>
676 void MakeAndPushMessage(
CNode&
node, std::string msg_type, Args&&...
args)
const
682 void PushNodeVersion(
CNode& pnode,
const Peer& peer);
688 void MaybeSendPing(
CNode& node_to, Peer& peer, std::chrono::microseconds now);
720 std::unique_ptr<TxReconciliationTracker> m_txreconciliation;
723 std::atomic<int> m_best_height{-1};
728 const Options m_opts;
730 bool RejectIncomingTxs(
const CNode& peer)
const;
738 mutable Mutex m_peer_mutex;
745 std::map<NodeId, PeerRef> m_peer_map
GUARDED_BY(m_peer_mutex);
755 uint32_t GetFetchFlags(
const Peer& peer)
const;
757 std::atomic<std::chrono::microseconds> m_next_inv_to_inbounds{0us};
774 std::atomic<int> m_wtxid_relay_peers{0};
785 bool AlreadyHaveTx(
const GenTxid& gtxid)
840 Mutex m_recent_confirmed_transactions_mutex;
849 std::chrono::microseconds NextInvToInbounds(std::chrono::microseconds now,
850 std::chrono::seconds average_interval);
854 Mutex m_most_recent_block_mutex;
855 std::shared_ptr<const CBlock> m_most_recent_block
GUARDED_BY(m_most_recent_block_mutex);
856 std::shared_ptr<const CBlockHeaderAndShortTxIDs> m_most_recent_compact_block
GUARDED_BY(m_most_recent_block_mutex);
858 std::unique_ptr<const std::map<uint256, CTransactionRef>> m_most_recent_block_txs
GUARDED_BY(m_most_recent_block_mutex);
862 Mutex m_headers_presync_mutex;
870 using HeadersPresyncStats = std::pair<arith_uint256, std::optional<std::pair<int64_t, uint32_t>>>;
872 std::map<NodeId, HeadersPresyncStats> m_headers_presync_stats
GUARDED_BY(m_headers_presync_mutex) {};
876 std::atomic_bool m_headers_presync_should_signal{
false};
946 std::atomic<
std::chrono::seconds> m_last_tip_update{0s};
952 void ProcessGetData(
CNode& pfrom, Peer& peer,
const std::atomic<bool>& interruptMsgProc)
957 void ProcessBlock(
CNode&
node,
const std::shared_ptr<const CBlock>& block,
bool force_processing,
bool min_pow_checked);
985 std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact
GUARDED_BY(g_msgproc_mutex);
987 size_t vExtraTxnForCompactIt
GUARDED_BY(g_msgproc_mutex) = 0;
1003 void ProcessGetBlockData(
CNode& pfrom, Peer& peer, const
CInv& inv)
1021 bool PrepareBlockFilterRequest(
CNode&
node, Peer& peer,
1023 const
uint256& stop_hash, uint32_t max_height_diff,
1074 std::map<NodeId, CNodeState>::const_iterator it = m_node_states.find(pnode);
1075 if (it == m_node_states.end())
1082 return const_cast<CNodeState*
>(std::as_const(*this).State(pnode));
1090 static bool IsAddrCompatible(
const Peer& peer,
const CAddress& addr)
1095 void PeerManagerImpl::AddAddressKnown(Peer& peer,
const CAddress& addr)
1097 assert(peer.m_addr_known);
1098 peer.m_addr_known->insert(addr.
GetKey());
1101 void PeerManagerImpl::PushAddress(Peer& peer,
const CAddress& addr)
1106 assert(peer.m_addr_known);
1107 if (addr.
IsValid() && !peer.m_addr_known->contains(addr.
GetKey()) && IsAddrCompatible(peer, addr)) {
1109 peer.m_addrs_to_send[m_rng.randrange(peer.m_addrs_to_send.size())] = addr;
1111 peer.m_addrs_to_send.push_back(addr);
1116 static void AddKnownTx(Peer& peer,
const uint256& hash)
1118 auto tx_relay = peer.GetTxRelay();
1119 if (!tx_relay)
return;
1121 LOCK(tx_relay->m_tx_inventory_mutex);
1122 tx_relay->m_tx_inventory_known_filter.insert(hash);
1126 static bool CanServeBlocks(
const Peer& peer)
1133 static bool IsLimitedPeer(
const Peer& peer)
1140 static bool CanServeWitnesses(
const Peer& peer)
1145 std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::microseconds now,
1146 std::chrono::seconds average_interval)
1148 if (m_next_inv_to_inbounds.load() < now) {
1154 return m_next_inv_to_inbounds;
1157 bool PeerManagerImpl::IsBlockRequested(
const uint256& hash)
1159 return mapBlocksInFlight.count(hash);
1162 bool PeerManagerImpl::IsBlockRequestedFromOutbound(
const uint256& hash)
1164 for (
auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
1165 auto [nodeid, block_it] = range.first->second;
1166 CNodeState& nodestate = *
Assert(State(nodeid));
1167 if (!nodestate.m_is_inbound)
return true;
1173 void PeerManagerImpl::RemoveBlockRequest(
const uint256& hash, std::optional<NodeId> from_peer)
1175 auto range = mapBlocksInFlight.equal_range(hash);
1176 if (range.first == range.second) {
1184 while (range.first != range.second) {
1185 auto [node_id, list_it] = range.first->second;
1187 if (from_peer && *from_peer != node_id) {
1192 CNodeState& state = *
Assert(State(node_id));
1194 if (state.vBlocksInFlight.begin() == list_it) {
1196 state.m_downloading_since = std::max(state.m_downloading_since, GetTime<std::chrono::microseconds>());
1198 state.vBlocksInFlight.erase(list_it);
1200 if (state.vBlocksInFlight.empty()) {
1202 m_peers_downloading_from--;
1204 state.m_stalling_since = 0us;
1206 range.first = mapBlocksInFlight.erase(range.first);
1210 bool PeerManagerImpl::BlockRequested(
NodeId nodeid,
const CBlockIndex& block, std::list<QueuedBlock>::iterator** pit)
1214 CNodeState *state = State(nodeid);
1215 assert(state !=
nullptr);
1220 for (
auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
1221 if (range.first->second.first == nodeid) {
1223 *pit = &range.first->second.second;
1230 RemoveBlockRequest(hash, nodeid);
1232 std::list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(),
1233 {&block, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)});
1234 if (state->vBlocksInFlight.size() == 1) {
1236 state->m_downloading_since = GetTime<std::chrono::microseconds>();
1237 m_peers_downloading_from++;
1239 auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it)));
1241 *pit = &itInFlight->second.second;
1246 void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(
NodeId nodeid)
1253 if (m_opts.ignore_incoming_txs)
return;
1255 CNodeState* nodestate = State(nodeid);
1256 if (!nodestate || !nodestate->m_provides_cmpctblocks) {
1261 int num_outbound_hb_peers = 0;
1262 for (std::list<NodeId>::iterator it = lNodesAnnouncingHeaderAndIDs.begin(); it != lNodesAnnouncingHeaderAndIDs.end(); it++) {
1263 if (*it == nodeid) {
1264 lNodesAnnouncingHeaderAndIDs.erase(it);
1265 lNodesAnnouncingHeaderAndIDs.push_back(nodeid);
1268 CNodeState *state = State(*it);
1269 if (state !=
nullptr && !state->m_is_inbound) ++num_outbound_hb_peers;
1271 if (nodestate->m_is_inbound) {
1274 if (lNodesAnnouncingHeaderAndIDs.size() >= 3 && num_outbound_hb_peers == 1) {
1275 CNodeState *remove_node = State(lNodesAnnouncingHeaderAndIDs.front());
1276 if (remove_node !=
nullptr && !remove_node->m_is_inbound) {
1279 std::swap(lNodesAnnouncingHeaderAndIDs.front(), *std::next(lNodesAnnouncingHeaderAndIDs.begin()));
1285 if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
1288 m_connman.
ForNode(lNodesAnnouncingHeaderAndIDs.front(), [
this](
CNode* pnodeStop){
1289 MakeAndPushMessage(*pnodeStop, NetMsgType::SENDCMPCT, false, CMPCTBLOCKS_VERSION);
1291 pnodeStop->m_bip152_highbandwidth_to = false;
1294 lNodesAnnouncingHeaderAndIDs.pop_front();
1299 lNodesAnnouncingHeaderAndIDs.push_back(pfrom->
GetId());
1304 bool PeerManagerImpl::TipMayBeStale()
1308 if (m_last_tip_update.load() == 0s) {
1309 m_last_tip_update = GetTime<std::chrono::seconds>();
1311 return m_last_tip_update.load() < GetTime<std::chrono::seconds>() - std::chrono::seconds{consensusParams.
nPowTargetSpacing * 3} && mapBlocksInFlight.empty();
1314 bool PeerManagerImpl::CanDirectFetch()
1321 if (state->pindexBestKnownBlock && pindex == state->pindexBestKnownBlock->GetAncestor(pindex->nHeight))
1323 if (state->pindexBestHeaderSent && pindex == state->pindexBestHeaderSent->GetAncestor(pindex->nHeight))
1328 void PeerManagerImpl::ProcessBlockAvailability(
NodeId nodeid) {
1329 CNodeState *state = State(nodeid);
1330 assert(state !=
nullptr);
1332 if (!state->hashLastUnknownBlock.IsNull()) {
1335 if (state->pindexBestKnownBlock ==
nullptr || pindex->
nChainWork >= state->pindexBestKnownBlock->nChainWork) {
1336 state->pindexBestKnownBlock = pindex;
1338 state->hashLastUnknownBlock.SetNull();
1343 void PeerManagerImpl::UpdateBlockAvailability(
NodeId nodeid,
const uint256 &hash) {
1344 CNodeState *state = State(nodeid);
1345 assert(state !=
nullptr);
1347 ProcessBlockAvailability(nodeid);
1352 if (state->pindexBestKnownBlock ==
nullptr || pindex->
nChainWork >= state->pindexBestKnownBlock->nChainWork) {
1353 state->pindexBestKnownBlock = pindex;
1357 state->hashLastUnknownBlock = hash;
1362 void PeerManagerImpl::FindNextBlocksToDownload(
const Peer& peer,
unsigned int count, std::vector<const CBlockIndex*>& vBlocks,
NodeId& nodeStaller)
1367 vBlocks.reserve(vBlocks.size() +
count);
1368 CNodeState *state = State(peer.m_id);
1369 assert(state !=
nullptr);
1372 ProcessBlockAvailability(peer.m_id);
1374 if (state->pindexBestKnownBlock ==
nullptr || state->pindexBestKnownBlock->nChainWork < m_chainman.
ActiveChain().
Tip()->
nChainWork || state->pindexBestKnownBlock->nChainWork < m_chainman.
MinimumChainWork()) {
1379 if (state->pindexLastCommonBlock ==
nullptr) {
1382 state->pindexLastCommonBlock = m_chainman.
ActiveChain()[std::min(state->pindexBestKnownBlock->nHeight, m_chainman.
ActiveChain().
Height())];
1387 state->pindexLastCommonBlock =
LastCommonAncestor(state->pindexLastCommonBlock, state->pindexBestKnownBlock);
1388 if (state->pindexLastCommonBlock == state->pindexBestKnownBlock)
1391 const CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
1397 FindNextBlocks(vBlocks, peer, state, pindexWalk,
count, nWindowEnd, &m_chainman.
ActiveChain(), &nodeStaller);
1400 void PeerManagerImpl::TryDownloadingHistoricalBlocks(
const Peer& peer,
unsigned int count, std::vector<const CBlockIndex*>& vBlocks,
const CBlockIndex *from_tip,
const CBlockIndex* target_block)
1405 if (vBlocks.size() >=
count) {
1409 vBlocks.reserve(
count);
1410 CNodeState *state =
Assert(State(peer.m_id));
1412 if (state->pindexBestKnownBlock ==
nullptr || state->pindexBestKnownBlock->GetAncestor(target_block->
nHeight) != target_block) {
1429 void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks,
const Peer& peer, CNodeState *state,
const CBlockIndex *pindexWalk,
unsigned int count,
int nWindowEnd,
const CChain* activeChain,
NodeId* nodeStaller)
1431 std::vector<const CBlockIndex*> vToFetch;
1432 int nMaxHeight = std::min<int>(state->pindexBestKnownBlock->nHeight, nWindowEnd + 1);
1434 while (pindexWalk->
nHeight < nMaxHeight) {
1438 int nToFetch = std::min(nMaxHeight - pindexWalk->
nHeight, std::max<int>(
count - vBlocks.size(), 128));
1439 vToFetch.resize(nToFetch);
1440 pindexWalk = state->pindexBestKnownBlock->
GetAncestor(pindexWalk->
nHeight + nToFetch);
1441 vToFetch[nToFetch - 1] = pindexWalk;
1442 for (
unsigned int i = nToFetch - 1; i > 0; i--) {
1443 vToFetch[i - 1] = vToFetch[i]->
pprev;
1461 state->pindexLastCommonBlock = pindex;
1462 }
else if (!IsBlockRequested(pindex->
GetBlockHash())) {
1464 if (pindex->
nHeight > nWindowEnd) {
1466 if (vBlocks.size() == 0 && waitingfor != peer.m_id) {
1468 if (nodeStaller) *nodeStaller = waitingfor;
1472 vBlocks.push_back(pindex);
1473 if (vBlocks.size() ==
count) {
1476 }
else if (waitingfor == -1) {
1478 waitingfor = mapBlocksInFlight.lower_bound(pindex->
GetBlockHash())->second.first;
1486 void PeerManagerImpl::PushNodeVersion(
CNode& pnode,
const Peer& peer)
1488 uint64_t my_services{peer.m_our_services};
1489 const int64_t nTime{
count_seconds(GetTime<std::chrono::seconds>())};
1491 const int nNodeStartingHeight{m_best_height};
1498 const bool tx_relay{!RejectIncomingTxs(pnode)};
1505 LogPrint(
BCLog::NET,
"send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d\n",
PROTOCOL_VERSION, nNodeStartingHeight, addr_you.
ToStringAddrPort(), tx_relay, nodeid);
1511 void PeerManagerImpl::AddTxAnnouncement(
const CNode&
node,
const GenTxid& gtxid, std::chrono::microseconds current_time)
1519 const CNodeState* state = State(nodeid);
1529 const bool preferred = state->fPreferredDownload;
1535 m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay);
1538 void PeerManagerImpl::UpdateLastBlockAnnounceTime(
NodeId node, int64_t time_in_seconds)
1541 CNodeState *state = State(
node);
1542 if (state) state->m_last_block_announcement = time_in_seconds;
1550 m_node_states.emplace_hint(m_node_states.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(
node.IsInboundConn()));
1551 assert(m_txrequest.Count(nodeid) == 0);
1553 PeerRef peer = std::make_shared<Peer>(nodeid, our_services);
1556 m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer);
1558 if (!
node.IsInboundConn()) {
1559 PushNodeVersion(
node, *peer);
1563 void PeerManagerImpl::ReattemptInitialBroadcast(
CScheduler& scheduler)
1567 for (
const auto& txid : unbroadcast_txids) {
1570 if (tx !=
nullptr) {
1571 RelayTransaction(txid, tx->GetWitnessHash());
1579 const std::chrono::milliseconds delta = 10min +
GetRandMillis(5min);
1580 scheduler.
scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
1583 void PeerManagerImpl::FinalizeNode(
const CNode&
node)
1595 PeerRef peer = RemovePeer(nodeid);
1597 misbehavior =
WITH_LOCK(peer->m_misbehavior_mutex,
return peer->m_misbehavior_score);
1598 m_wtxid_relay_peers -= peer->m_wtxid_relay;
1599 assert(m_wtxid_relay_peers >= 0);
1601 CNodeState *state = State(nodeid);
1602 assert(state !=
nullptr);
1604 if (state->fSyncStarted)
1607 for (
const QueuedBlock& entry : state->vBlocksInFlight) {
1608 auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash());
1609 while (range.first != range.second) {
1610 auto [node_id, list_it] = range.first->second;
1611 if (node_id != nodeid) {
1614 range.first = mapBlocksInFlight.erase(range.first);
1619 m_txrequest.DisconnectedPeer(nodeid);
1620 if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
1621 m_num_preferred_download_peers -= state->fPreferredDownload;
1622 m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
1623 assert(m_peers_downloading_from >= 0);
1624 m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect;
1625 assert(m_outbound_peers_with_protect_from_disconnect >= 0);
1627 m_node_states.erase(nodeid);
1629 if (m_node_states.empty()) {
1631 assert(mapBlocksInFlight.empty());
1632 assert(m_num_preferred_download_peers == 0);
1633 assert(m_peers_downloading_from == 0);
1634 assert(m_outbound_peers_with_protect_from_disconnect == 0);
1635 assert(m_wtxid_relay_peers == 0);
1636 assert(m_txrequest.Size() == 0);
1640 if (
node.fSuccessfullyConnected && misbehavior == 0 &&
1641 !
node.IsBlockOnlyConn() && !
node.IsInboundConn()) {
1648 LOCK(m_headers_presync_mutex);
1649 m_headers_presync_stats.erase(nodeid);
1654 PeerRef PeerManagerImpl::GetPeerRef(
NodeId id)
const
1657 auto it = m_peer_map.find(
id);
1658 return it != m_peer_map.end() ? it->second :
nullptr;
1661 PeerRef PeerManagerImpl::RemovePeer(
NodeId id)
1665 auto it = m_peer_map.find(
id);
1666 if (it != m_peer_map.end()) {
1667 ret = std::move(it->second);
1668 m_peer_map.erase(it);
1677 const CNodeState* state = State(nodeid);
1678 if (state ==
nullptr)
1680 stats.
nSyncHeight = state->pindexBestKnownBlock ? state->pindexBestKnownBlock->nHeight : -1;
1681 stats.
nCommonHeight = state->pindexLastCommonBlock ? state->pindexLastCommonBlock->nHeight : -1;
1682 for (
const QueuedBlock& queue : state->vBlocksInFlight) {
1688 PeerRef peer = GetPeerRef(nodeid);
1689 if (peer ==
nullptr)
return false;
1698 auto ping_wait{0us};
1699 if ((0 != peer->m_ping_nonce_sent) && (0 != peer->m_ping_start.load().count())) {
1700 ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load();
1703 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
1716 LOCK(peer->m_headers_sync_mutex);
1717 if (peer->m_headers_sync) {
1725 void PeerManagerImpl::AddToCompactExtraTransactions(
const CTransactionRef& tx)
1727 if (m_opts.max_extra_txs <= 0)
1729 if (!vExtraTxnForCompact.size())
1730 vExtraTxnForCompact.resize(m_opts.max_extra_txs);
1731 vExtraTxnForCompact[vExtraTxnForCompactIt] = std::make_pair(tx->GetWitnessHash(), tx);
1732 vExtraTxnForCompactIt = (vExtraTxnForCompactIt + 1) % m_opts.max_extra_txs;
1735 void PeerManagerImpl::Misbehaving(Peer& peer,
int howmuch,
const std::string& message)
1739 LOCK(peer.m_misbehavior_mutex);
1740 const int score_before{peer.m_misbehavior_score};
1741 peer.m_misbehavior_score += howmuch;
1742 const int score_now{peer.m_misbehavior_score};
1744 const std::string message_prefixed = message.empty() ?
"" : (
": " + message);
1745 std::string warning;
1748 warning =
" DISCOURAGE THRESHOLD EXCEEDED";
1749 peer.m_should_discourage =
true;
1753 peer.m_id, score_before, score_now, warning, message_prefixed);
1757 bool via_compact_block,
const std::string& message)
1759 PeerRef peer{GetPeerRef(nodeid)};
1770 if (!via_compact_block) {
1771 if (peer) Misbehaving(*peer, 100, message);
1778 CNodeState *node_state = State(nodeid);
1779 if (node_state ==
nullptr) {
1785 if (!via_compact_block && !node_state->m_is_inbound) {
1786 if (peer) Misbehaving(*peer, 100, message);
1794 if (peer) Misbehaving(*peer, 100, message);
1799 if (peer) Misbehaving(*peer, 10, message);
1805 if (message !=
"") {
1813 PeerRef peer{GetPeerRef(nodeid)};
1819 if (peer) Misbehaving(*peer, 100,
"");
1839 bool PeerManagerImpl::BlockRequestAllowed(
const CBlockIndex* pindex)
1848 std::optional<std::string> PeerManagerImpl::FetchBlock(
NodeId peer_id,
const CBlockIndex& block_index)
1853 PeerRef peer = GetPeerRef(peer_id);
1854 if (peer ==
nullptr)
return "Peer does not exist";
1857 if (!CanServeWitnesses(*peer))
return "Pre-SegWit peer";
1862 RemoveBlockRequest(block_index.
GetBlockHash(), std::nullopt);
1865 if (!BlockRequested(peer_id, block_index))
return "Already requested from this peer";
1877 if (!success)
return "Peer not fully connected";
1881 return std::nullopt;
1888 return std::make_unique<PeerManagerImpl>(connman, addrman, banman, chainman, pool, opts);
1894 : m_rng{opts.deterministic_rng},
1896 m_chainparams(chainman.GetParams()),
1900 m_chainman(chainman),
1906 if (opts.reconcile_txs) {
1911 void PeerManagerImpl::StartScheduledTasks(
CScheduler& scheduler)
1921 const std::chrono::milliseconds delta = 10min +
GetRandMillis(5min);
1922 scheduler.
scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
1931 void PeerManagerImpl::BlockConnected(
1933 const std::shared_ptr<const CBlock>& pblock,
1938 m_last_tip_update = GetTime<std::chrono::seconds>();
1941 auto stalling_timeout = m_block_stalling_timeout.load();
1945 if (m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) {
1958 LOCK(m_recent_confirmed_transactions_mutex);
1959 for (
const auto& ptx : pblock->vtx) {
1960 m_recent_confirmed_transactions.insert(ptx->GetHash().ToUint256());
1961 if (ptx->HasWitness()) {
1962 m_recent_confirmed_transactions.insert(ptx->GetWitnessHash().ToUint256());
1968 for (
const auto& ptx : pblock->vtx) {
1969 m_txrequest.ForgetTxHash(ptx->GetHash());
1970 m_txrequest.ForgetTxHash(ptx->GetWitnessHash());
1975 void PeerManagerImpl::BlockDisconnected(
const std::shared_ptr<const CBlock> &block,
const CBlockIndex* pindex)
1985 LOCK(m_recent_confirmed_transactions_mutex);
1986 m_recent_confirmed_transactions.reset();
1993 void PeerManagerImpl::NewPoWValidBlock(
const CBlockIndex *pindex,
const std::shared_ptr<const CBlock>& pblock)
1995 auto pcmpctblock = std::make_shared<const CBlockHeaderAndShortTxIDs>(*pblock);
1999 if (pindex->
nHeight <= m_highest_fast_announce)
2001 m_highest_fast_announce = pindex->
nHeight;
2005 uint256 hashBlock(pblock->GetHash());
2006 const std::shared_future<CSerializedNetMsg> lazy_ser{
2010 auto most_recent_block_txs = std::make_unique<std::map<uint256, CTransactionRef>>();
2011 for (
const auto& tx : pblock->vtx) {
2012 most_recent_block_txs->emplace(tx->GetHash(), tx);
2013 most_recent_block_txs->emplace(tx->GetWitnessHash(), tx);
2016 LOCK(m_most_recent_block_mutex);
2017 m_most_recent_block_hash = hashBlock;
2018 m_most_recent_block = pblock;
2019 m_most_recent_compact_block = pcmpctblock;
2020 m_most_recent_block_txs = std::move(most_recent_block_txs);
2028 ProcessBlockAvailability(pnode->
GetId());
2029 CNodeState &state = *State(pnode->
GetId());
2032 if (state.m_requested_hb_cmpctblocks && !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->
pprev)) {
2034 LogPrint(
BCLog::NET,
"%s sending header-and-ids %s to peer=%d\n",
"PeerManager::NewPoWValidBlock",
2035 hashBlock.ToString(), pnode->
GetId());
2038 PushMessage(*pnode, ser_cmpctblock.Copy());
2039 state.pindexBestHeaderSent = pindex;
2048 void PeerManagerImpl::UpdatedBlockTip(
const CBlockIndex *pindexNew,
const CBlockIndex *pindexFork,
bool fInitialDownload)
2050 SetBestHeight(pindexNew->
nHeight);
2054 if (fInitialDownload)
return;
2057 std::vector<uint256> vHashes;
2059 while (pindexToAnnounce != pindexFork) {
2061 pindexToAnnounce = pindexToAnnounce->
pprev;
2071 for (
auto& it : m_peer_map) {
2072 Peer& peer = *it.second;
2073 LOCK(peer.m_block_inv_mutex);
2075 peer.m_blocks_for_headers_relay.push_back(hash);
2092 std::map<uint256, std::pair<NodeId, bool>>::iterator it = mapBlockSource.find(hash);
2097 it != mapBlockSource.end() &&
2098 State(it->second.first)) {
2099 MaybePunishNodeForBlock( it->second.first, state, !it->second.second);
2109 mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) {
2110 if (it != mapBlockSource.end()) {
2111 MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first);
2114 if (it != mapBlockSource.end())
2115 mapBlockSource.erase(it);
2124 bool PeerManagerImpl::AlreadyHaveTx(
const GenTxid& gtxid)
2132 m_recent_rejects.reset();
2137 if (m_orphanage.
HaveTx(gtxid))
return true;
2140 LOCK(m_recent_confirmed_transactions_mutex);
2141 if (m_recent_confirmed_transactions.contains(hash))
return true;
2144 return m_recent_rejects.contains(hash) || m_mempool.
exists(gtxid);
2147 bool PeerManagerImpl::AlreadyHaveBlock(
const uint256& block_hash)
2152 void PeerManagerImpl::SendPings()
2155 for(
auto& it : m_peer_map) it.second->m_ping_queued =
true;
2158 void PeerManagerImpl::RelayTransaction(
const uint256& txid,
const uint256& wtxid)
2161 for(
auto& it : m_peer_map) {
2162 Peer& peer = *it.second;
2163 auto tx_relay = peer.GetTxRelay();
2164 if (!tx_relay)
continue;
2166 LOCK(tx_relay->m_tx_inventory_mutex);
2172 if (tx_relay->m_next_inv_send_time == 0s)
continue;
2174 const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
2175 if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
2176 tx_relay->m_tx_inventory_to_send.insert(hash);
2181 void PeerManagerImpl::RelayAddress(
NodeId originator,
2197 const auto current_time{GetTime<std::chrono::seconds>()};
2205 unsigned int nRelayNodes = (fReachable || (hasher.Finalize() & 1)) ? 2 : 1;
2207 std::array<std::pair<uint64_t, Peer*>, 2> best{{{0,
nullptr}, {0,
nullptr}}};
2208 assert(nRelayNodes <= best.size());
2212 for (
auto& [
id, peer] : m_peer_map) {
2213 if (peer->m_addr_relay_enabled &&
id != originator && IsAddrCompatible(*peer, addr)) {
2215 for (
unsigned int i = 0; i < nRelayNodes; i++) {
2216 if (hashKey > best[i].first) {
2217 std::copy(best.begin() + i, best.begin() + nRelayNodes - 1, best.begin() + i + 1);
2218 best[i] = std::make_pair(hashKey, peer.get());
2225 for (
unsigned int i = 0; i < nRelayNodes && best[i].first != 0; i++) {
2226 PushAddress(*best[i].second, addr);
2230 void PeerManagerImpl::ProcessGetBlockData(
CNode& pfrom, Peer& peer,
const CInv& inv)
2232 std::shared_ptr<const CBlock> a_recent_block;
2233 std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block;
2235 LOCK(m_most_recent_block_mutex);
2236 a_recent_block = m_most_recent_block;
2237 a_recent_compact_block = m_most_recent_compact_block;
2240 bool need_activate_chain =
false;
2252 need_activate_chain =
true;
2256 if (need_activate_chain) {
2258 if (!m_chainman.
ActiveChainstate().ActivateBestChain(state, a_recent_block)) {
2268 if (!BlockRequestAllowed(pindex)) {
2269 LogPrint(
BCLog::NET,
"%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom.
GetId());
2285 LogPrint(
BCLog::NET,
"Ignore block request below NODE_NETWORK_LIMITED threshold, disconnect peer=%d\n", pfrom.
GetId());
2295 std::shared_ptr<const CBlock> pblock;
2296 if (a_recent_block && a_recent_block->GetHash() == pindex->
GetBlockHash()) {
2297 pblock = a_recent_block;
2301 std::vector<uint8_t> block_data;
2303 assert(!
"cannot load block from disk");
2309 std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>();
2311 assert(!
"cannot load block from disk");
2313 pblock = pblockRead;
2321 bool sendMerkleBlock =
false;
2323 if (
auto tx_relay = peer.GetTxRelay(); tx_relay !=
nullptr) {
2324 LOCK(tx_relay->m_bloom_filter_mutex);
2325 if (tx_relay->m_bloom_filter) {
2326 sendMerkleBlock =
true;
2327 merkleBlock =
CMerkleBlock(*pblock, *tx_relay->m_bloom_filter);
2330 if (sendMerkleBlock) {
2338 typedef std::pair<unsigned int, uint256> PairType;
2350 if (a_recent_compact_block && a_recent_compact_block->header.GetHash() == pindex->
GetBlockHash()) {
2363 LOCK(peer.m_block_inv_mutex);
2365 if (inv.
hash == peer.m_continuation_block) {
2369 std::vector<CInv> vInv;
2372 peer.m_continuation_block.SetNull();
2380 auto txinfo = m_mempool.
info_for_relay(gtxid, tx_relay.m_last_inv_sequence);
2382 return std::move(txinfo.tx);
2387 LOCK(m_most_recent_block_mutex);
2388 if (m_most_recent_block_txs !=
nullptr) {
2389 auto it = m_most_recent_block_txs->find(gtxid.
GetHash());
2390 if (it != m_most_recent_block_txs->end())
return it->second;
2397 void PeerManagerImpl::ProcessGetData(
CNode& pfrom, Peer& peer,
const std::atomic<bool>& interruptMsgProc)
2401 auto tx_relay = peer.GetTxRelay();
2403 std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
2404 std::vector<CInv> vNotFound;
2409 while (it != peer.m_getdata_requests.end() && it->IsGenTxMsg()) {
2410 if (interruptMsgProc)
return;
2415 const CInv &inv = *it++;
2417 if (tx_relay ==
nullptr) {
2427 MakeAndPushMessage(pfrom,
NetMsgType::TX, maybe_with_witness(*tx));
2430 vNotFound.push_back(inv);
2436 if (it != peer.m_getdata_requests.end() && !pfrom.
fPauseSend) {
2437 const CInv &inv = *it++;
2439 ProcessGetBlockData(pfrom, peer, inv);
2445 peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it);
2447 if (!vNotFound.empty()) {
2466 uint32_t PeerManagerImpl::GetFetchFlags(
const Peer& peer)
const
2468 uint32_t nFetchFlags = 0;
2469 if (CanServeWitnesses(peer)) {
2478 for (
size_t i = 0; i < req.
indexes.size(); i++) {
2480 Misbehaving(peer, 100,
"getblocktxn with out-of-bounds tx indices");
2489 bool PeerManagerImpl::CheckHeadersPoW(
const std::vector<CBlockHeader>& headers,
const Consensus::Params& consensusParams, Peer& peer)
2493 Misbehaving(peer, 100,
"header with invalid proof of work");
2498 if (!CheckHeadersAreContinuous(headers)) {
2499 Misbehaving(peer, 20,
"non-continuous headers sequence");
2530 void PeerManagerImpl::HandleFewUnconnectingHeaders(
CNode& pfrom, Peer& peer,
2531 const std::vector<CBlockHeader>& headers)
2533 peer.m_num_unconnecting_headers_msgs++;
2536 if (MaybeSendGetHeaders(pfrom,
GetLocator(best_header), peer)) {
2537 LogPrint(
BCLog::NET,
"received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, m_num_unconnecting_headers_msgs=%d)\n",
2539 headers[0].hashPrevBlock.ToString(),
2540 best_header->nHeight,
2541 pfrom.
GetId(), peer.m_num_unconnecting_headers_msgs);
2552 Misbehaving(peer, 20,
strprintf(
"%d non-connecting headers", peer.m_num_unconnecting_headers_msgs));
2556 bool PeerManagerImpl::CheckHeadersAreContinuous(
const std::vector<CBlockHeader>& headers)
const
2560 if (!hashLastBlock.
IsNull() && header.hashPrevBlock != hashLastBlock) {
2563 hashLastBlock = header.GetHash();
2568 bool PeerManagerImpl::IsContinuationOfLowWorkHeadersSync(Peer& peer,
CNode& pfrom, std::vector<CBlockHeader>& headers)
2570 if (peer.m_headers_sync) {
2571 auto result = peer.m_headers_sync->ProcessNextHeaders(headers, headers.size() ==
MAX_HEADERS_RESULTS);
2572 if (result.request_more) {
2573 auto locator = peer.m_headers_sync->NextHeadersRequestLocator();
2575 Assume(!locator.vHave.empty());
2576 if (!locator.vHave.empty()) {
2583 bool sent_getheaders = MaybeSendGetHeaders(pfrom, locator, peer);
2584 if (sent_getheaders) {
2586 locator.vHave.front().ToString(), pfrom.
GetId());
2588 LogPrint(
BCLog::NET,
"error sending next getheaders (from %s) to continue sync with peer=%d\n",
2589 locator.vHave.front().ToString(), pfrom.
GetId());
2595 peer.m_headers_sync.reset(
nullptr);
2600 LOCK(m_headers_presync_mutex);
2601 m_headers_presync_stats.erase(pfrom.
GetId());
2604 HeadersPresyncStats stats;
2605 stats.first = peer.m_headers_sync->GetPresyncWork();
2607 stats.second = {peer.m_headers_sync->GetPresyncHeight(),
2608 peer.m_headers_sync->GetPresyncTime()};
2612 LOCK(m_headers_presync_mutex);
2613 m_headers_presync_stats[pfrom.
GetId()] = stats;
2614 auto best_it = m_headers_presync_stats.find(m_headers_presync_bestpeer);
2615 bool best_updated =
false;
2616 if (best_it == m_headers_presync_stats.end()) {
2620 const HeadersPresyncStats* stat_best{
nullptr};
2621 for (
const auto& [peer, stat] : m_headers_presync_stats) {
2622 if (!stat_best || stat > *stat_best) {
2627 m_headers_presync_bestpeer = peer_best;
2628 best_updated = (peer_best == pfrom.
GetId());
2629 }
else if (best_it->first == pfrom.
GetId() || stats > best_it->second) {
2631 m_headers_presync_bestpeer = pfrom.
GetId();
2632 best_updated =
true;
2634 if (best_updated && stats.second.has_value()) {
2636 m_headers_presync_should_signal =
true;
2640 if (result.success) {
2643 headers.swap(result.pow_validated_headers);
2646 return result.success;
2654 bool PeerManagerImpl::TryLowWorkHeadersSync(Peer& peer,
CNode& pfrom,
const CBlockIndex* chain_start_header, std::vector<CBlockHeader>& headers)
2661 arith_uint256 minimum_chain_work = GetAntiDoSWorkThreshold();
2665 if (total_work < minimum_chain_work) {
2679 LOCK(peer.m_headers_sync_mutex);
2681 chain_start_header, minimum_chain_work));
2686 (void)IsContinuationOfLowWorkHeadersSync(peer, pfrom, headers);
2700 bool PeerManagerImpl::IsAncestorOfBestHeaderOrTip(
const CBlockIndex* header)
2702 if (header ==
nullptr) {
2704 }
else if (m_chainman.m_best_header !=
nullptr && header == m_chainman.m_best_header->GetAncestor(header->
nHeight)) {
2712 bool PeerManagerImpl::MaybeSendGetHeaders(
CNode& pfrom,
const CBlockLocator& locator, Peer& peer)
2720 peer.m_last_getheaders_timestamp = current_time;
2731 void PeerManagerImpl::HeadersDirectFetchBlocks(
CNode& pfrom,
const Peer& peer,
const CBlockIndex& last_header)
2734 CNodeState *nodestate = State(pfrom.
GetId());
2737 std::vector<const CBlockIndex*> vToFetch;
2745 vToFetch.push_back(pindexWalk);
2747 pindexWalk = pindexWalk->
pprev;
2758 std::vector<CInv> vGetData;
2765 uint32_t nFetchFlags = GetFetchFlags(peer);
2767 BlockRequested(pfrom.
GetId(), *pindex);
2771 if (vGetData.size() > 1) {
2776 if (vGetData.size() > 0) {
2777 if (!m_opts.ignore_incoming_txs &&
2778 nodestate->m_provides_cmpctblocks &&
2779 vGetData.size() == 1 &&
2780 mapBlocksInFlight.size() == 1 &&
2796 void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(
CNode& pfrom, Peer& peer,
2797 const CBlockIndex& last_header,
bool received_new_header,
bool may_have_more_headers)
2799 if (peer.m_num_unconnecting_headers_msgs > 0) {
2800 LogPrint(
BCLog::NET,
"peer=%d: resetting m_num_unconnecting_headers_msgs (%d -> 0)\n", pfrom.
GetId(), peer.m_num_unconnecting_headers_msgs);
2802 peer.m_num_unconnecting_headers_msgs = 0;
2805 CNodeState *nodestate = State(pfrom.
GetId());
2814 nodestate->m_last_block_announcement =
GetTime();
2822 if (nodestate->pindexBestKnownBlock && nodestate->pindexBestKnownBlock->nChainWork < m_chainman.
MinimumChainWork()) {
2832 LogPrintf(
"Disconnecting outbound peer %d -- headers chain has insufficient work\n", pfrom.
GetId());
2844 if (m_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= m_chainman.
ActiveChain().
Tip()->
nChainWork && !nodestate->m_chain_sync.m_protect) {
2846 nodestate->m_chain_sync.m_protect =
true;
2847 ++m_outbound_peers_with_protect_from_disconnect;
2852 void PeerManagerImpl::ProcessHeadersMessage(
CNode& pfrom, Peer& peer,
2853 std::vector<CBlockHeader>&& headers,
2854 bool via_compact_block)
2856 size_t nCount = headers.size();
2863 LOCK(peer.m_headers_sync_mutex);
2864 if (peer.m_headers_sync) {
2865 peer.m_headers_sync.reset(
nullptr);
2866 LOCK(m_headers_presync_mutex);
2867 m_headers_presync_stats.erase(pfrom.
GetId());
2876 if (!CheckHeadersPoW(headers, m_chainparams.
GetConsensus(), peer)) {
2891 bool already_validated_work =
false;
2894 bool have_headers_sync =
false;
2896 LOCK(peer.m_headers_sync_mutex);
2898 already_validated_work = IsContinuationOfLowWorkHeadersSync(peer, pfrom, headers);
2910 if (headers.empty()) {
2914 have_headers_sync = !!peer.m_headers_sync;
2919 bool headers_connect_blockindex{chain_start_header !=
nullptr};
2921 if (!headers_connect_blockindex) {
2926 HandleFewUnconnectingHeaders(pfrom, peer, headers);
2928 Misbehaving(peer, 10,
"invalid header received");
2941 if (IsAncestorOfBestHeaderOrTip(last_received_header)) {
2942 already_validated_work =
true;
2950 already_validated_work =
true;
2956 if (!already_validated_work && TryLowWorkHeadersSync(peer, pfrom,
2957 chain_start_header, headers)) {
2969 bool received_new_header{last_received_header ==
nullptr};
2975 MaybePunishNodeForBlock(pfrom.
GetId(), state, via_compact_block,
"invalid header received");
2984 if (MaybeSendGetHeaders(pfrom,
GetLocator(pindexLast), peer)) {
2986 pindexLast->
nHeight, pfrom.
GetId(), peer.m_starting_height);
2990 UpdatePeerStateForReceivedHeaders(pfrom, peer, *pindexLast, received_new_header, nCount ==
MAX_HEADERS_RESULTS);
2993 HeadersDirectFetchBlocks(pfrom, peer, *pindexLast);
2998 bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
3008 const Txid& orphanHash = porphanTx->GetHash();
3009 const Wtxid& orphan_wtxid = porphanTx->GetWitnessHash();
3018 RelayTransaction(orphanHash, porphanTx->GetWitnessHash());
3020 m_orphanage.
EraseTx(orphanHash);
3022 AddToCompactExtraTransactions(removedTx);
3038 MaybePunishNodeForTx(peer.m_id, state);
3057 m_recent_rejects.insert(porphanTx->GetWitnessHash().ToUint256());
3069 m_recent_rejects.insert(porphanTx->GetHash().ToUint256());
3072 m_orphanage.
EraseTx(orphanHash);
3080 bool PeerManagerImpl::PrepareBlockFilterRequest(
CNode&
node, Peer& peer,
3082 const uint256& stop_hash, uint32_t max_height_diff,
3086 const bool supported_filter_type =
3089 if (!supported_filter_type) {
3091 node.GetId(),
static_cast<uint8_t
>(filter_type));
3092 node.fDisconnect =
true;
3101 if (!stop_index || !BlockRequestAllowed(stop_index)) {
3104 node.fDisconnect =
true;
3109 uint32_t stop_height = stop_index->
nHeight;
3110 if (start_height > stop_height) {
3112 "start height %d and stop height %d\n",
3113 node.GetId(), start_height, stop_height);
3114 node.fDisconnect =
true;
3117 if (stop_height - start_height >= max_height_diff) {
3119 node.GetId(), stop_height - start_height + 1, max_height_diff);
3120 node.fDisconnect =
true;
3125 if (!filter_index) {
3135 uint8_t filter_type_ser;
3136 uint32_t start_height;
3139 vRecv >> filter_type_ser >> start_height >> stop_hash;
3145 if (!PrepareBlockFilterRequest(
node, peer, filter_type, start_height, stop_hash,
3150 std::vector<BlockFilter> filters;
3152 LogPrint(
BCLog::NET,
"Failed to find block filter in index: filter_type=%s, start_height=%d, stop_hash=%s\n",
3157 for (
const auto& filter : filters) {
3164 uint8_t filter_type_ser;
3165 uint32_t start_height;
3168 vRecv >> filter_type_ser >> start_height >> stop_hash;
3174 if (!PrepareBlockFilterRequest(
node, peer, filter_type, start_height, stop_hash,
3180 if (start_height > 0) {
3182 stop_index->
GetAncestor(
static_cast<int>(start_height - 1));
3184 LogPrint(
BCLog::NET,
"Failed to find block filter header in index: filter_type=%s, block_hash=%s\n",
3190 std::vector<uint256> filter_hashes;
3192 LogPrint(
BCLog::NET,
"Failed to find block filter hashes in index: filter_type=%s, start_height=%d, stop_hash=%s\n",
3206 uint8_t filter_type_ser;
3209 vRecv >> filter_type_ser >> stop_hash;
3215 if (!PrepareBlockFilterRequest(
node, peer, filter_type, 0, stop_hash,
3216 std::numeric_limits<uint32_t>::max(),
3217 stop_index, filter_index)) {
3225 for (
int i = headers.size() - 1; i >= 0; i--) {
3230 LogPrint(
BCLog::NET,
"Failed to find block filter header in index: filter_type=%s, block_hash=%s\n",
3242 void PeerManagerImpl::ProcessBlock(
CNode&
node,
const std::shared_ptr<const CBlock>& block,
bool force_processing,
bool min_pow_checked)
3244 bool new_block{
false};
3245 m_chainman.
ProcessNewBlock(block, force_processing, min_pow_checked, &new_block);
3247 node.m_last_block_time = GetTime<std::chrono::seconds>();
3252 RemoveBlockRequest(block->GetHash(), std::nullopt);
3255 mapBlockSource.erase(block->GetHash());
3259 void PeerManagerImpl::ProcessCompactBlockTxns(
CNode& pfrom, Peer& peer,
const BlockTransactions& block_transactions)
3261 std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
3262 bool fBlockRead{
false};
3266 auto range_flight = mapBlocksInFlight.equal_range(block_transactions.
blockhash);
3267 size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
3268 bool requested_block_from_this_peer{
false};
3271 bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.
GetId());
3273 while (range_flight.first != range_flight.second) {
3274 auto [node_id, block_it] = range_flight.first->second;
3275 if (node_id == pfrom.
GetId() && block_it->partialBlock) {
3276 requested_block_from_this_peer =
true;
3279 range_flight.first++;
3282 if (!requested_block_from_this_peer) {
3291 Misbehaving(peer, 100,
"invalid compact block/non-matching block transactions");
3294 if (first_in_flight) {
3296 std::vector<CInv> invs;
3301 LogPrint(
BCLog::NET,
"Peer %d sent us a compact block but it failed to reconstruct, waiting on first download to complete\n", pfrom.
GetId());
3330 mapBlockSource.emplace(block_transactions.
blockhash, std::make_pair(pfrom.
GetId(),
false));
3340 ProcessBlock(pfrom, pblock,
true,
true);
3345 void PeerManagerImpl::ProcessMessage(
CNode& pfrom,
const std::string& msg_type,
DataStream& vRecv,
3346 const std::chrono::microseconds time_received,
3347 const std::atomic<bool>& interruptMsgProc)
3353 PeerRef peer = GetPeerRef(pfrom.
GetId());
3354 if (peer ==
nullptr)
return;
3364 uint64_t nNonce = 1;
3367 std::string cleanSubVer;
3368 int starting_height = -1;
3371 vRecv >> nVersion >> Using<CustomUintFormatter<8>>(nServices) >> nTime;
3395 if (!vRecv.
empty()) {
3403 if (!vRecv.
empty()) {
3404 std::string strSubVer;
3408 if (!vRecv.
empty()) {
3409 vRecv >> starting_height;
3429 PushNodeVersion(pfrom, *peer);
3442 if (greatest_common_version >= 70016) {
3451 peer->m_their_services = nServices;
3455 pfrom.cleanSubVer = cleanSubVer;
3457 peer->m_starting_height = starting_height;
3467 (fRelay || (peer->m_our_services &
NODE_BLOOM))) {
3468 auto*
const tx_relay = peer->SetTxRelay();
3470 LOCK(tx_relay->m_bloom_filter_mutex);
3471 tx_relay->m_relay_txs = fRelay;
3483 const auto* tx_relay = peer->GetTxRelay();
3484 if (tx_relay &&
WITH_LOCK(tx_relay->m_bloom_filter_mutex,
return tx_relay->m_relay_txs) &&
3486 const uint64_t recon_salt = m_txreconciliation->PreRegisterPeer(pfrom.
GetId());
3497 CNodeState* state = State(pfrom.
GetId());
3499 m_num_preferred_download_peers += state->fPreferredDownload;
3505 bool send_getaddr{
false};
3507 send_getaddr = SetupAddressRelay(pfrom, *peer);
3517 peer->m_getaddr_sent =
true;
3541 std::string remoteAddr;
3546 LogPrint(
BCLog::NET,
"receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n",
3549 remoteAddr, (mapped_as ?
strprintf(
", mapped_as=%d", mapped_as) :
""));
3551 int64_t nTimeOffset = nTime -
GetTime();
3560 if (greatest_common_version <= 70012) {
3561 const auto finalAlert{
ParseHex(
"60010000000000000000000000ffffff7f00000000ffffff7ffeffff7f01ffffff7f00000000ffffff7f00ffffff7f002f555247454e543a20416c657274206b657920636f6d70726f6d697365642c2075706772616465207265717569726564004630440220653febd6410f470f6bae11cad19c48413becb1ac2c17f908fd0fd53bdc3abd5202206d0e9c96fe88d4a0f01ed9dedae2b6f9e00da94cad0fecaae66ecf689bf71b50")};
3562 MakeAndPushMessage(pfrom,
"alert",
Span{finalAlert});
3589 LogPrintf(
"New %s %s peer connected: version: %d, blocks=%d, peer=%d%s%s\n",
3592 pfrom.
nVersion.load(), peer->m_starting_height,
3594 (mapped_as ?
strprintf(
", mapped_as=%d", mapped_as) :
""));
3606 if (m_txreconciliation) {
3607 if (!peer->m_wtxid_relay || !m_txreconciliation->IsPeerRegistered(pfrom.
GetId())) {
3611 m_txreconciliation->ForgetPeer(pfrom.
GetId());
3615 if (
auto tx_relay = peer->GetTxRelay()) {
3624 tx_relay->m_tx_inventory_mutex,
3625 return tx_relay->m_tx_inventory_to_send.empty() &&
3626 tx_relay->m_next_inv_send_time == 0s));
3634 peer->m_prefers_headers =
true;
3639 bool sendcmpct_hb{
false};
3640 uint64_t sendcmpct_version{0};
3641 vRecv >> sendcmpct_hb >> sendcmpct_version;
3647 CNodeState* nodestate = State(pfrom.
GetId());
3648 nodestate->m_provides_cmpctblocks =
true;
3649 nodestate->m_requested_hb_cmpctblocks = sendcmpct_hb;
3666 if (!peer->m_wtxid_relay) {
3667 peer->m_wtxid_relay =
true;
3668 m_wtxid_relay_peers++;
3687 peer->m_wants_addrv2 =
true;
3695 if (!m_txreconciliation) {
3707 if (RejectIncomingTxs(pfrom)) {
3716 const auto* tx_relay = peer->GetTxRelay();
3717 if (!tx_relay || !
WITH_LOCK(tx_relay->m_bloom_filter_mutex,
return tx_relay->m_relay_txs)) {
3723 uint32_t peer_txreconcl_version;
3724 uint64_t remote_salt;
3725 vRecv >> peer_txreconcl_version >> remote_salt;
3728 peer_txreconcl_version, remote_salt);
3753 const auto ser_params{
3761 std::vector<CAddress> vAddr;
3763 vRecv >> ser_params(vAddr);
3765 if (!SetupAddressRelay(pfrom, *peer)) {
3772 Misbehaving(*peer, 20,
strprintf(
"%s message size = %u", msg_type, vAddr.size()));
3777 std::vector<CAddress> vAddrOk;
3778 const auto current_a_time{Now<NodeSeconds>()};
3781 const auto current_time{GetTime<std::chrono::microseconds>()};
3784 const auto time_diff = std::max(current_time - peer->m_addr_token_timestamp, 0us);
3788 peer->m_addr_token_timestamp = current_time;
3791 uint64_t num_proc = 0;
3792 uint64_t num_rate_limit = 0;
3793 Shuffle(vAddr.begin(), vAddr.end(), m_rng);
3796 if (interruptMsgProc)
3800 if (peer->m_addr_token_bucket < 1.0) {
3806 peer->m_addr_token_bucket -= 1.0;
3815 addr.
nTime = current_a_time - 5 * 24h;
3817 AddAddressKnown(*peer, addr);
3824 if (addr.
nTime > current_a_time - 10min && !peer->m_getaddr_sent && vAddr.size() <= 10 && addr.
IsRoutable()) {
3826 RelayAddress(pfrom.
GetId(), addr, reachable);
3830 vAddrOk.push_back(addr);
3833 peer->m_addr_processed += num_proc;
3834 peer->m_addr_rate_limited += num_rate_limit;
3835 LogPrint(
BCLog::NET,
"Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
3836 vAddr.size(), num_proc, num_rate_limit, pfrom.
GetId());
3838 m_addrman.
Add(vAddrOk, pfrom.
addr, 2h);
3839 if (vAddr.size() < 1000) peer->m_getaddr_sent =
false;
3850 std::vector<CInv> vInv;
3854 Misbehaving(*peer, 20,
strprintf(
"inv message size = %u", vInv.size()));
3858 const bool reject_tx_invs{RejectIncomingTxs(pfrom)};
3862 const auto current_time{GetTime<std::chrono::microseconds>()};
3865 for (
CInv& inv : vInv) {
3866 if (interruptMsgProc)
return;
3871 if (peer->m_wtxid_relay) {
3878 const bool fAlreadyHave = AlreadyHaveBlock(inv.
hash);
3881 UpdateBlockAvailability(pfrom.
GetId(), inv.
hash);
3889 best_block = &inv.
hash;
3892 if (reject_tx_invs) {
3898 const bool fAlreadyHave = AlreadyHaveTx(gtxid);
3901 AddKnownTx(*peer, inv.
hash);
3903 AddTxAnnouncement(pfrom, gtxid, current_time);
3910 if (best_block !=
nullptr) {
3922 if (state.fSyncStarted || (!peer->m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) {
3923 if (MaybeSendGetHeaders(pfrom,
GetLocator(m_chainman.m_best_header), *peer)) {
3925 m_chainman.m_best_header->nHeight, best_block->ToString(),
3928 if (!state.fSyncStarted) {
3929 peer->m_inv_triggered_getheaders_before_sync =
true;
3933 m_last_block_inv_triggering_headers_sync = *best_block;
3942 std::vector<CInv> vInv;
3946 Misbehaving(*peer, 20,
strprintf(
"getdata message size = %u", vInv.size()));
3952 if (vInv.size() > 0) {
3957 LOCK(peer->m_getdata_requests_mutex);
3958 peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end());
3959 ProcessGetData(pfrom, *peer, interruptMsgProc);
3968 vRecv >> locator >> hashStop;
3984 std::shared_ptr<const CBlock> a_recent_block;
3986 LOCK(m_most_recent_block_mutex);
3987 a_recent_block = m_most_recent_block;
3990 if (!m_chainman.
ActiveChainstate().ActivateBestChain(state, a_recent_block)) {
4005 for (; pindex; pindex = m_chainman.
ActiveChain().Next(pindex))
4020 if (--nLimit <= 0) {
4024 WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();});
4035 std::shared_ptr<const CBlock> recent_block;
4037 LOCK(m_most_recent_block_mutex);
4038 if (m_most_recent_block_hash == req.
blockhash)
4039 recent_block = m_most_recent_block;
4043 SendBlockTransactions(pfrom, *peer, *recent_block, req);
4061 SendBlockTransactions(pfrom, *peer, block, req);
4075 WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv));
4083 vRecv >> locator >> hashStop;
4105 if (m_chainman.
ActiveTip() ==
nullptr ||
4107 LogPrint(
BCLog::NET,
"Ignoring getheaders from peer=%d because active chain has too little work; sending empty response\n", pfrom.
GetId());
4114 CNodeState *nodestate = State(pfrom.
GetId());
4124 if (!BlockRequestAllowed(pindex)) {
4125 LogPrint(
BCLog::NET,
"%s: ignoring request from peer=%i for old block header that isn't in the main chain\n", __func__, pfrom.
GetId());
4138 std::vector<CBlock> vHeaders;
4141 for (; pindex; pindex = m_chainman.
ActiveChain().Next(pindex))
4144 if (--nLimit <= 0 || pindex->GetBlockHash() == hashStop)
4159 nodestate->pindexBestHeaderSent = pindex ? pindex : m_chainman.
ActiveChain().
Tip();
4165 if (RejectIncomingTxs(pfrom)) {
4180 const uint256& txid = ptx->GetHash();
4181 const uint256& wtxid = ptx->GetWitnessHash();
4183 const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
4184 AddKnownTx(*peer, hash);
4188 m_txrequest.ReceivedResponse(pfrom.
GetId(), txid);
4189 if (tx.
HasWitness()) m_txrequest.ReceivedResponse(pfrom.
GetId(), wtxid);
4209 LogPrintf(
"Not relaying non-mempool transaction %s (wtxid=%s) from forcerelay peer=%d\n",
4212 LogPrintf(
"Force relaying tx %s (wtxid=%s) from peer=%d\n",
4241 m_txrequest.ForgetTxHash(tx.
GetHash());
4255 AddToCompactExtraTransactions(removedTx);
4260 bool fRejectedParents =
false;
4264 std::vector<uint256> unique_parents;
4265 unique_parents.reserve(tx.
vin.size());
4270 std::sort(unique_parents.begin(), unique_parents.end());
4271 unique_parents.erase(std::unique(unique_parents.begin(), unique_parents.end()), unique_parents.end());
4272 for (
const uint256& parent_txid : unique_parents) {
4273 if (m_recent_rejects.contains(parent_txid)) {
4274 fRejectedParents =
true;
4278 if (!fRejectedParents) {
4279 const auto current_time{GetTime<std::chrono::microseconds>()};
4281 for (
const uint256& parent_txid : unique_parents) {
4288 AddKnownTx(*peer, parent_txid);
4289 if (!AlreadyHaveTx(gtxid)) AddTxAnnouncement(pfrom, gtxid, current_time);
4293 AddToCompactExtraTransactions(ptx);
4297 m_txrequest.ForgetTxHash(tx.
GetHash());
4314 m_txrequest.ForgetTxHash(tx.
GetHash());
4344 m_txrequest.ForgetTxHash(tx.
GetHash());
4347 AddToCompactExtraTransactions(ptx);
4358 MaybePunishNodeForTx(pfrom.
GetId(), state);
4372 vRecv >> cmpctblock;
4374 bool received_new_header =
false;
4384 MaybeSendGetHeaders(pfrom,
GetLocator(m_chainman.m_best_header), *peer);
4394 received_new_header =
true;
4402 MaybePunishNodeForBlock(pfrom.
GetId(), state,
true,
"invalid header via cmpctblock");
4407 if (received_new_header) {
4409 blockhash.ToString(), pfrom.
GetId());
4412 bool fProcessBLOCKTXN =
false;
4416 bool fRevertToHeaderProcessing =
false;
4420 std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
4421 bool fBlockReconstructed =
false;
4429 CNodeState *nodestate = State(pfrom.
GetId());
4434 nodestate->m_last_block_announcement =
GetTime();
4440 auto range_flight = mapBlocksInFlight.equal_range(pindex->
GetBlockHash());
4441 size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
4442 bool requested_block_from_this_peer{
false};
4445 bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.
GetId());
4447 while (range_flight.first != range_flight.second) {
4448 if (range_flight.first->second.first == pfrom.
GetId()) {
4449 requested_block_from_this_peer =
true;
4452 range_flight.first++;
4457 if (requested_block_from_this_peer) {
4460 std::vector<CInv> vInv(1);
4461 vInv[0] =
CInv(
MSG_BLOCK | GetFetchFlags(*peer), blockhash);
4468 if (!already_in_flight && !CanDirectFetch()) {
4476 requested_block_from_this_peer) {
4477 std::list<QueuedBlock>::iterator* queuedBlockIt =
nullptr;
4478 if (!BlockRequested(pfrom.
GetId(), *pindex, &queuedBlockIt)) {
4479 if (!(*queuedBlockIt)->partialBlock)
4492 Misbehaving(*peer, 100,
"invalid compact block");
4495 if (first_in_flight) {
4497 std::vector<CInv> vInv(1);
4498 vInv[0] =
CInv(
MSG_BLOCK | GetFetchFlags(*peer), blockhash);
4508 for (
size_t i = 0; i < cmpctblock.
BlockTxCount(); i++) {
4513 fProcessBLOCKTXN =
true;
4514 }
else if (first_in_flight) {
4521 IsBlockRequestedFromOutbound(blockhash) ||
4540 ReadStatus status = tempBlock.InitData(cmpctblock, vExtraTxnForCompact);
4545 std::vector<CTransactionRef> dummy;
4546 status = tempBlock.FillBlock(*pblock, dummy);
4548 fBlockReconstructed =
true;
4552 if (requested_block_from_this_peer) {
4555 std::vector<CInv> vInv(1);
4556 vInv[0] =
CInv(
MSG_BLOCK | GetFetchFlags(*peer), blockhash);
4561 fRevertToHeaderProcessing =
true;
4566 if (fProcessBLOCKTXN) {
4569 return ProcessCompactBlockTxns(pfrom, *peer, txn);
4572 if (fRevertToHeaderProcessing) {
4578 return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.
header},
true);
4581 if (fBlockReconstructed) {
4586 mapBlockSource.emplace(pblock->GetHash(), std::make_pair(pfrom.
GetId(),
false));
4597 ProcessBlock(pfrom, pblock,
true,
true);
4604 RemoveBlockRequest(pblock->GetHash(), std::nullopt);
4621 return ProcessCompactBlockTxns(pfrom, *peer, resp);
4634 peer->m_last_getheaders_timestamp = {};
4636 std::vector<CBlockHeader> headers;
4641 Misbehaving(*peer, 20,
strprintf(
"headers message size = %u", nCount));
4644 headers.resize(nCount);
4645 for (
unsigned int n = 0; n < nCount; n++) {
4646 vRecv >> headers[n];
4650 ProcessHeadersMessage(pfrom, *peer, std::move(headers),
false);
4654 if (m_headers_presync_should_signal.exchange(
false)) {
4655 HeadersPresyncStats stats;
4657 LOCK(m_headers_presync_mutex);
4658 auto it = m_headers_presync_stats.find(m_headers_presync_bestpeer);
4659 if (it != m_headers_presync_stats.end()) stats = it->second;
4677 std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
4682 bool forceProcessing =
false;
4683 const uint256 hash(pblock->GetHash());
4684 bool min_pow_checked =
false;
4689 forceProcessing = IsBlockRequested(hash);
4690 RemoveBlockRequest(hash, pfrom.
GetId());
4694 mapBlockSource.emplace(hash, std::make_pair(pfrom.
GetId(),
true));
4699 min_pow_checked =
true;
4702 ProcessBlock(pfrom, pblock, forceProcessing, min_pow_checked);
4719 Assume(SetupAddressRelay(pfrom, *peer));
4723 if (peer->m_getaddr_recvd) {
4727 peer->m_getaddr_recvd =
true;
4729 peer->m_addrs_to_send.clear();
4730 std::vector<CAddress> vAddr;
4736 for (
const CAddress &addr : vAddr) {
4737 PushAddress(*peer, addr);
4765 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4766 LOCK(tx_relay->m_tx_inventory_mutex);
4767 tx_relay->m_send_mempool =
true;
4793 const auto ping_end = time_received;
4796 bool bPingFinished =
false;
4797 std::string sProblem;
4799 if (nAvail >=
sizeof(
nonce)) {
4803 if (peer->m_ping_nonce_sent != 0) {
4804 if (
nonce == peer->m_ping_nonce_sent) {
4806 bPingFinished =
true;
4807 const auto ping_time = ping_end - peer->m_ping_start.load();
4808 if (ping_time.count() >= 0) {
4813 sProblem =
"Timing mishap";
4817 sProblem =
"Nonce mismatch";
4820 bPingFinished =
true;
4821 sProblem =
"Nonce zero";
4825 sProblem =
"Unsolicited pong without ping";
4829 bPingFinished =
true;
4830 sProblem =
"Short payload";
4833 if (!(sProblem.empty())) {
4837 peer->m_ping_nonce_sent,
4841 if (bPingFinished) {
4842 peer->m_ping_nonce_sent = 0;
4849 LogPrint(
BCLog::NET,
"filterload received despite not offering bloom services from peer=%d; disconnecting\n", pfrom.
GetId());
4859 Misbehaving(*peer, 100,
"too-large bloom filter");
4860 }
else if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4862 LOCK(tx_relay->m_bloom_filter_mutex);
4863 tx_relay->m_bloom_filter.reset(
new CBloomFilter(filter));
4864 tx_relay->m_relay_txs =
true;
4874 LogPrint(
BCLog::NET,
"filteradd received despite not offering bloom services from peer=%d; disconnecting\n", pfrom.
GetId());
4878 std::vector<unsigned char> vData;
4886 }
else if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4887 LOCK(tx_relay->m_bloom_filter_mutex);
4888 if (tx_relay->m_bloom_filter) {
4889 tx_relay->m_bloom_filter->insert(vData);
4895 Misbehaving(*peer, 100,
"bad filteradd message");
4902 LogPrint(
BCLog::NET,
"filterclear received despite not offering bloom services from peer=%d; disconnecting\n", pfrom.
GetId());
4906 auto tx_relay = peer->GetTxRelay();
4907 if (!tx_relay)
return;
4910 LOCK(tx_relay->m_bloom_filter_mutex);
4911 tx_relay->m_bloom_filter =
nullptr;
4912 tx_relay->m_relay_txs =
true;
4921 vRecv >> newFeeFilter;
4923 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4924 tx_relay->m_fee_filter_received = newFeeFilter;
4932 ProcessGetCFilters(pfrom, *peer, vRecv);
4937 ProcessGetCFHeaders(pfrom, *peer, vRecv);
4942 ProcessGetCFCheckPt(pfrom, *peer, vRecv);
4947 std::vector<CInv> vInv;
4951 for (
CInv &inv : vInv) {
4955 m_txrequest.ReceivedResponse(pfrom.
GetId(), inv.
hash);
4967 bool PeerManagerImpl::MaybeDiscourageAndDisconnect(
CNode& pnode, Peer& peer)
4970 LOCK(peer.m_misbehavior_mutex);
4973 if (!peer.m_should_discourage)
return false;
4975 peer.m_should_discourage =
false;
4980 LogPrintf(
"Warning: not punishing noban peer %d!\n", peer.m_id);
4986 LogPrintf(
"Warning: not punishing manually connected peer %d!\n", peer.m_id);
5006 bool PeerManagerImpl::ProcessMessages(
CNode* pfrom, std::atomic<bool>& interruptMsgProc)
5010 PeerRef peer = GetPeerRef(pfrom->
GetId());
5011 if (peer ==
nullptr)
return false;
5014 LOCK(peer->m_getdata_requests_mutex);
5015 if (!peer->m_getdata_requests.empty()) {
5016 ProcessGetData(*pfrom, *peer, interruptMsgProc);
5020 const bool processed_orphan = ProcessOrphanTx(*peer);
5025 if (processed_orphan)
return true;
5030 LOCK(peer->m_getdata_requests_mutex);
5031 if (!peer->m_getdata_requests.empty())
return true;
5044 bool fMoreWork = poll_result->second;
5046 TRACE6(net, inbound_message,
5055 if (m_opts.capture_messages) {
5060 ProcessMessage(*pfrom,
msg.m_type,
msg.m_recv,
msg.m_time, interruptMsgProc);
5061 if (interruptMsgProc)
return false;
5063 LOCK(peer->m_getdata_requests_mutex);
5064 if (!peer->m_getdata_requests.empty()) fMoreWork =
true;
5072 }
catch (
const std::exception& e) {
5081 void PeerManagerImpl::ConsiderEviction(
CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds)
5085 CNodeState &state = *State(pto.
GetId());
5094 if (state.pindexBestKnownBlock !=
nullptr && state.pindexBestKnownBlock->nChainWork >= m_chainman.
ActiveChain().
Tip()->
nChainWork) {
5095 if (state.m_chain_sync.m_timeout != 0s) {
5096 state.m_chain_sync.m_timeout = 0s;
5097 state.m_chain_sync.m_work_header =
nullptr;
5098 state.m_chain_sync.m_sent_getheaders =
false;
5100 }
else if (state.m_chain_sync.m_timeout == 0s || (state.m_chain_sync.m_work_header !=
nullptr && state.pindexBestKnownBlock !=
nullptr && state.pindexBestKnownBlock->nChainWork >= state.m_chain_sync.m_work_header->nChainWork)) {
5106 state.m_chain_sync.m_work_header = m_chainman.
ActiveChain().
Tip();
5107 state.m_chain_sync.m_sent_getheaders =
false;
5108 }
else if (state.m_chain_sync.m_timeout > 0s && time_in_seconds > state.m_chain_sync.m_timeout) {
5112 if (state.m_chain_sync.m_sent_getheaders) {
5114 LogPrintf(
"Disconnecting outbound peer %d for old chain, best known block = %s\n", pto.
GetId(), state.pindexBestKnownBlock !=
nullptr ? state.pindexBestKnownBlock->GetBlockHash().ToString() :
"<none>");
5117 assert(state.m_chain_sync.m_work_header);
5122 MaybeSendGetHeaders(pto,
5123 GetLocator(state.m_chain_sync.m_work_header->pprev),
5125 LogPrint(
BCLog::NET,
"sending getheaders to outbound peer=%d to verify chain work (current best known block:%s, benchmark blockhash: %s)\n", pto.
GetId(), state.pindexBestKnownBlock !=
nullptr ? state.pindexBestKnownBlock->GetBlockHash().ToString() :
"<none>", state.m_chain_sync.m_work_header->GetBlockHash().ToString());
5126 state.m_chain_sync.m_sent_getheaders =
true;
5138 void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
5147 std::pair<NodeId, std::chrono::seconds> youngest_peer{-1, 0}, next_youngest_peer{-1, 0};
5151 if (pnode->
GetId() > youngest_peer.first) {
5152 next_youngest_peer = youngest_peer;
5153 youngest_peer.first = pnode->GetId();
5154 youngest_peer.second = pnode->m_last_block_time;
5157 NodeId to_disconnect = youngest_peer.first;
5158 if (youngest_peer.second > next_youngest_peer.second) {
5161 to_disconnect = next_youngest_peer.first;
5170 CNodeState *node_state = State(pnode->
GetId());
5171 if (node_state ==
nullptr ||
5174 LogPrint(
BCLog::NET,
"disconnecting extra block-relay-only peer=%d (last block received at time %d)\n",
5178 LogPrint(
BCLog::NET,
"keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
5194 int64_t oldest_block_announcement = std::numeric_limits<int64_t>::max();
5197 AssertLockHeld(::cs_main);
5201 if (!pnode->IsFullOutboundConn() || pnode->fDisconnect) return;
5202 CNodeState *state = State(pnode->GetId());
5203 if (state == nullptr) return;
5205 if (state->m_chain_sync.m_protect) return;
5208 if (!m_connman.MultipleManualOrFullOutboundConns(pnode->addr.GetNetwork())) return;
5209 if (state->m_last_block_announcement < oldest_block_announcement || (state->m_last_block_announcement == oldest_block_announcement && pnode->GetId() > worst_peer)) {
5210 worst_peer = pnode->GetId();
5211 oldest_block_announcement = state->m_last_block_announcement;
5214 if (worst_peer != -1) {
5223 CNodeState &state = *State(pnode->
GetId());
5225 LogPrint(
BCLog::NET,
"disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->
GetId(), oldest_block_announcement);
5229 LogPrint(
BCLog::NET,
"keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
5246 void PeerManagerImpl::CheckForStaleTipAndEvictPeers()
5250 auto now{GetTime<std::chrono::seconds>()};
5252 EvictExtraOutboundPeers(now);
5254 if (now > m_stale_tip_check_time) {
5258 LogPrintf(
"Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n",
5267 if (!m_initial_sync_finished && CanDirectFetch()) {
5269 m_initial_sync_finished =
true;
5273 void PeerManagerImpl::MaybeSendPing(
CNode& node_to, Peer& peer, std::chrono::microseconds now)
5276 peer.m_ping_nonce_sent &&
5286 bool pingSend =
false;
5288 if (peer.m_ping_queued) {
5293 if (peer.m_ping_nonce_sent == 0 && now > peer.m_ping_start.load() +
PING_INTERVAL) {
5301 nonce = GetRand<uint64_t>();
5302 }
while (
nonce == 0);
5303 peer.m_ping_queued =
false;
5304 peer.m_ping_start = now;
5306 peer.m_ping_nonce_sent =
nonce;
5310 peer.m_ping_nonce_sent = 0;
5316 void PeerManagerImpl::MaybeSendAddr(
CNode&
node, Peer& peer, std::chrono::microseconds current_time)
5319 if (!peer.m_addr_relay_enabled)
return;
5321 LOCK(peer.m_addr_send_times_mutex);
5324 peer.m_next_local_addr_send < current_time) {
5331 if (peer.m_next_local_addr_send != 0us) {
5332 peer.m_addr_known->reset();
5335 CAddress local_addr{*local_service, peer.m_our_services, Now<NodeSeconds>()};
5336 PushAddress(peer, local_addr);
5342 if (current_time <= peer.m_next_addr_send)
return;
5355 bool ret = peer.m_addr_known->contains(addr.
GetKey());
5356 if (!
ret) peer.m_addr_known->insert(addr.
GetKey());
5359 peer.m_addrs_to_send.erase(std::remove_if(peer.m_addrs_to_send.begin(), peer.m_addrs_to_send.end(), addr_already_known),
5360 peer.m_addrs_to_send.end());
5363 if (peer.m_addrs_to_send.empty())
return;
5365 if (peer.m_wants_addrv2) {
5370 peer.m_addrs_to_send.clear();
5373 if (peer.m_addrs_to_send.capacity() > 40) {
5374 peer.m_addrs_to_send.shrink_to_fit();
5378 void PeerManagerImpl::MaybeSendSendHeaders(
CNode&
node, Peer& peer)
5386 CNodeState &state = *State(
node.GetId());
5387 if (state.pindexBestKnownBlock !=
nullptr &&
5394 peer.m_sent_sendheaders =
true;
5399 void PeerManagerImpl::MaybeSendFeefilter(
CNode& pto, Peer& peer, std::chrono::microseconds current_time)
5401 if (m_opts.ignore_incoming_txs)
return;
5417 if (peer.m_fee_filter_sent == MAX_FILTER) {
5420 peer.m_next_send_feefilter = 0us;
5423 if (current_time > peer.m_next_send_feefilter) {
5424 CAmount filterToSend = m_fee_filter_rounder.round(currentFilter);
5427 if (filterToSend != peer.m_fee_filter_sent) {
5429 peer.m_fee_filter_sent = filterToSend;
5436 (currentFilter < 3 * peer.m_fee_filter_sent / 4 || currentFilter > 4 * peer.m_fee_filter_sent / 3)) {
5442 class CompareInvMempoolOrder
5447 explicit CompareInvMempoolOrder(
CTxMemPool *_mempool,
bool use_wtxid)
5450 m_wtxid_relay = use_wtxid;
5453 bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
5462 bool PeerManagerImpl::RejectIncomingTxs(
const CNode& peer)
const
5472 bool PeerManagerImpl::SetupAddressRelay(
const CNode&
node, Peer& peer)
5477 if (
node.IsBlockOnlyConn())
return false;
5479 if (!peer.m_addr_relay_enabled.exchange(
true)) {
5483 peer.m_addr_known = std::make_unique<CRollingBloomFilter>(5000, 0.001);
5489 bool PeerManagerImpl::SendMessages(
CNode* pto)
5493 PeerRef peer = GetPeerRef(pto->
GetId());
5494 if (!peer)
return false;
5499 if (MaybeDiscourageAndDisconnect(*pto, *peer))
return true;
5505 const auto current_time{GetTime<std::chrono::microseconds>()};
5513 MaybeSendPing(*pto, *peer, current_time);
5518 MaybeSendAddr(*pto, *peer, current_time);
5520 MaybeSendSendHeaders(*pto, *peer);
5525 CNodeState &state = *State(pto->
GetId());
5528 if (m_chainman.m_best_header ==
nullptr) {
5535 bool sync_blocks_and_headers_from_peer =
false;
5536 if (state.fPreferredDownload) {
5537 sync_blocks_and_headers_from_peer =
true;
5548 if (m_num_preferred_download_peers == 0 || mapBlocksInFlight.empty()) {
5549 sync_blocks_and_headers_from_peer =
true;
5555 if ((nSyncStarted == 0 && sync_blocks_and_headers_from_peer) || m_chainman.m_best_header->Time() >
GetAdjustedTime() - 24h) {
5556 const CBlockIndex* pindexStart = m_chainman.m_best_header;
5564 if (pindexStart->
pprev)
5565 pindexStart = pindexStart->
pprev;
5566 if (MaybeSendGetHeaders(*pto,
GetLocator(pindexStart), *peer)) {
5569 state.fSyncStarted =
true;
5593 LOCK(peer->m_block_inv_mutex);
5594 std::vector<CBlock> vHeaders;
5595 bool fRevertToInv = ((!peer->m_prefers_headers &&
5596 (!state.m_requested_hb_cmpctblocks || peer->m_blocks_for_headers_relay.size() > 1)) ||
5599 ProcessBlockAvailability(pto->
GetId());
5601 if (!fRevertToInv) {
5602 bool fFoundStartingHeader =
false;
5606 for (
const uint256& hash : peer->m_blocks_for_headers_relay) {
5611 fRevertToInv =
true;
5614 if (pBestIndex !=
nullptr && pindex->
pprev != pBestIndex) {
5626 fRevertToInv =
true;
5629 pBestIndex = pindex;
5630 if (fFoundStartingHeader) {
5633 }
else if (PeerHasHeader(&state, pindex)) {
5635 }
else if (pindex->
pprev ==
nullptr || PeerHasHeader(&state, pindex->
pprev)) {
5638 fFoundStartingHeader =
true;
5643 fRevertToInv =
true;
5648 if (!fRevertToInv && !vHeaders.empty()) {
5649 if (vHeaders.size() == 1 && state.m_requested_hb_cmpctblocks) {
5653 vHeaders.front().GetHash().ToString(), pto->
GetId());
5655 std::optional<CSerializedNetMsg> cached_cmpctblock_msg;
5657 LOCK(m_most_recent_block_mutex);
5658 if (m_most_recent_block_hash == pBestIndex->
GetBlockHash()) {
5662 if (cached_cmpctblock_msg.has_value()) {
5663 PushMessage(*pto, std::move(cached_cmpctblock_msg.value()));
5671 state.pindexBestHeaderSent = pBestIndex;
5672 }
else if (peer->m_prefers_headers) {
5673 if (vHeaders.size() > 1) {
5676 vHeaders.front().GetHash().ToString(),
5677 vHeaders.back().GetHash().ToString(), pto->
GetId());
5680 vHeaders.front().GetHash().ToString(), pto->
GetId());
5683 state.pindexBestHeaderSent = pBestIndex;
5685 fRevertToInv =
true;
5691 if (!peer->m_blocks_for_headers_relay.empty()) {
5692 const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back();
5705 if (!PeerHasHeader(&state, pindex)) {
5706 peer->m_blocks_for_inv_relay.push_back(hashToAnnounce);
5712 peer->m_blocks_for_headers_relay.clear();
5718 std::vector<CInv> vInv;
5720 LOCK(peer->m_block_inv_mutex);
5724 for (
const uint256& hash : peer->m_blocks_for_inv_relay) {
5731 peer->m_blocks_for_inv_relay.clear();
5734 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
5735 LOCK(tx_relay->m_tx_inventory_mutex);
5738 if (tx_relay->m_next_inv_send_time < current_time) {
5739 fSendTrickle =
true;
5749 LOCK(tx_relay->m_bloom_filter_mutex);
5750 if (!tx_relay->m_relay_txs) tx_relay->m_tx_inventory_to_send.clear();
5754 if (fSendTrickle && tx_relay->m_send_mempool) {
5755 auto vtxinfo = m_mempool.
infoAll();
5756 tx_relay->m_send_mempool =
false;
5757 const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
5759 LOCK(tx_relay->m_bloom_filter_mutex);
5761 for (
const auto& txinfo : vtxinfo) {
5764 peer->m_wtxid_relay ?
5765 txinfo.tx->GetWitnessHash().ToUint256() :
5766 txinfo.tx->GetHash().ToUint256(),
5768 tx_relay->m_tx_inventory_to_send.erase(inv.
hash);
5771 if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
5774 if (tx_relay->m_bloom_filter) {
5775 if (!tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx))
continue;
5777 tx_relay->m_tx_inventory_known_filter.insert(inv.
hash);
5778 vInv.push_back(inv);
5789 std::vector<std::set<uint256>::iterator> vInvTx;
5790 vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size());
5791 for (std::set<uint256>::iterator it = tx_relay->m_tx_inventory_to_send.begin(); it != tx_relay->m_tx_inventory_to_send.end(); it++) {
5792 vInvTx.push_back(it);
5794 const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
5797 CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, peer->m_wtxid_relay);
5798 std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
5801 unsigned int nRelayedTransactions = 0;
5802 LOCK(tx_relay->m_bloom_filter_mutex);
5805 while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) {
5807 std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
5808 std::set<uint256>::iterator it = vInvTx.back();
5813 tx_relay->m_tx_inventory_to_send.erase(it);
5815 if (tx_relay->m_tx_inventory_known_filter.contains(hash)) {
5824 if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
5827 if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx))
continue;
5829 vInv.push_back(inv);
5830 nRelayedTransactions++;
5835 tx_relay->m_tx_inventory_known_filter.insert(hash);
5840 tx_relay->m_last_inv_sequence = m_mempool.
GetSequence();
5847 auto stalling_timeout = m_block_stalling_timeout.load();
5848 if (state.m_stalling_since.count() && state.m_stalling_since < current_time - stalling_timeout) {
5857 if (stalling_timeout != new_timeout && m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) {
5867 if (state.vBlocksInFlight.size() > 0) {
5868 QueuedBlock &queuedBlock = state.vBlocksInFlight.front();
5869 int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1;
5877 if (state.fSyncStarted && peer->m_headers_sync_timeout < std::chrono::microseconds::max()) {
5880 if (current_time > peer->m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) {
5897 state.fSyncStarted =
false;
5899 peer->m_headers_sync_timeout = 0us;
5905 peer->m_headers_sync_timeout = std::chrono::microseconds::max();
5911 ConsiderEviction(*pto, *peer, GetTime<std::chrono::seconds>());
5916 std::vector<CInv> vGetData;
5918 std::vector<const CBlockIndex*> vToDownload;
5920 auto get_inflight_budget = [&state]() {
5926 FindNextBlocksToDownload(*peer, get_inflight_budget(), vToDownload, staller);
5928 TryDownloadingHistoricalBlocks(
5930 get_inflight_budget(),
5932 Assert(m_chainman.GetSnapshotBaseBlock()));
5935 uint32_t nFetchFlags = GetFetchFlags(*peer);
5937 BlockRequested(pto->
GetId(), *pindex);
5941 if (state.vBlocksInFlight.empty() && staller != -1) {
5942 if (State(staller)->m_stalling_since == 0us) {
5943 State(staller)->m_stalling_since = current_time;
5952 std::vector<std::pair<NodeId, GenTxid>> expired;
5953 auto requestable = m_txrequest.GetRequestable(pto->
GetId(), current_time, &expired);
5954 for (
const auto& entry : expired) {
5955 LogPrint(
BCLog::NET,
"timeout of inflight %s %s from peer=%d\n", entry.second.IsWtxid() ?
"wtx" :
"tx",
5956 entry.second.GetHash().ToString(), entry.first);
5958 for (
const GenTxid& gtxid : requestable) {
5959 if (!AlreadyHaveTx(gtxid)) {