众所周知,比特币网络是采用的P2P网络体系,所以,没有明显的客户端与服务端的区别或者是概念,每一个节点既是自身的客户端,又是其它节点的服务端。
在sync.h
中,定义了 CSemaphore
,它包装了系统底层的信号量机制,对wait(), try_wait(),post()
实现了封装,代码如下:
1 2 3 4 5 6 7 8 9 10 11 class CSemaphore {private : boost::condition_variable condition; boost::mutex mutex; int value; public : CSemaphore(int init) : value(init) {} void wait () {} bool try_wait () {} void post () {} };
用于控制网络连接时的最大数量,每一个网络节点的最大连接数受限于信号量所允许的最大值。
下面我们按照一个网络连接从发送到接收到请求返回的这么个思路,来梳理代码逻辑。
CNode CNode定义在bitcoin.cpp
中,是比较重要的也是较为复杂的一个类,节点的所有信息都包含在内:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class CNode { SOCKET sock; CDataStream vSend; CDataStream vRecv; uint32_t nHeaderStart; uint32_t nMessageStart; int nVersion; std ::string strSubVer; int nStartingHeight; std ::vector <CAddress> *vAddr; int ban; int64_t doneAfter; CAddress you; };
在上述定义中,最主要的是 std::vector<CAddress> *vAddr;
它包含了连接的所有节点,如果有节点连接进来,就加入到这个vector中;如果某个节点断开连接,就从这个vector中删除。
在net.h
中,对CNode进行了详细的定义(所有关于节点的信息,都进行了详细罗列),由于篇幅较长,只罗列其中的一些关键结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 class CNode { friend class CConnman ; public : SOCKET hSocket; size_t nSendSize; size_t nSendOffset; std ::deque <std ::vector <uint8_t >> vSendMsg; ... const CAddress addr; std ::atomic<int > nVersion; CBloomFilter *pfilter; const NodeId id; protected : mapMsgCmdSize mapSendBytesPerMsgCmd; mapMsgCmdSize mapRecvBytesPerMsgCmd; public : uint256 hashContinue; std ::atomic<int > nStartingHeight; private : std ::list <CNetMessage> vRecvMsg; public : bool ReceiveMsgBytes (const char *pch, unsigned int nBytes, bool &complete) ; void SetRecvVersion (int nVersionIn) { nRecvVersion = nVersionIn; } int GetRecvVersion () { return nRecvVersion; } void SetSendVersion (int nVersionIn) ; int GetSendVersion () const ; void PushAddress (const CAddress &_addr, FastRandomContext &insecure_rand) { if (_addr.IsValid() && !addrKnown.contains(_addr.GetKey())) { if (vAddrToSend.size() >= MAX_ADDR_TO_SEND) { vAddrToSend[insecure_rand.randrange(vAddrToSend.size())] = _addr; } else { vAddrToSend.push_back(_addr); } } } void PushInventory (const CInv &inv) { LOCK(cs_inventory); if (inv.type == MSG_TX) { if (!filterInventoryKnown.contains(inv.hash)) { setInventoryTxToSend.insert(inv.hash); } } else if (inv.type == MSG_BLOCK) { vInventoryBlockToSend.push_back(inv.hash); } } void PushBlockHash (const uint256 &hash) { LOCK(cs_inventory); vBlockHashesToAnnounce.push_back(hash); } };
发送消息 CDataStream这个类主要是包装了一个带有双向缓冲区的接口, 它重载了 >> 和 <<,使用上述序列化读取和写入未格式化的数据模板,以线性时间填充数据;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class CDataStream {protected : typedef CSerializeData vector_type; vector_type vch; unsigned int nReadPos; int nType; int nVersion; public : template <typename T> CDataStream &operator <<(const T &obj) { ::Serialize(*this , obj); return (*this ); } template <typename T> CDataStream &operator >>(T &obj) { ::Unserialize(*this , obj); return (*this ); } void read (char *pch, size_t nSize) { if (nSize == 0 ) { return ; } unsigned int nReadPosNext = nReadPos + nSize; if (nReadPosNext >= vch.size()) { if (nReadPosNext > vch.size()) { throw std ::ios_base::failure( "CDataStream::read(): end of data" ); } memcpy (pch, &vch[nReadPos], nSize); nReadPos = 0 ; vch.clear(); return ; } memcpy (pch, &vch[nReadPos], nSize); nReadPos = nReadPosNext; } void write (const char *pch, size_t nSize) { vch.insert(vch.end(), pch, pch + nSize); }
所以,当我们需要发送消息时,首先会把数据放到CDataStream
的数据流中,构造好完整的消息,但此时的消息格式是网络无法识别的,下一步,将构造好的消息放入到CSerializeData
(类似一个消息队列)进行序列化,序列化之后,我们就可以把消息放到SocketSendData
中发送出去。
CSerializeData 的格式如下:
1 2 typedef std ::vector <char , zero_after_free_allocator<char >> CSerializeData;
SocketSendData 的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 size_t CConnman::SocketSendData(CNode *pnode) const { AssertLockHeld(pnode->cs_vSend); size_t nSentSize = 0 ; size_t nMsgCount = 0 ; for (const auto &data : pnode->vSendMsg) { assert(data.size() > pnode->nSendOffset); int nBytes = 0 ; ... } pnode->vSendMsg.erase(pnode->vSendMsg.begin(), pnode->vSendMsg.begin() + nMsgCount); if (pnode->vSendMsg.empty()) { assert(pnode->nSendOffset == 0 ); assert(pnode->nSendSize == 0 ); } return nSentSize; }
接收消息 接收消息的工作,主要是由 ThreadSocketHandler
来完成的,
1 2 3 if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) { pnode->CloseSocketDisconnect(); }
随后,通过 ReceiveMsgBytes
把从其它节点接收到的数据解析为单个数据,然后放回到消息队列,最后由ThreadMessageHandler
来进行最后的处理。
ReceiveMsgBytes解析数据的主要流程如下,调用的是CNetMessage下的readHeader和readData方法,随后,使用complete()进行一次判定,看解析是否完成:
1 2 3 4 5 6 7 int handled;if (!msg.in_data) { handled = msg.readHeader(pch, nBytes); } else { handled = msg.readData(pch, nBytes); }
readHeader 主要用来解析消息头,由上一篇文章我们能够知道,一个消息头,至少24字节,如果小于24字节直接退出,如果满足这个条件,先把接收到的数据的开始部分复制到消息头数据流中(hdrbuf),再反格式化成消息头(hdr)。消息数据最大为MAX_SIZE(0x02000000),如果大于这个值,证明出错,直接退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 int CNetMessage::readHeader(const char *pch, unsigned int nBytes) { unsigned int nRemaining = 24 - nHdrPos; unsigned int nCopy = std ::min(nRemaining, nBytes); memcpy (&hdrbuf[nHdrPos], pch, nCopy); nHdrPos += nCopy; if (nHdrPos < 24 ) { return nCopy; } try { hdrbuf >> hdr; } catch (const std ::exception &) { return -1 ; } if (hdr.nMessageSize > MAX_SIZE) { return -1 ; } in_data = true ; return nCopy; }
readData readData 主要用来解析消息体,消息的数据部分复制到消息数据流中(vRecv)来处理,如果 vRecv 的空间不够,会进行扩容,但最多分配256 KB,不能超过总消息大小。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int CNetMessage::readData(const char *pch, unsigned int nBytes) { unsigned int nRemaining = hdr.nMessageSize - nDataPos; unsigned int nCopy = std ::min(nRemaining, nBytes); if (vRecv.size() < nDataPos + nCopy) { vRecv.resize(std ::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024 )); } hasher.Write((const uint8_t *)pch, nCopy); memcpy (&vRecv[nDataPos], pch, nCopy); nDataPos += nCopy; return nCopy; }
缓冲区 在 net.h 文件中,我们能够看到如下定义:
1 2 3 4 static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000 ;static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000 ;
我们将接收或者发送的数据放入到缓冲区,我们可以通过如下函数,分别对他们调用,加速我们的处理过程:
1 2 3 4 5 6 unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; } unsigned int CConnman::GetSendBufferSize() const { return nSendBufferMaxSize; }
本文由 copernicus 团队 冉小龙
分析编写,转载无需授权!
为正常使用来必力评论功能请激活JavaScript