前两篇文章主要从整体逻辑上对代码进行了梳理,这篇文章将主要讲述网络模块主要的函数,以及其具体实现。
监听连接:ThreadSocketHandler
断开没有使用的节点,首先遍历节点数组vNodesCopy,如果节点标识断开连接(fDisconnect),或者没有任何引用、发送接收消息,则移除节点,关闭socket,添加到断开连接节点数组 (vNodesDisconnected)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| std::vector<CNode *> vNodesCopy = vNodes; for (CNode *pnode : vNodesCopy) { if (pnode->fDisconnect) { vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
pnode->grantOutbound.Release();
pnode->CloseSocketDisconnect();
pnode->Release(); vNodesDisconnected.push_back(pnode); } }
|
遍历断开连接节点数组,当节点没有引用,且能获取节点的相关锁,则移除节点,删除此节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| std::list<CNode *> vNodesDisconnectedCopy = vNodesDisconnected; for (CNode *pnode : vNodesDisconnectedCopy) { if (pnode->GetRefCount() <= 0) { bool fDelete = false; { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { fDelete = true; } } } if (fDelete) { vNodesDisconnected.remove(pnode); DeleteNode(pnode); } } }
|
重新设置节点数量。
1 2 3 4 5 6 7 8 9 10 11
| size_t vNodesSize; { LOCK(cs_vNodes); vNodesSize = vNodes.size(); } if (vNodesSize != nPrevNodeCount) { nPrevNodeCount = vNodesSize; if (clientInterface) { clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount); } }
|
遍历监听SOCKET数组(vhListenSocket)、节点数组,为每个节点的socket 设置发送、接收fd_set
1 2 3 4 5
| for (const ListenSocket &hListenSocket : vhListenSocket) { FD_SET(hListenSocket.socket, &fdsetRecv); hSocketMax = std::max(hSocketMax, hListenSocket.socket); have_fds = true; }
|
调用select函数监听socket。
1 2
| int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
|
遍历监听socket数组,当接收到数据,且socket有效时,接受连接,新建节点,添加到节点数组
1 2 3 4 5 6
| for (const ListenSocket &hListenSocket : vhListenSocket) { if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) { AcceptConnection(hListenSocket); } }
|
遍历节点数组,增加节点的引用次数。
1 2 3 4 5 6 7 8
| std::vector<CNode *> vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; for (CNode *pnode : vNodesCopy) { pnode->AddRef(); } }
|
遍历节点数组,接收网络数据,解析成消息,添加到节点的接收消息数组 (vRecvMsg)。当发送集合有数据时,把节点的发送消息(vSendMsg)发送出去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| for (CNode *pnode : vNodesCopy) { if (interruptNet) { return; } bool recvSet = false; bool sendSet = false; bool errorSet = false; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) { continue; } recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv); sendSet = FD_ISSET(pnode->hSocket, &fdsetSend); errorSet = FD_ISSET(pnode->hSocket, &fdsetError); } if (recvSet || errorSet) { .... } if (sendSet) { LOCK(pnode->cs_vSend); size_t nBytes = SocketSendData(pnode); if (nBytes) { RecordBytesSent(nBytes); } }
|
满足以下几点是不活跃的socke连接:
- 没有任何发送、接收消息(nLastSend、nLastRecv)
- 距离上次发送消息超过了90分钟(nLastSend),且距离上次把全部消息发送
出去也超过了90分钟(nLastSendEmpty)
- 距离上次接收消息超过了90分钟(nLastRecv)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| int64_t nTime = GetSystemTimeInSeconds(); if (nTime - pnode->nTimeConnected > 60) { if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) { LogPrint("net", "socket no message in first 60 seconds, %d " "%d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->id); pnode->fDisconnect = true; } else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) { LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend); pnode->fDisconnect = true; } else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90 * 60)) { LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv); pnode->fDisconnect = true; } else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros()) { LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart)); pnode->fDisconnect = true; } else if (!pnode->fSuccessfullyConnected) { LogPrintf("version handshake timeout from %d\n", pnode->id); pnode->fDisconnect = true; } }
|
发送接收消息:ThreadMessageHandler
此线程为无限循环,每隔0.1秒循环一次:
1 2
| while (!flagInterruptMsgProc) { }
|
遍历节点数组,增加节点引用次数,检查节点是否需要同步:
1 2 3 4 5 6 7 8
| std::vector<CNode *> vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; for (CNode *pnode : vNodesCopy) { pnode->AddRef(); } }
|
遍历节点数组, 处理接收的消息,发送消息。遍历节点数组,减少节点引用次数:
1 2
| for (CNode *pnode : vNodesCopy) { }
|
接收消息
节点接收消息后,循环遍历节点的消息缓冲区,解析消息头、数据,校验消息的有效性,再处理消息数据,处理过程中如果发生异常,则节点发送拒绝消息,此节点停止发送、接收消息。
消息的有效性检查主要检查以下4项:
- 接收到的消息是否完整 ,类CNetMessage的complete()函数校验完整性,in_data为TRUE,且消
息大小与消息数据偏移相等,则消息是完整的。
- 消息头开始字符串是否与环境参数中的字符串相同。
- 消息头是否有效。
- 消息头校验和是否正确。
重新计算数据的校验和,检验与消息头中的校验和是否相等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| if (pnode->fDisconnect) { continue; }
bool fMoreNodeWork = GetNodeSignals().ProcessMessages( *config, pnode, *this, flagInterruptMsgProc); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) { return; } ```
## 发送消息
```c++ { LOCK(pnode->cs_sendProcessing); GetNodeSignals().SendMessages(*config, pnode, *this, flagInterruptMsgProc); } if (flagInterruptMsgProc) { return; }
|
调用这个函数:
1 2 3 4 5 6
| bool SendMessages(const Config &config, CNode *pto, CConnman &connman, const std::atomic<bool> &interruptMsgProc) { ... ... }
|
发送消息时,检验5个命令是否需要发送,分别是:ping、addr、getblocks、 inv、getdata。
当用户在RPC中请求ping命令(节点的fPingQueued为TRUE),或者距离上一次发送命令超过了30分钟,且发送消息队列为空,则发送ping命令。当节点的版本大于BIP0031_VERSION时,设置发送ping命令的开始时间,发送ping命令时带随机数,随机数保存在节点的nPingNonceSent中。低于此版本时,不设置开始时间,不带随机数。
随机选择一个节点,发送addr消息发送节点的发送地址数组中的地址,同时把地址插入到已知地址集中,每次最多发送1000个地址。发送完毕后,清空节点的发送地址数组。
当节点需要同步时,且不处于导入、重建索引,则发送getblocks消息获取区块。获取区块时,指定开始区块、结束区块。同时保存开始的区块索引到节点的 pindexLastGetBlocksBegin中,保存结束区块索引值到节点hashLastGetBlocksEnd中。当获取区块时,检查这2项,如果已经获取了最新的区块,则不再获取区块,避免重复发送getblocks消息。
遍历节点的发送Inventory数组,发送inv消息。发送的Inventory必须时尚未处理的,即不在节点已知Inventory集中(setInventoryKnown),发送后添加到已知 Inventory集中。发送inv消息时最多发送1000个inventory。如果是随机节点,且 inventory的类型时MSG_TX,则只发送1/4的inventory,先计算随机值,随机值的低 2bit位为0的inventory发送出去,不为0的inventory保留在节点的发送inventory数组 (vInventoryToSend)中。
如果节点存在当前时间早的延迟的inventory,则发送getdata消息。延迟的 inventory保存在节点的mapAskFor数组。发送的inventory必须是节点中没有的。每次发送getdata消息时最多发送1000个inventory。同时删除节点的mapAskFor中的一项。
主动与外部节点建立连接:ThreadOpenConnections
1 2 3
| void CConnman::ThreadOpenConnections() { ... }
|
此线程也是无限循环,每隔500毫秒连接一次。
1 2 3 4 5
| while (!interruptNet) { if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) { return; } }
|
连接地址时注意以下几点:
- 不连接无效的、地址数组中的外部连接地址中已有的、本地的地址。
- 最多尝试连接100个地址。
- 不连接受限制的地址。
- 尝试连接次数达到30次时,才尝试连接距离上一次尝试连接10分钟以内的地址。
- 尝试连接次数达到50次时,才尝试连接地址端不是参数默认端的地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| while (!interruptNet) { CAddrInfo addr = addrman.Select(fFeeler); if (!addr.IsValid() || setConnected.count(addr.GetGroup()) || IsLocal(addr)) { break; } nTries++; if (nTries > 100) { break; } if (IsLimited(addr)) { continue; } if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) { continue; } if (nANow - addr.nLastTry < 600 && nTries < 30) { continue; } if ((addr.nServices & nRelevantServices) != nRelevantServices && (nTries < 40 || nOutbound >= (nMaxOutbound >> 1))) { continue; } if (addr.GetPort() != Params().GetDefaultPort() && nTries < 50) { continue; } addrConnect = addr; break; } if (addrConnect.IsValid()) { if (fFeeler) { int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); if (!interruptNet.sleep_for( std::chrono::milliseconds(randsleep))) { return; } LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); } OpenNetworkConnection(addrConnect, (int)setConnected.size() >= std::min(nMaxConnections - 1, 2), &grant, nullptr, false, fFeeler); }
|
获取主机ip:GetLocal
节点启用时,获取主机名称、IP地址,添加到地址、服务对应的数组中(mapLocalHost)。获取本地地址,保存到节点的本地地址(addrLocal),且向节点网络广播地址(AdvertizeLocal)。这样获取的地址大于监听(LOCAL_IF)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| bool GetLocal(CService &addr, const CNetAddr *paddrPeer) { if (!fListen) return false; int nBestScore = -1; int nBestReachability = -1; { LOCK(cs_mapLocalHost); for (std::map<CNetAddr, LocalServiceInfo>::iterator it = mapLocalHost.begin(); it != mapLocalHost.end(); it++) { int nScore = (*it).second.nScore; int nReachability = (*it).first.GetReachabilityFrom(paddrPeer); if (nReachability > nBestReachability || (nReachability == nBestReachability && nScore > nBestScore)) { addr = CService((*it).first, (*it).second.nPort); nBestReachability = nReachability; nBestScore = nScore; } } } return nBestScore >= 0; }
|
系统定义了几种本地地址类型,优先采用值大的类型(GetLocal函数)
enum {
LOCAL_NONE,
LOCAL_IF,
LOCAL_BIND,
LOCAL_UPNP,
LOCAL_MANUAL,
LOCAL_MAX
};
本文由 copernicus 团队 冉小龙 编写,转载无需授权!