比特币源码分析-网络(三)

By Copernicus

前两篇文章主要从整体逻辑上对代码进行了梳理,这篇文章将主要讲述网络模块主要的函数,以及其具体实现。

监听连接:ThreadSocketHandler

断开没有使用的节点,首先遍历节点数组vNodesCopy,如果节点标识断开连接(fDisconnect),或者没有任何引用、发送接收消息,则移除节点,关闭socket,添加到断开连接节点数组 (vNodesDisconnected)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Disconnect unused nodes
std::vector<CNode *> vNodesCopy = vNodes;
for (CNode *pnode : vNodesCopy) {
if (pnode->fDisconnect) {
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode),
vNodes.end());

// release outbound grant (if any)
pnode->grantOutbound.Release();

// close socket and cleanup
pnode->CloseSocketDisconnect();

// hold in disconnected pool until all refs are released
pnode->Release();
vNodesDisconnected.push_back(pnode);
}
}

遍历断开连接节点数组,当节点没有引用,且能获取节点的相关锁,则移除节点,删除此节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Delete disconnected nodes
std::list<CNode *> vNodesDisconnectedCopy = vNodesDisconnected;
for (CNode *pnode : vNodesDisconnectedCopy) {
// wait until threads are done using it
if (pnode->GetRefCount() <= 0) {
bool fDelete = false;
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) {
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
fDelete = true;
}
}
}
if (fDelete) {
vNodesDisconnected.remove(pnode);
DeleteNode(pnode);
}
}
}

重新设置节点数量。

1
2
3
4
5
6
7
8
9
10
11
size_t vNodesSize;
{
LOCK(cs_vNodes);
vNodesSize = vNodes.size();
}
if (vNodesSize != nPrevNodeCount) {
nPrevNodeCount = vNodesSize;
if (clientInterface) {
clientInterface->NotifyNumConnectionsChanged(nPrevNodeCount);
}
}

遍历监听SOCKET数组(vhListenSocket)、节点数组,为每个节点的socket 设置发送、接收fd_set

1
2
3
4
5
for (const ListenSocket &hListenSocket : vhListenSocket) {
FD_SET(hListenSocket.socket, &fdsetRecv);
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
have_fds = true;
}

调用select函数监听socket。

1
2
int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv,
&fdsetSend, &fdsetError, &timeout);

遍历监听socket数组,当接收到数据,且socket有效时,接受连接,新建节点,添加到节点数组

1
2
3
4
5
6
for (const ListenSocket &hListenSocket : vhListenSocket) {
if (hListenSocket.socket != INVALID_SOCKET &&
FD_ISSET(hListenSocket.socket, &fdsetRecv)) {
AcceptConnection(hListenSocket);
}
}

遍历节点数组,增加节点的引用次数。

1
2
3
4
5
6
7
8
std::vector<CNode *> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
for (CNode *pnode : vNodesCopy) {
pnode->AddRef();
}
}

遍历节点数组,接收网络数据,解析成消息,添加到节点的接收消息数组 (vRecvMsg)。当发送集合有数据时,把节点的发送消息(vSendMsg)发送出去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
for (CNode *pnode : vNodesCopy) {                                          
if (interruptNet) {
return;
}
// Receive
bool recvSet = false;
bool sendSet = false;
bool errorSet = false;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) {
continue;
}
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
}
if (recvSet || errorSet) {
....
}
// Send
if (sendSet) {
LOCK(pnode->cs_vSend);
size_t nBytes = SocketSendData(pnode);
if (nBytes) {
RecordBytesSent(nBytes);
}
}

满足以下几点是不活跃的socke连接:

  • 没有任何发送、接收消息(nLastSend、nLastRecv)
  • 距离上次发送消息超过了90分钟(nLastSend),且距离上次把全部消息发送
    出去也超过了90分钟(nLastSendEmpty)
  • 距离上次接收消息超过了90分钟(nLastRecv)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int64_t nTime = GetSystemTimeInSeconds();                              
if (nTime - pnode->nTimeConnected > 60) {
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) {
LogPrint("net", "socket no message in first 60 seconds, %d "
"%d from %d\n",
pnode->nLastRecv != 0, pnode->nLastSend != 0,
pnode->id);
pnode->fDisconnect = true;
} else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL) {
LogPrintf("socket sending timeout: %is\n",
nTime - pnode->nLastSend);
pnode->fDisconnect = true;
} else if (nTime - pnode->nLastRecv >
(pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL
: 90 * 60)) {
LogPrintf("socket receive timeout: %is\n",
nTime - pnode->nLastRecv);
pnode->fDisconnect = true;
} else if (pnode->nPingNonceSent &&
pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 <
GetTimeMicros()) {
LogPrintf("ping timeout: %fs\n",
0.000001 *
(GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
} else if (!pnode->fSuccessfullyConnected) {
LogPrintf("version handshake timeout from %d\n", pnode->id);
pnode->fDisconnect = true;
}
}

发送接收消息:ThreadMessageHandler

此线程为无限循环,每隔0.1秒循环一次:

1
2
while (!flagInterruptMsgProc) {
}

遍历节点数组,增加节点引用次数,检查节点是否需要同步:

1
2
3
4
5
6
7
8
std::vector<CNode *> vNodesCopy;      
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
for (CNode *pnode : vNodesCopy) {
pnode->AddRef();
}
}

遍历节点数组, 处理接收的消息,发送消息。遍历节点数组,减少节点引用次数:

1
2
for (CNode *pnode : vNodesCopy) { 
}

接收消息

节点接收消息后,循环遍历节点的消息缓冲区,解析消息头、数据,校验消息的有效性,再处理消息数据,处理过程中如果发生异常,则节点发送拒绝消息,此节点停止发送、接收消息。
消息的有效性检查主要检查以下4项:

  1. 接收到的消息是否完整 ,类CNetMessage的complete()函数校验完整性,in_data为TRUE,且消
    息大小与消息数据偏移相等,则消息是完整的。
  2. 消息头开始字符串是否与环境参数中的字符串相同。
  3. 消息头是否有效。
  4. 消息头校验和是否正确。

重新计算数据的校验和,检验与消息头中的校验和是否相等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (pnode->fDisconnect) {                                
continue;
}

// Receive messages
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(
*config, pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc) {
return;
}
```

## 发送消息

```c++
{
LOCK(pnode->cs_sendProcessing);
GetNodeSignals().SendMessages(*config, pnode, *this,
flagInterruptMsgProc);
}
if (flagInterruptMsgProc) {
return;
}

调用这个函数:

1
2
3
4
5
6
bool SendMessages(const Config &config, CNode *pto, CConnman &connman,
const std::atomic<bool> &interruptMsgProc) {

...
...
}

发送消息时,检验5个命令是否需要发送,分别是:ping、addr、getblocks、 inv、getdata。

当用户在RPC中请求ping命令(节点的fPingQueued为TRUE),或者距离上一次发送命令超过了30分钟,且发送消息队列为空,则发送ping命令。当节点的版本大于BIP0031_VERSION时,设置发送ping命令的开始时间,发送ping命令时带随机数,随机数保存在节点的nPingNonceSent中。低于此版本时,不设置开始时间,不带随机数。

随机选择一个节点,发送addr消息发送节点的发送地址数组中的地址,同时把地址插入到已知地址集中,每次最多发送1000个地址。发送完毕后,清空节点的发送地址数组。

当节点需要同步时,且不处于导入、重建索引,则发送getblocks消息获取区块。获取区块时,指定开始区块、结束区块。同时保存开始的区块索引到节点的 pindexLastGetBlocksBegin中,保存结束区块索引值到节点hashLastGetBlocksEnd中。当获取区块时,检查这2项,如果已经获取了最新的区块,则不再获取区块,避免重复发送getblocks消息。

遍历节点的发送Inventory数组,发送inv消息。发送的Inventory必须时尚未处理的,即不在节点已知Inventory集中(setInventoryKnown),发送后添加到已知 Inventory集中。发送inv消息时最多发送1000个inventory。如果是随机节点,且 inventory的类型时MSG_TX,则只发送1/4的inventory,先计算随机值,随机值的低 2bit位为0的inventory发送出去,不为0的inventory保留在节点的发送inventory数组 (vInventoryToSend)中。

如果节点存在当前时间早的延迟的inventory,则发送getdata消息。延迟的 inventory保存在节点的mapAskFor数组。发送的inventory必须是节点中没有的。每次发送getdata消息时最多发送1000个inventory。同时删除节点的mapAskFor中的一项。

主动与外部节点建立连接:ThreadOpenConnections

1
2
3
void CConnman::ThreadOpenConnections() {
...
}

此线程也是无限循环,每隔500毫秒连接一次。

1
2
3
4
5
while (!interruptNet) {
if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) {
return;
}
}

连接地址时注意以下几点:

  • 不连接无效的、地址数组中的外部连接地址中已有的、本地的地址。
  • 最多尝试连接100个地址。
  • 不连接受限制的地址。
  • 尝试连接次数达到30次时,才尝试连接距离上一次尝试连接10分钟以内的地址。
  • 尝试连接次数达到50次时,才尝试连接地址端不是参数默认端的地址。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
while (!interruptNet) {                                                     
CAddrInfo addr = addrman.Select(fFeeler);
if (!addr.IsValid() || setConnected.count(addr.GetGroup()) ||
IsLocal(addr)) {
break;
}
nTries++;
if (nTries > 100) {
break;
}
if (IsLimited(addr)) {
continue;
}
if ((addr.nServices & REQUIRED_SERVICES) != REQUIRED_SERVICES) {
continue;
}
// only consider very recently tried nodes after 30 failed attempts
if (nANow - addr.nLastTry < 600 && nTries < 30) {
continue;
}
if ((addr.nServices & nRelevantServices) != nRelevantServices &&
(nTries < 40 || nOutbound >= (nMaxOutbound >> 1))) {
continue;
}
if (addr.GetPort() != Params().GetDefaultPort() && nTries < 50) {
continue;
}
addrConnect = addr;
break;
}

if (addrConnect.IsValid()) {

if (fFeeler) {
// Add small amount of random noise before connection to avoid
// synchronization.
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
if (!interruptNet.sleep_for(
std::chrono::milliseconds(randsleep))) {
return;
}
LogPrint("net", "Making feeler connection to %s\n",
addrConnect.ToString());
}

OpenNetworkConnection(addrConnect,
(int)setConnected.size() >=
std::min(nMaxConnections - 1, 2),
&grant, nullptr, false, fFeeler);
}

获取主机ip:GetLocal

节点启用时,获取主机名称、IP地址,添加到地址、服务对应的数组中(mapLocalHost)。获取本地地址,保存到节点的本地地址(addrLocal),且向节点网络广播地址(AdvertizeLocal)。这样获取的地址大于监听(LOCAL_IF)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bool GetLocal(CService &addr, const CNetAddr *paddrPeer) {                      
if (!fListen) return false;

int nBestScore = -1;
int nBestReachability = -1;
{
LOCK(cs_mapLocalHost);
for (std::map<CNetAddr, LocalServiceInfo>::iterator it =
mapLocalHost.begin();
it != mapLocalHost.end(); it++) {
int nScore = (*it).second.nScore;
int nReachability = (*it).first.GetReachabilityFrom(paddrPeer);
if (nReachability > nBestReachability ||
(nReachability == nBestReachability && nScore > nBestScore)) {
addr = CService((*it).first, (*it).second.nPort);
nBestReachability = nReachability;
nBestScore = nScore;
}
}
}
return nBestScore >= 0;
}

系统定义了几种本地地址类型,优先采用值大的类型(GetLocal函数)

enum {
    // unknown
    LOCAL_NONE,
    // address a local interface listens on
    LOCAL_IF,
    // address explicit bound to
    LOCAL_BIND,
    // address reported by UPnP
    LOCAL_UPNP,
    // address explicitly specified (-externalip=)
    LOCAL_MANUAL,

    LOCAL_MAX
};

本文由 copernicus 团队 冉小龙 编写,转载无需授权!