比特币源码分析:任务调度器的使用

栏目: 后端 · 发布时间: 6年前

内容简介:Bitcoin 进程启动后,有一个专门的线程做任务调度, 这些任务根据指定的时刻,执行对应的函数:调度器类主要是实现了一个生产者消费者的任务队列,只是这个任务队列是用 std::multimap 实现的,map 的key表达某一时刻,map的值表达:那一时刻要执行的函数,内部使用条件变量和锁来保护multimap ,还有几个bool 条件:CScheduler的client 通过调用schedule 往内部multimap添加一个条目; scheduleFromNow 和scheduleEvery 内部都

任务调度器

Bitcoin 进程启动后,有一个专门的线程做任务调度, 这些任务根据指定的时刻,执行对应的函数:

bool AppInitMain() { ....... // Start the lightweight task scheduler thread CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler); threadGroup.create_thread(boost::bind(&TraceThread, "scheduler", serviceLoop)); ....... }

调度器类主要是实现了一个生产者消费者的任务队列,只是这个任务队列是用 std::multimap 实现的,map 的key表达某一时刻,map的值表达:那一时刻要执行的函数,内部使用条件变量和锁来保护multimap ,还有几个bool 条件:

class CScheduler { public: CScheduler(); ~CScheduler(); typedef std::function Function; void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now()); void scheduleFromNow(Function f, int64_t deltaMilliSeconds); void scheduleEvery(Function f, int64_t deltaMilliSeconds); void serviceQueue(); void stop(bool drain=false); size_t getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const; bool AreThreadsServicingQueue() const; private: std::multimap taskQueue; boost::condition_variable newTaskScheduled; mutable boost::mutex newTaskMutex; int nThreadsServicingQueue; bool stopRequested; bool stopWhenEmpty; bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } };

CScheduler的client 通过调用schedule 往内部multimap添加一个条目; scheduleFromNow 和scheduleEvery 内部都是调用schedule 方法实现; 这三个方法属于生产者要生产任务的方法, 任务的消费者调用serviceQueue等待取走任务, 然后执行。

目前整个程序有一个全局的CScheduler实例:

static CScheduler scheduler; 这个实例对应只有一个消费者线程, 即唯一的后台调度器线程。

class SingleThreadedSchedulerClient 主要用途是,借助CScheduler类型,保障被添加到内部链表的任务,被串行执行:

class SingleThreadedSchedulerClient { private: CScheduler *m_pscheduler; CCriticalSection m_cs_callbacks_pending; std::list m_callbacks_pending; bool m_are_callbacks_running = false; void MaybeScheduleProcessQueue(); void ProcessQueue(); public: explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} void AddToProcessQueue(std::function func); void EmptyQueue(); size_t CallbacksPending(); };

使用例子

基本的使用例子:

#include #include  #include  #include  #include static void doN(){ std::cout << "output now\n"; } static void doE(){ for(int i = 0; i < 10; i++){ std::cout << "i = " << i << '\n'; } std::cout << '\n'; } BOOST_AUTO_TEST_SUITE(sche_tests) BOOST_AUTO_TEST_CASE(sche) { CScheduler s; s.scheduleFromNow(doN, 1000); s.scheduleEvery(doE, 1000); boost::thread t(boost::bind(&CScheduler::serviceQueue, &s)); boost::this_thread::sleep_for(boost::chrono::seconds{5}); t.interrupt(); t.join(); } BOOST_AUTO_TEST_CASE(singlethread) { CScheduler s; SingleThreadedSchedulerClient sc (&s); for(int i = 1; i <11; i++){ auto f = [=]{ std::cout << "thread " << boost::this_thread::get_id() << " print arg: " << i << '\n'; }; sc.AddToProcessQueue(f); } boost::thread t(boost::bind(&CScheduler::serviceQueue, &s)); boost::this_thread::sleep_for(boost::chrono::seconds{1}); t.interrupt(); t.join(); } BOOST_AUTO_TEST_SUITE_END()

进程启动后, 全局对象连接管理器connman初始化后, connman 的Start 方法最后,通过scheduler 线程安排了一个定时任务: 每隔15分钟, 把connman 对象内部成员,banmap_t 类型的 setBanned, CAddrMan 类型的addrman 序列化到本地文件banlist.dat 和 peers.dat。

//init.cpp if (!connman.Start(scheduler, connOptions)) { return false; } //net.cpp bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) { ............... scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000); }

如果钱包功能编译使能, 会让scheduler 线程安排每隔500毫秒刷新钱包状态。

//init.cpp #ifdef ENABLE_WALLET StartWallets(scheduler); #endif //wallet/init.cpp void StartWallets(CScheduler& scheduler) { for (CWalletRef pwallet : vpwallets) { pwallet->postInitProcess(scheduler); } } //wallet/wallet.cpp void CWallet::postInitProcess(CScheduler& scheduler) { ReacceptWalletTransactions(); if (!CWallet::fFlushScheduled.exchange(true)) { scheduler.scheduleEvery(MaybeCompactWalletDB, 500); } }

PeerLogicValidation 对象的构造函数内部, scheduler 线程安排每45秒执行CheckForStaleTipAndEvictPeer函数主要做两件事:

  1. 关掉多余的外出tcp 连接

  2. 根据当前时间,检查当前节点的blockchain 的tip 是否有可能过时了,建立额外的连接同步跟上

PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler) : connman(connmanIn), m_stale_tip_check_time(0) { // Initialize global variables that cannot be constructed at startup. recentRejects.reset(new CRollingBloomFilter(120000, 0.000001)); const Consensus::Params& consensusParams = Params().GetConsensus(); // Stale tip checking and peer eviction are on two different timers, but we // don't want them to get out of sync due to drift in the scheduler, so we // combine them in one function and schedule at the quicker (peer-eviction) // timer. static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); scheduler.scheduleEvery(std::bind(&PeerLogicValidation::CheckForStaleTipAndEvictPeers, this, consensusParams), EXTRA_PEER_CHECK_INTERVAL * 1000); } void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams) { if (connman == nullptr) return; int64_t time_in_seconds = GetTime(); EvictExtraOutboundPeers(time_in_seconds); if (time_in_seconds > m_stale_tip_check_time) { LOCK(cs_main); // Check whether our tip is stale, and if so, allow using an extra // outbound peer if (TipMayBeStale(consensusParams)) { LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - g_last_tip_update); connman->SetTryNewOutboundPeer(true); } else if (connman->GetTryNewOutboundPeer()) { connman->SetTryNewOutboundPeer(false); } m_stale_tip_check_time = time_in_seconds + STALE_CHECK_INTERVAL; } }

以上就是bitoin 里面CScheduler类的主要使用场景。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Art of Computer Programming, Volume 4,  Fascicle 3

The Art of Computer Programming, Volume 4, Fascicle 3

Donald E. Knuth / Addison-Wesley Professional / 2005-08-05 / USD 19.99

Finally, after a wait of more than thirty-five years, the first part of Volume 4 is at last ready for publication. Check out the boxed set that brings together Volumes 1 - 4A in one elegant case, and ......一起来看看 《The Art of Computer Programming, Volume 4, Fascicle 3》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具