1. Overview
NMSTL和Asio一样都是template-heavy code,大部分都是头文件。NMSTL设计者考虑到使用这种东西编写C++代码出错的话,很容易在模板的错误中迷失,所以在bin目录下面用perl编写了一个wtf工具,可以将g++编译错误信息很好地表现出来。使用也非常简单,分析效果比原来的g++错误信息好多了。
./bin/wtf g++ <command>
- atomic.原子操作。
- callback.回调函数封装(boost::bind).
- debug.调试和日志工具。
- internal.主要是提供对象的to_string等方法,要求对象本身提供as_string这样的方法。
- io.提供各种buffer,并且提供IOHandle封装各种读写操作。IOHandle里面有fd并且有对fd引用计数。
- ioevent.select实现的reactor.每个fd绑定到IOHandler上面,一旦触发可读可写使用调用对应IOHandler的ravil和wavil事件。
- net.网络地址和socket相关操作。
- netioevent.封装网络异步IO操作。实现上和muduo非常相似,对MsgHandler稍加修改可以支持HTTP协议,而MsgHandler本身比较适合RPC.
- ntime.时间封装。
- ptr.多线程安全指针(boost::shared_ptr).内部有引用计数。
- rpc.RPC实现,没有仔细阅读。
- seda.提出SEDAStage这个概念允许将多个操作串联起来,然后可以配合线程池和异步队列动态修改工作线程。
- serial.序列化和反序列化部分,没有仔细阅读。
- terminal.封装终端异步IO操作,使用了readline库,没有仔细阅读。
- thread.线程方面封装,锁,条件变量,线程和线程池。
- tqueue.异步队列。
template<class In, class Out = void> class SEDAStage;
一个SEDAStage和一个In/Out类型绑定形成一个stage.这个stage允许不断地插入In对象,实现起来内部能够非常高效地处理这些In对象生成Out对象。然后一个stage可能后面会挂上一个next stage,这个next stage接收Out对象然后不断地继续。对于这里的高效处理就是使用异步队列+线程池模型来处理了,我们可以看看处理这个部分的代码
class my_thread : public Thread { bool quit; bool pending; SEDAStage<In>& stage; public: my_thread(SEDAStage<In>& stage) : quit(false), pending(true), stage(stage) {} virtual void run() { while (true) { In in; if (!stage.inq.wait(in, &pending)) // 其中这里的stage.inq就是异步队列 break; // 注意这个地方,这个地方能够通过异步队列的返回值来通知线程是否需要自己销毁。 stage.handle(in); // 从异步队列取出in之后交给stage::handle来处理。 } if (!stage.inq.is_closed()) { // 如果stage本身没有关闭的话,那么说明是动态取消线程 delete_on_completion(); // 那么线程需要在退出做一些事情。主动退出的话通常线程需要detach. stage.thread_dying(this); // 然后从线程池中移除这个线程。 } else { // 如果是stage主动挂掉的话,那么就需要让线程自己销毁,根据状态决定是否需要join. // The stage is dying; let the destructor reap this // Thread } } };
template<class In, class Out> class SEDAStage : public SEDAStage<In, void> { SEDAStage<Out>* next; public: SEDAStage(string name = string()) : SEDAStage<In, void>(name) { } // 设置下一个阶段Stage /// Specifies the stage to be used after this stage. /// This stage will be used when the handler invokes /// output(Out& out). void set_next(SEDAStage<Out>& next) { this->next = &next; } protected: // 这样允许我们在handle之后得到Out结果之后,output(handle(in)) // 这样Out部分又可以放到下一个步骤进行处理了。 // 不过从代码上看类型上面有点问题,如果是多个stage级联的话 // 类型上看需要进行强转。 /// Injects the output element into the queue which has /// previous been specified by set_next(SEDAStage<Out>&). void output(Out& out) { ASSERT(next != 0); next->inject(out); } };
3. 动态线程池
- min_threads.
- max_threads.
- reached_capacity(rc).
3.1. 增加线程
/// Injects an element into the queue. void inject(In& in) { if (!inq.push(in)) // 如果异步队列push返回false的话,那么就会调用clogged. clogged(in); } /// Called when someone tries to inject an element into the queue, /// but there are no handlers available to service it. This is /// invoked on the same Thread which calls inject (before inject /// returns) so it must return very quickly. virtual void clogged(In& in) { locking (this) { reached_capacity = 0; // rc=0 add_threads(); // 增加线程 } } void add_threads() { // Requires lock on this while (num_threads < min_threads || (num_threads < max_threads && inq.extra_capacity() < 0)) { inq.waiter_pending(); ++num_threads; my_thread *th = new my_thread(*this); threads.insert(th); th->start(); } }
然后我们看看异步队列返回false和和waiter_pending的操作是如何的。从上面看到waiter_pending就是 增加线程的数量,所以这里waiters可以认为是可以使用的线程数目。而waiters>=q.size()表示工作线程数目 大于task数量,那么这个时候可以认为没有必要增加线程。不过这里我们没有分析extra_capacity这个意思。
/// Adds an item to the back of the queue. Returns true if the item /// will be immediately dispatched. bool push(const value_type& v) { locking(m) { q.push(v); c.signal(); return waiters >= q.size(); } } void waiter_pending() { locking(m) ++waiters; // waiters是已经开辟的线程数目 }
3.2. 回收线程
while (true) { In in; if (!stage.inq.wait(in, &pending)) break; stage.handle(in); }
/// Waits for the queue to become non-empty, then atomically /// retrieves an item from the queue, sets out, and returns true; or /// returns false if the queue has been closed. bool wait(T& out, bool *pending = 0) { locking(m) { if (pending && *pending) { --waiters; *pending = false; } while (1) { if (waiters_starve) { // 一种情况是存在waiters_starve. --waiters_starve; // Because starve decremented waiters, and we // decremented it again in this loop... ++waiters; return false; } if (!q.empty()) { out = q.front(); q.pop(); return true; } if (closed) // 另外一种就是这个异步队列关闭 return false; // Can only get to this point if the queue is empty // but has not been closed. ++waiters; c.wait(m); --waiters; } } }
/// Starves one waiter, causing its "wait" to return false. void starve() { locking (m) { ++waiters_starve; --waiters; c.signal(); } }
所以可以认为调用一次starve的话那么就会回收一个线程。然后我们看看starve是在什么时候触发的。 在seda下面存在seda_clock_thread这个类,这个类单独起一个线程然后定时(默认5s)会调用SEDAStage::scan方法。
/// Periodic scan to see whether to kill a Thread. void scan() { locking (this) { if (reached_capacity > 5 && num_threads > min_threads) { inq.starve(); --num_threads; } ++reached_capacity; } }
也就是说线程的删减是定时触发的。如果reached_capacity>5并且当前线程数目大小最小线程数目的话,那么就会一直回收线程。 而reached_capacity回在clogged这个部分清空。
3.3. 逻辑总结
如果当前可用线程数目小于queue tasks数目的时候,那么就会添加线程并且将rc置零。然后后端会启动线程每隔5s会将rc++. 如果rc>=6的话并且当前线程个数大于最小线程个数的话,那么就会销毁一个线程。可以看出回收线程至少要达到30s以后才会操作, 并且在这30s内没有任何添加线程的动作,如果一旦有的那么会重新计算。一旦开始回收线程之后,以后每隔5s就会回收一个。 可以看到回收线程是一个非常平滑的过程,并且一旦增加线程的话计时又要等上30s,考虑处理性能同时考虑了线程本身的overhead.