nmstl

Table of Contents

http://sourceforge.net/projects/nmstl/

1. Overview

代码非常精简大概6000多行,但是完成不是一个库而是一个框架的功能。代码非常飘逸清爽,并且很好地被注释了。

里面也使用了很多C++的特性但是却没有滥用,所以不管是从代码实现角度还是从设计思想角度,都是值得阅读的。

整体思想和muduo非常相似,唯一不同的地方就是工作线程有策略地动态进程调整。这也是特色的地方,我们将这个特色地方,称之为SEDA并且在后面分析。

NMSTL和Asio一样都是template-heavy code,大部分都是头文件。NMSTL设计者考虑到使用这种东西编写C++代码出错的话,很容易在模板的错误中迷失,所以在bin目录下面用perl编写了一个wtf工具,可以将g++编译错误信息很好地表现出来。使用也非常简单,分析效果比原来的g++错误信息好多了。

./bin/wtf g++ <command>

下面是NMSTL主要文件列表,这里给出简要的说明

  • atomic.原子操作。
  • callback.回调函数封装(boost::bind).
  • debug.调试和日志工具。
  • internal.主要是提供对象的to_string等方法,要求对象本身提供as_string这样的方法。
  • io.提供各种buffer,并且提供IOHandle封装各种读写操作。IOHandle里面有fd并且有对fd引用计数。
  • ioevent.select实现的reactor.每个fd绑定到IOHandler上面,一旦触发可读可写使用调用对应IOHandler的ravil和wavil事件。
  • net.网络地址和socket相关操作。
  • netioevent.封装网络异步IO操作。实现上和muduo非常相似,对MsgHandler稍加修改可以支持HTTP协议,而MsgHandler本身比较适合RPC.
  • ntime.时间封装。
  • ptr.多线程安全指针(boost::shared_ptr).内部有引用计数。
  • rpc.RPC实现,没有仔细阅读。
  • seda.提出SEDAStage这个概念允许将多个操作串联起来,然后可以配合线程池和异步队列动态修改工作线程。
  • serial.序列化和反序列化部分,没有仔细阅读。
  • terminal.封装终端异步IO操作,使用了readline库,没有仔细阅读。
  • thread.线程方面封装,锁,条件变量,线程和线程池。
  • tqueue.异步队列。

2. SEDA

http://www.eecs.harvard.edu/~mdw/proj/seda/.

SEDA并不是NMSTL提出的,SEDA主要思想就是提出动态线程池这个概念。动态线程池可以在吞吐和响应上平衡,而划分成为stage的话可以让整个设计更加灵活,甚至一个stage都可以使用block方式处理(如果异步编写比较麻烦的话)通过线程来支持,当然一个stage推荐还是使用nonblock方式处理。我们这里看看NMSTL怎么实现动态线程池的。首先看看SEDAStage这么一个概念。在NMSTL里面定义是这样的

template<class In, class Out = void>
class SEDAStage;

一个SEDAStage和一个In/Out类型绑定形成一个stage.这个stage允许不断地插入In对象,实现起来内部能够非常高效地处理这些In对象生成Out对象。然后一个stage可能后面会挂上一个next stage,这个next stage接收Out对象然后不断地继续。对于这里的高效处理就是使用异步队列+线程池模型来处理了,我们可以看看处理这个部分的代码

class my_thread : public Thread {
bool quit;
bool pending;
SEDAStage<In>& stage;

  public:
my_thread(SEDAStage<In>& stage) :
    quit(false), pending(true), stage(stage) {}

virtual void run() {
    while (true) {
    In in;
    if (!stage.inq.wait(in, &pending)) // 其中这里的stage.inq就是异步队列
        break; // 注意这个地方,这个地方能够通过异步队列的返回值来通知线程是否需要自己销毁。

    stage.handle(in); // 从异步队列取出in之后交给stage::handle来处理。
    }

    if (!stage.inq.is_closed()) { // 如果stage本身没有关闭的话,那么说明是动态取消线程
    delete_on_completion(); // 那么线程需要在退出做一些事情。主动退出的话通常线程需要detach.
    stage.thread_dying(this); // 然后从线程池中移除这个线程。
    } else { // 如果是stage主动挂掉的话,那么就需要让线程自己销毁,根据状态决定是否需要join.
    // The stage is dying; let the destructor reap this
    // Thread
    }
}
};

然后我们看看多个Stage是如何相连在一起的.

template<class In, class Out>
class SEDAStage : public SEDAStage<In, void> {
    SEDAStage<Out>* next;

  public:
    SEDAStage(string name = string()) :
    SEDAStage<In, void>(name)
    {
    }
    // 设置下一个阶段Stage
    /// Specifies the stage to be used after this stage.
    /// This stage will be used when the handler invokes
    /// output(Out& out).
    void set_next(SEDAStage<Out>& next) { this->next = &next; }

  protected:
    // 这样允许我们在handle之后得到Out结果之后,output(handle(in))
    // 这样Out部分又可以放到下一个步骤进行处理了。
    // 不过从代码上看类型上面有点问题,如果是多个stage级联的话
    // 类型上看需要进行强转。
    /// Injects the output element into the queue which has
    /// previous been specified by set_next(SEDAStage<Out>&).
    void output(Out& out) {
    ASSERT(next != 0);
    next->inject(out);
    }
};

3. 动态线程池

首先SEDAStage对于动态线程池有三个指标

  • min_threads.
  • max_threads.
  • reached_capacity(rc).

我们主要关注第三个参数。rc初始化为0.然后我们开始进行输出In对象

3.1. 增加线程

/// Injects an element into the queue.
void inject(In& in) {
if (!inq.push(in)) // 如果异步队列push返回false的话,那么就会调用clogged.
    clogged(in);
}

/// Called when someone tries to inject an element into the queue,
/// but there are no handlers available to service it.  This is
/// invoked on the same Thread which calls inject (before inject
/// returns) so it must return very quickly.
virtual void clogged(In& in) {
locking (this) {
    reached_capacity = 0; // rc=0
    add_threads(); // 增加线程
}
}
void add_threads() {
// Requires lock on this
while (num_threads < min_threads ||
       (num_threads < max_threads && inq.extra_capacity() < 0))
{
    inq.waiter_pending();
    ++num_threads;
    my_thread *th = new my_thread(*this);
    threads.insert(th);

    th->start();
}
}

然后我们看看异步队列返回false和和waiter_pending的操作是如何的。从上面看到waiter_pending就是 增加线程的数量,所以这里waiters可以认为是可以使用的线程数目。而waiters>=q.size()表示工作线程数目 大于task数量,那么这个时候可以认为没有必要增加线程。不过这里我们没有分析extra_capacity这个意思。

/// Adds an item to the back of the queue.  Returns true if the item
/// will be immediately dispatched.
bool push(const value_type& v) {
locking(m) {
    q.push(v);
        c.signal();
        return waiters >= q.size();
}
}

void waiter_pending() {
locking(m)
    ++waiters; // waiters是已经开辟的线程数目
}

如果阅读完成回收线程就会发现extra_capacity意思就是当前空余线程数目。

3.2. 回收线程

然后我们还是回来看看导致线程减少的这部分逻辑

while (true) {
In in;
if (!stage.inq.wait(in, &pending))
    break;

stage.handle(in);
}

如果异步队列在wait返回false的话那么就会取消线程。阅读wait这个部分代码的话就会发现只有两个地方是返回false

/// Waits for the queue to become non-empty, then atomically
/// retrieves an item from the queue, sets out, and returns true; or
/// returns false if the queue has been closed.
bool wait(T& out, bool *pending = 0) {
locking(m) {
    if (pending && *pending) {
    --waiters;
    *pending = false;
    }

    while (1) {
    if (waiters_starve) { // 一种情况是存在waiters_starve.
        --waiters_starve;

        // Because starve decremented waiters, and we
        // decremented it again in this loop...
        ++waiters;

        return false;
    }

    if (!q.empty()) {
        out = q.front();
        q.pop();
        return true;
    }

    if (closed) // 另外一种就是这个异步队列关闭
        return false;

            // Can only get to this point if the queue is empty
            // but has not been closed.

    ++waiters;
    c.wait(m);
    --waiters;
    }
}
}

我们什么时候设置waiters_starve的呢?是在starve这个函数里面

/// Starves one waiter, causing its "wait" to return false.
void starve() {
locking (m) {
    ++waiters_starve;
    --waiters;
    c.signal();
}
}

所以可以认为调用一次starve的话那么就会回收一个线程。然后我们看看starve是在什么时候触发的。 在seda下面存在seda_clock_thread这个类,这个类单独起一个线程然后定时(默认5s)会调用SEDAStage::scan方法。

/// Periodic scan to see whether to kill a Thread.
void scan() {
locking (this) {
    if (reached_capacity > 5 && num_threads > min_threads) {
    inq.starve();
    --num_threads;
    }
    ++reached_capacity;
}
}

也就是说线程的删减是定时触发的。如果reached_capacity>5并且当前线程数目大小最小线程数目的话,那么就会一直回收线程。 而reached_capacity回在clogged这个部分清空。

3.3. 逻辑总结

如果当前可用线程数目小于queue tasks数目的时候,那么就会添加线程并且将rc置零。然后后端会启动线程每隔5s会将rc++. 如果rc>=6的话并且当前线程个数大于最小线程个数的话,那么就会销毁一个线程。可以看出回收线程至少要达到30s以后才会操作, 并且在这30s内没有任何添加线程的动作,如果一旦有的那么会重新计算。一旦开始回收线程之后,以后每隔5s就会回收一个。 可以看到回收线程是一个非常平滑的过程,并且一旦增加线程的话计时又要等上30s,考虑处理性能同时考虑了线程本身的overhead.