bitcoin 代码中大量使用 boost::signal
, boost::signal 实现了信号与槽的事件通知机制,或者说是一种消息的发布与订阅机制, signal
类型是一个可调用类型,slot
就是callback 对象,或者说事件的订阅者,signal 实例是一个可调用对象,调用signal 对象,就相当于发布了相应的事件, signal 的connect
, disconnect
方法分别相当于对事件的订阅,取消。
1 |
|
上面这个例子, 有五个函数订阅sig 事件,sig(5. , 3.) 的调用触发事件,参数5,3,相当于事件携带的消息paylaod, 传给了五个事件订阅者。
bitcoin 中定义了类型CMainSignals
来统一管理各个功能模块的事件通知,CMainSignal
是一个资源管理类型, 主要工作代理给由unique_ptr
管理内存的成员 m_internals
, 它的类型是MainSignalsInstance
,内部定义十个boost signal 变量, 分别表达十种要通知的事件。
1 | class CMainSignals { |
类型CValidationInterface
主要是统一表示某些对MainSignalsInstance
中定义的十个事件感兴趣的对象, 即这些事件的订阅者, 所有对这些事件感兴趣代码继承CValidationInterface
类型,
提供自己版本的这些虚成员函数的实现,覆盖baseCValidationInterface
中对应的空方法, 表达对相应的事件感兴趣, 不感兴趣的事件的回调方法继续是那些继承自base class 的空方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26class CValidationInterface {
protected:
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {}
virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block) {}
virtual void SetBestChain(const CBlockLocator &locator) {}
virtual void Inventory(const uint256 &hash) {}
virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {}
virtual void BlockChecked(const CBlock&, const CValidationState&) {}
virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
friend void ::RegisterValidationInterface(CValidationInterface*);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();
};
CValidationInterface 有四个子类, CWallet , CZMQNotificationInterface, submitblock_StateCatcher, PeerLogicValidation 分别对应四个对MainSignalsInstance
中的事件感兴趣的订阅者。
1 | class CWallet final : public CCryptoKeyStore, public CValidationInterface |
这四个订阅者通过调用RegisterValidationInterface
, UnregisterValidationInterface
订阅,取消事件通知,函数接受参数是指向订阅者的指针。
1 | void RegisterValidationInterface(CValidationInterface* pwalletIn) { |
在程序启动,初始化阶段,启动调度器线程1
2
3
4
5
6
7bool AppInitMain()
{
.................
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
.................
}
调用RegisterBackgroundSignalScheduler(), 初始化全局MainSignalsInstance 实例; 调用RegisterWithMempoolSignals(), 订阅全局内存池对象的NotifyEntryRemoved 事件通知, MainSignalsInstance 只对由于超时,大小限制, blockchian 重组,替换等原因发生的离开内存池事件感兴趣, 收到后MainSignalsInstance 再作为事件发布者, 转发给其他订阅者, 如CWallet。
1 | bool AppInitMain() |
初始化全局的连接管理对象connman 后, 初始化全局的peerLogic 对象, 然后调用RegisterValidationInterface(), peerLogic 成为MainSignalsInstance 对象的订阅者, 如果用户编译了zeromq
支持模块,调用RegisterValidationInterface(), pzmqNotificationInterface 订阅MainSignalsInstance 。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20bool AppInitMain()
{
.............
assert(!g_connman);
g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));
CConnman& connman = *g_connman;
peerLogic.reset(new PeerLogicValidation(&connman, scheduler));
RegisterValidationInterface(peerLogic.get());
.............
pzmqNotificationInterface = CZMQNotificationInterface::Create();
if (pzmqNotificationInterface) {
RegisterValidationInterface(pzmqNotificationInterface);
}
...............
}
如果钱包功能开启, 启动后打开钱包过程,会把钱包注册成为MainSignalsInstance的订阅者。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38bool AppInitMain()
{
........
if (!OpenWallets())
return false;
LogPrintf("No wallet support compiled in!\n");
........
}
bool OpenWallets()
{
if (gArgs.GetBoolArg("-disablewallet", DEFAULT_DISABLE_WALLET)) {
LogPrintf("Wallet disabled!\n");
return true;
}
for (const std::string& walletFile : gArgs.GetArgs("-wallet")) {
CWallet * const pwallet = CWallet::CreateWalletFromFile(walletFile);
if (!pwallet) {
return false;
}
vpwallets.push_back(pwallet);
}
return true;
}
CWallet* CWallet::CreateWalletFromFile(const std::string walletFile)
{
......
CWallet *walletInstance = new CWallet(std::move(dbw));
RegisterValidationInterface(walletInstance);
......
}
在submitblock rpc
调用中, 用户提交hex编码的原始block, 解析后, 调用ProcessNewBlock()检查处理,使用类型submitblock_StateCatcher
的对象sc 作为MainSignalsInstance 的订阅者,
对提交过去的block 的验证结果,作为事件通知返回给rpc 调用。1
2
3
4
5
6
7
8
9
10
11UniValue submitblock(const JSONRPCRequest& request)
{
...........
submitblock_StateCatcher sc(block.GetHash());
RegisterValidationInterface(&sc);
bool fAccepted = ProcessNewBlock(Params(), blockptr, true, nullptr);
UnregisterValidationInterface(&sc);
...........
}
CMainSignals
类型上面定义了一堆触发事件的方法, 别的代码模块调用这些方法, 触发相应的事件,把事件通知发给相关的订阅者。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
m_internals->m_schedulerClient.AddToProcessQueue([pindexNew, pindexFork, fInitialDownload, this] {
m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload);
});
}
void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) {
m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] {
m_internals->TransactionAddedToMempool(ptx);
});
}
void CMainSignals::BlockConnected(const std::shared_ptr<const CBlock> &pblock, const CBlockIndex *pindex, const std::shared_ptr<const std::vector<CTransactionRef>>& pvtxConflicted) {
m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, pvtxConflicted, this] {
m_internals->BlockConnected(pblock, pindex, *pvtxConflicted);
});
}
void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &pblock) {
m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] {
m_internals->BlockDisconnected(pblock);
});
}
void CMainSignals::SetBestChain(const CBlockLocator &locator) {
m_internals->m_schedulerClient.AddToProcessQueue([locator, this] {
m_internals->SetBestChain(locator);
});
}
void CMainSignals::Inventory(const uint256 &hash) {
m_internals->m_schedulerClient.AddToProcessQueue([hash, this] {
m_internals->Inventory(hash);
});
}
void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) {
m_internals->Broadcast(nBestBlockTime, connman);
}
void CMainSignals::BlockChecked(const CBlock& block, const CValidationState& state) {
m_internals->BlockChecked(block, state);
}
void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock> &block) {
m_internals->NewPoWValidBlock(pindex, block);
}
CChainState 的ActivateBestChain
方法里, 发布BlockConnected, UpdatedBlockTip 事件:
1 | bool CChainState::ActivateBestChain(CValidationState &state, const CChainParams& chainparams, std::shared_ptr<const CBlock> pblock) { |
CChainState 的AcceptBlock 方法里,发布NewPoWValidBlock 事件:
1 | bool CChainState::AcceptBlock(const std::shared_ptr<const CBlock>& pblock, CValidationState& state, const CChainParams& chainparams, CBlockIndex** ppindex, bool fRequested, const CDiskBlockPos* dbp, bool* fNewBlock) |
CChainState 的DisconnectTip 方法里,发布 BlockDisconnected
事件:1
2
3
4
5
6bool CChainState::DisconnectTip(CValidationState& state, const CChainParams& chainparams, DisconnectedBlockTransactions *disconnectpool)
{
...................
GetMainSignals().BlockDisconnected(pblock);
..............
}
CChainState 的ConnectTip 方法里,发布 BlockChecked
事件:1
2
3
4
5
6
7
8
9
10bool CChainState::ConnectTip(CValidationState& state, const CChainParams& chainparams, CBlockIndex* pindexNew, const std::shared_ptr<const CBlock>& pblock, ConnectTrace& connectTrace, DisconnectedBlockTransactions &disconnectpool)
{
................
CCoinsViewCache view(pcoinsTip.get());
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view, chainparams);
GetMainSignals().BlockChecked(blockConnecting, state);
................
}
PeerLogicValidation 的 SendMessage
方法里, 发布Broadcast事件, 通知钱包重新发送未确认的交易:1
2
3
4
5
6
7
8
9bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptMsgProc)
{
.............
if (!fReindex && !fImporting && !IsInitialBlockDownload())
{
GetMainSignals().Broadcast(nTimeBestReceived, connman);
}
.............
}
从网络上收到INV
消息后,通知给钱包,更新内部状态。1
2
3
4
5
6
7
8
9
10
11bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{
.................
for (CInv &inv : vInv)
{
............
GetMainSignals().Inventory(inv.hash);
}
................
}
validation.cpp
里面的 ProcessNewBlock
在内部调用 CheckBlock 后,检查失败后,发布 BlockCheck s事件, 通告给相关订阅者。1
2
3
4
5
6
7
8
9
10
11bool ProcessNewBlock(const CChainParams& chainparams, const std::shared_ptr<const CBlock> pblock, bool fForceProcessing, bool *fNewBlock)
{
............
bool ret = CheckBlock(*pblock, state, chainparams.GetConsensus());
if (!ret) {
GetMainSignals().BlockChecked(*pblock, state);
return error("%s: AcceptBlock FAILED (%s)", __func__, state.GetDebugMessage());
}
...........
}
FlushStateToDisk
, 发布SetBestChain
事件, 通知钱包1
2
3
4
5
6
7
8
9bool static FlushStateToDisk(const CChainParams& chainparams, CValidationState &state, FlushStateMode mode, int nManualPruneHeight) {
...............
if (fDoFullFlush || ((mode == FLUSH_STATE_ALWAYS || mode == FLUSH_STATE_PERIODIC) && nNow > nLastSetChain + (int64_t)DATABASE_WRITE_INTERVAL * 1000000)) {
// Update best block in wallet (so we can detect restored wallets).
GetMainSignals().SetBestChain(chainActive.GetLocator());
nLastSetChain = nNow;
}
...............
}
validation.cpp
里面的AcceptToMemoryPoolWorker
, 在结束前, 发布TransactionAddedToMempool
事件。1
2
3
4
5
6
7
8static bool AcceptToMemoryPoolWorker(const CChainParams& chainparams, CTxMemPool& pool, CValidationState& state, const CTransactionRef& ptx,
bool* pfMissingInputs, int64_t nAcceptTime, std::list<CTransactionRef>* plTxnReplaced,
bool bypass_limits, const CAmount& nAbsurdFee, std::vector<COutPoint>& coins_to_uncache)
{
................
GetMainSignals().TransactionAddedToMempool(ptx);
}
本文由 Copernicus团队
喻建写作,转载无需授权