比特币源码分析:网络(二)

By Copernicus

众所周知,比特币网络是采用的P2P网络体系,所以,没有明显的客户端与服务端的区别或者是概念,每一个节点既是自身的客户端,又是其它节点的服务端。

sync.h中,定义了 CSemaphore,它包装了系统底层的信号量机制,对wait(), try_wait(),post()实现了封装,代码如下:

1
2
3
4
5
6
7
8
9
10
11
class CSemaphore {
private:
boost::condition_variable condition;
boost::mutex mutex;
int value;
public:
CSemaphore(int init) : value(init) {}
void wait() {}
bool try_wait() {}
void post() {}
};

用于控制网络连接时的最大数量,每一个网络节点的最大连接数受限于信号量所允许的最大值。

下面我们按照一个网络连接从发送到接收到请求返回的这么个思路,来梳理代码逻辑。

004.png

CNode

CNode定义在bitcoin.cpp中,是比较重要的也是较为复杂的一个类,节点的所有信息都包含在内:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class CNode {
SOCKET sock; //用来连接的socket句柄
CDataStream vSend; //发送消息
CDataStream vRecv; //接收消息
uint32_t nHeaderStart; //头信息开始
uint32_t nMessageStart;
int nVersion; //版本信息
std::string strSubVer;
int nStartingHeight; //起始高度
std::vector<CAddress> *vAddr; //ip地址(网络上节点的连接信息)
int ban;
int64_t doneAfter;
CAddress you;
};

在上述定义中,最主要的是 std::vector<CAddress> *vAddr; 它包含了连接的所有节点,如果有节点连接进来,就加入到这个vector中;如果某个节点断开连接,就从这个vector中删除。

net.h中,对CNode进行了详细的定义(所有关于节点的信息,都进行了详细罗列),由于篇幅较长,只罗列其中的一些关键结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/** Information about a peer */
class CNode {
friend class CConnman;

public:
SOCKET hSocket; //连接的socket句柄
size_t nSendSize; //所有vSendMsg条目的总大小。
size_t nSendOffset; //已经发送的第一个vSendMsg内的偏移量。
std::deque<std::vector<uint8_t>> vSendMsg;//发送消息的数组
...
const CAddress addr;//节点地址信息
std::atomic<int> nVersion;//版本信息
CBloomFilter *pfilter;//海量过滤器
const NodeId id;//节点ID

protected:
mapMsgCmdSize mapSendBytesPerMsgCmd;
mapMsgCmdSize mapRecvBytesPerMsgCmd;

public:
uint256 hashContinue;
std::atomic<int> nStartingHeight;
private:
std::list<CNetMessage> vRecvMsg;//接收消息的数组
public:
//用来解析接收到的消息数据
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool &complete);

//用来设置接收版本
void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; }
int GetRecvVersion() { return nRecvVersion; }
void SetSendVersion(int nVersionIn);
int GetSendVersion() const;

//用来发送地址
void PushAddress(const CAddress &_addr, FastRandomContext &insecure_rand) {
// Known checking here is only to save space from duplicates.
// SendMessages will filter it again for knowns that were added
// after addresses were pushed.
if (_addr.IsValid() && !addrKnown.contains(_addr.GetKey())) {
if (vAddrToSend.size() >= MAX_ADDR_TO_SEND) {
vAddrToSend[insecure_rand.randrange(vAddrToSend.size())] =
_addr;
} else {
vAddrToSend.push_back(_addr);
}
}
}

//用来发送inventory消息
void PushInventory(const CInv &inv) {
LOCK(cs_inventory);
if (inv.type == MSG_TX) {
if (!filterInventoryKnown.contains(inv.hash)) {
setInventoryTxToSend.insert(inv.hash);
}
} else if (inv.type == MSG_BLOCK) {
vInventoryBlockToSend.push_back(inv.hash);
}
}

void PushBlockHash(const uint256 &hash) {
LOCK(cs_inventory);
vBlockHashesToAnnounce.push_back(hash);
}
};

发送消息

CDataStream这个类主要是包装了一个带有双向缓冲区的接口, 它重载了 >> 和 <<,使用上述序列化读取和写入未格式化的数据模板,以线性时间填充数据;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class CDataStream {
protected:
typedef CSerializeData vector_type;
vector_type vch;
unsigned int nReadPos;

int nType;
int nVersion;

public:
template <typename T> CDataStream &operator<<(const T &obj) {
// Serialize to this stream
::Serialize(*this, obj);
return (*this);
}

template <typename T> CDataStream &operator>>(T &obj) {
// Unserialize from this stream
::Unserialize(*this, obj);
return (*this);
}

void read(char *pch, size_t nSize) {
if (nSize == 0) {
return;
}

// Read from the beginning of the buffer
unsigned int nReadPosNext = nReadPos + nSize;
if (nReadPosNext >= vch.size()) {
if (nReadPosNext > vch.size()) {
throw std::ios_base::failure(
"CDataStream::read(): end of data");
}
memcpy(pch, &vch[nReadPos], nSize);
nReadPos = 0;
vch.clear();
return;
}
memcpy(pch, &vch[nReadPos], nSize);
nReadPos = nReadPosNext;
}

void write(const char *pch, size_t nSize) {
// Write to the end of the buffer
vch.insert(vch.end(), pch, pch + nSize);
}

所以,当我们需要发送消息时,首先会把数据放到CDataStream的数据流中,构造好完整的消息,但此时的消息格式是网络无法识别的,下一步,将构造好的消息放入到CSerializeData(类似一个消息队列)进行序列化,序列化之后,我们就可以把消息放到SocketSendData中发送出去。

CSerializeData 的格式如下:

1
2
// Byte-vector that clears its contents before deletion.
typedef std::vector<char, zero_after_free_allocator<char>> CSerializeData;

SocketSendData 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
size_t CConnman::SocketSendData(CNode *pnode) const {
AssertLockHeld(pnode->cs_vSend);
size_t nSentSize = 0;
size_t nMsgCount = 0;

for (const auto &data : pnode->vSendMsg) {
assert(data.size() > pnode->nSendOffset);
int nBytes = 0;
...
}
pnode->vSendMsg.erase(pnode->vSendMsg.begin(),
pnode->vSendMsg.begin() + nMsgCount);
if (pnode->vSendMsg.empty()) {
assert(pnode->nSendOffset == 0);
assert(pnode->nSendSize == 0);
}
return nSentSize;
}

接收消息

接收消息的工作,主要是由 ThreadSocketHandler 来完成的,

1
2
3
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) {
pnode->CloseSocketDisconnect();
}

随后,通过 ReceiveMsgBytes 把从其它节点接收到的数据解析为单个数据,然后放回到消息队列,最后由ThreadMessageHandler来进行最后的处理。

ReceiveMsgBytes解析数据的主要流程如下,调用的是CNetMessage下的readHeader和readData方法,随后,使用complete()进行一次判定,看解析是否完成:

1
2
3
4
5
6
7
// Absorb network data.
int handled;
if (!msg.in_data) {
handled = msg.readHeader(pch, nBytes);
} else {
handled = msg.readData(pch, nBytes);
}

readHeader

readHeader 主要用来解析消息头,由上一篇文章我们能够知道,一个消息头,至少24字节,如果小于24字节直接退出,如果满足这个条件,先把接收到的数据的开始部分复制到消息头数据流中(hdrbuf),再反格式化成消息头(hdr)。消息数据最大为MAX_SIZE(0x02000000),如果大于这个值,证明出错,直接退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int CNetMessage::readHeader(const char *pch, unsigned int nBytes) {
// copy data to temporary parsing buffer
unsigned int nRemaining = 24 - nHdrPos;
unsigned int nCopy = std::min(nRemaining, nBytes);

memcpy(&hdrbuf[nHdrPos], pch, nCopy);
nHdrPos += nCopy;

// if header incomplete, exit
if (nHdrPos < 24) {
return nCopy;
}

// deserialize to CMessageHeader
try {
hdrbuf >> hdr;
} catch (const std::exception &) {
return -1;
}

// reject messages larger than MAX_SIZE
if (hdr.nMessageSize > MAX_SIZE) {
return -1;
}

// switch state to reading message data
in_data = true;

return nCopy;
}

readData

readData 主要用来解析消息体,消息的数据部分复制到消息数据流中(vRecv)来处理,如果 vRecv 的空间不够,会进行扩容,但最多分配256 KB,不能超过总消息大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int CNetMessage::readData(const char *pch, unsigned int nBytes) {
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
unsigned int nCopy = std::min(nRemaining, nBytes);

if (vRecv.size() < nDataPos + nCopy) {
// Allocate up to 256 KiB ahead, but never more than the total message
// size.
vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
}

hasher.Write((const uint8_t *)pch, nCopy);
memcpy(&vRecv[nDataPos], pch, nCopy);
nDataPos += nCopy;
return nCopy;
}

缓冲区

在 net.h 文件中,我们能够看到如下定义:

1
2
3
4
//接收消息缓冲区
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
//发送消息缓冲区
static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;

我们将接收或者发送的数据放入到缓冲区,我们可以通过如下函数,分别对他们调用,加速我们的处理过程:

1
2
3
4
5
6
unsigned int CConnman::GetReceiveFloodSize() const {
return nReceiveFloodSize;
}
unsigned int CConnman::GetSendBufferSize() const {
return nSendBufferMaxSize;
}

本文由 copernicus 团队 冉小龙 分析编写,转载无需授权!