任务调度器
Bitcoin 进程启动后,有一个专门的线程做任务调度, 这些任务根据指定的时刻,执行对应的函数:1
2
3
4
5
6
7
8bool AppInitMain()
{
.......
// Start the lightweight task scheduler thread
CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(boost::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));
.......
}
调度器类主要是实现了一个生产者消费者的任务队列,只是这个任务队列是用 std::multimap 实现的,map 的key表达某一时刻,map的值表达:那一时刻要执行的函数,内部使用条件变量和锁来保护multimap ,还有几个bool 条件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26class CScheduler
{
public:
CScheduler();
~CScheduler();
typedef std::function<void(void)> Function;
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
void scheduleEvery(Function f, int64_t deltaMilliSeconds);
void serviceQueue();
void stop(bool drain=false);
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const;
bool AreThreadsServicingQueue() const;
private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
mutable boost::mutex newTaskMutex;
int nThreadsServicingQueue;
bool stopRequested;
bool stopWhenEmpty;
bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};
CScheduler的client 通过调用schedule 往内部multimap添加一个条目;
scheduleFromNow 和scheduleEvery 内部都是调用schedule 方法实现;
这三个方法属于生产者要生产任务的方法, 任务的消费者调用serviceQueue等待取走任务, 然后执行。
目前整个程序有一个全局的CScheduler实例:1
static CScheduler scheduler;
这个实例对应只有一个消费者线程, 即唯一的后台调度器线程。class SingleThreadedSchedulerClient
主要用途是,借助CScheduler类型,保障被添加到内部链表的任务,被串行执行:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class SingleThreadedSchedulerClient {
private:
CScheduler *m_pscheduler;
CCriticalSection m_cs_callbacks_pending;
std::list<std::function<void (void)>> m_callbacks_pending;
bool m_are_callbacks_running = false;
void MaybeScheduleProcessQueue();
void ProcessQueue();
public:
explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
void AddToProcessQueue(std::function<void (void)> func);
void EmptyQueue();
size_t CallbacksPending();
};
使用例子
基本的使用例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void doN(){
std::cout << "output now\n";
}
static void doE(){
for(int i = 0; i < 10; i++){
std::cout << "i = " << i << '\n';
}
std::cout << '\n';
}
BOOST_AUTO_TEST_SUITE(sche_tests)
BOOST_AUTO_TEST_CASE(sche)
{
CScheduler s;
s.scheduleFromNow(doN, 1000);
s.scheduleEvery(doE, 1000);
boost::thread t(boost::bind(&CScheduler::serviceQueue, &s));
boost::this_thread::sleep_for(boost::chrono::seconds{5});
t.interrupt();
t.join();
}
BOOST_AUTO_TEST_CASE(singlethread)
{
CScheduler s;
SingleThreadedSchedulerClient sc (&s);
for(int i = 1; i <11; i++){
auto f = [=]{
std::cout << "thread " << boost::this_thread::get_id() << " print arg: " << i << '\n';
};
sc.AddToProcessQueue(f);
}
boost::thread t(boost::bind(&CScheduler::serviceQueue, &s));
boost::this_thread::sleep_for(boost::chrono::seconds{1});
t.interrupt();
t.join();
}
BOOST_AUTO_TEST_SUITE_END()
进程启动后, 全局对象连接管理器connman初始化后, connman 的Start 方法最后,通过scheduler 线程安排了一个定时任务: 每隔15分钟, 把connman 对象内部成员,banmap_t 类型的 setBanned, CAddrMan 类型的addrman 序列化到本地文件banlist.dat 和 peers.dat。1
2
3
4
5
6
7
8
9
10//init.cpp
if (!connman.Start(scheduler, connOptions)) {
return false;
}
//net.cpp
bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
{
...............
scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
}
如果钱包功能编译使能, 会让scheduler 线程安排每隔500毫秒刷新钱包状态。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//init.cpp
StartWallets(scheduler);
//wallet/init.cpp
void StartWallets(CScheduler& scheduler) {
for (CWalletRef pwallet : vpwallets) {
pwallet->postInitProcess(scheduler);
}
}
//wallet/wallet.cpp
void CWallet::postInitProcess(CScheduler& scheduler)
{
ReacceptWalletTransactions();
if (!CWallet::fFlushScheduled.exchange(true)) {
scheduler.scheduleEvery(MaybeCompactWalletDB, 500);
}
}
PeerLogicValidation 对象的构造函数内部, scheduler 线程安排每45秒执行CheckForStaleTipAndEvictPeer函数主要做两件事:
- 关掉多余的外出tcp 连接
- 根据当前时间,检查当前节点的blockchain 的tip 是否有可能过时了,建立额外的连接同步跟上
1 | PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler) : connman(connmanIn), m_stale_tip_check_time(0) { |
以上就是bitoin 里面CScheduler类的主要使用场景。
本文由 Copernicus团队 喻建
编写,转载无需授权!