nmstl

Table of Contents

http://sourceforge.net/projects/nmstl/

1 Overview

代码非常精简大概6000多行,但是完成不是一个库而是一个框架的功能。代码非常飘逸清爽,并且很好地被注释了。

里面也使用了很多C++的特性但是却没有滥用,所以不管是从代码实现角度还是从设计思想角度,都是值得阅读的。

整体思想和muduo非常相似,唯一不同的地方就是工作线程有策略地动态进程调整。这也是特色的地方,我们将这个特色地方,称之为SEDA并且在后面分析。

NMSTL和Asio一样都是template-heavy code,大部分都是头文件。NMSTL设计者考虑到使用这种东西编写C++代码出错的话,很容易在模板的错误中迷失,所以在bin目录下面用perl编写了一个wtf工具,可以将g++编译错误信息很好地表现出来。使用也非常简单,分析效果比原来的g++错误信息好多了。

./bin/wtf g++ <command>

下面是NMSTL主要文件列表,这里给出简要的说明

  • atomic.原子操作。
  • callback.回调函数封装(boost::bind).
  • debug.调试和日志工具。
  • internal.主要是提供对象的to_string等方法,要求对象本身提供as_string这样的方法。
  • io.提供各种buffer,并且提供IOHandle封装各种读写操作。IOHandle里面有fd并且有对fd引用计数。
  • ioevent.select实现的reactor.每个fd绑定到IOHandler上面,一旦触发可读可写使用调用对应IOHandler的ravil和wavil事件。
  • net.网络地址和socket相关操作。
  • netioevent.封装网络异步IO操作。实现上和muduo非常相似,对MsgHandler稍加修改可以支持HTTP协议,而MsgHandler本身比较适合RPC.
  • ntime.时间封装。
  • ptr.多线程安全指针(boost::shared_ptr).内部有引用计数。
  • rpc.RPC实现,没有仔细阅读。
  • seda.提出SEDAStage这个概念允许将多个操作串联起来,然后可以配合线程池和异步队列动态修改工作线程。
  • serial.序列化和反序列化部分,没有仔细阅读。
  • terminal.封装终端异步IO操作,使用了readline库,没有仔细阅读。
  • thread.线程方面封装,锁,条件变量,线程和线程池。
  • tqueue.异步队列。

2 SEDA

http://www.eecs.harvard.edu/~mdw/proj/seda/.

SEDA并不是NMSTL提出的,SEDA主要思想就是提出动态线程池这个概念。动态线程池可以在吞吐和响应上平衡,而划分成为stage的话可以让整个设计更加灵活,甚至一个stage都可以使用block方式处理(如果异步编写比较麻烦的话)通过线程来支持,当然一个stage推荐还是使用nonblock方式处理。我们这里看看NMSTL怎么实现动态线程池的。首先看看SEDAStage这么一个概念。在NMSTL里面定义是这样的

template<class In, class Out = void>
class SEDAStage;

一个SEDAStage和一个In/Out类型绑定形成一个stage.这个stage允许不断地插入In对象,实现起来内部能够非常高效地处理这些In对象生成Out对象。然后一个stage可能后面会挂上一个next stage,这个next stage接收Out对象然后不断地继续。对于这里的高效处理就是使用异步队列+线程池模型来处理了,我们可以看看处理这个部分的代码

class my_thread : public Thread {
    bool quit;
    bool pending;
    SEDAStage<In>& stage;

  public:
    my_thread(SEDAStage<In>& stage) :
        quit(false), pending(true), stage(stage) {}

    virtual void run() {
        while (true) {
            In in;
            if (!stage.inq.wait(in, &pending)) // 其中这里的stage.inq就是异步队列
                break; // 注意这个地方,这个地方能够通过异步队列的返回值来通知线程是否需要自己销毁。

            stage.handle(in); // 从异步队列取出in之后交给stage::handle来处理。
        }

        if (!stage.inq.is_closed()) { // 如果stage本身没有关闭的话,那么说明是动态取消线程
            delete_on_completion(); // 那么线程需要在退出做一些事情。主动退出的话通常线程需要detach.
            stage.thread_dying(this); // 然后从线程池中移除这个线程。
        } else { // 如果是stage主动挂掉的话,那么就需要让线程自己销毁,根据状态决定是否需要join.
            // The stage is dying; let the destructor reap this
            // Thread
        }
    }
};

然后我们看看多个Stage是如何相连在一起的.

template<class In, class Out>
class SEDAStage : public SEDAStage<In, void> {
    SEDAStage<Out>* next;

  public:
    SEDAStage(string name = string()) :
        SEDAStage<In, void>(name)
    {
    }
    // 设置下一个阶段Stage
    /// Specifies the stage to be used after this stage.
    /// This stage will be used when the handler invokes
    /// output(Out& out).
    void set_next(SEDAStage<Out>& next) { this->next = &next; }

  protected:
    // 这样允许我们在handle之后得到Out结果之后,output(handle(in))
    // 这样Out部分又可以放到下一个步骤进行处理了。
    // 不过从代码上看类型上面有点问题,如果是多个stage级联的话
    // 类型上看需要进行强转。
    /// Injects the output element into the queue which has
    /// previous been specified by set_next(SEDAStage<Out>&).
    void output(Out& out) {
        ASSERT(next != 0);
        next->inject(out);
    }
};

3 动态线程池

首先SEDAStage对于动态线程池有三个指标

  • min_threads.
  • max_threads.
  • reached_capacity(rc).

我们主要关注第三个参数。rc初始化为0.然后我们开始进行输出In对象

3.1 增加线程

/// Injects an element into the queue.
void inject(In& in) {
    if (!inq.push(in)) // 如果异步队列push返回false的话,那么就会调用clogged.
        clogged(in);
}

/// Called when someone tries to inject an element into the queue,
/// but there are no handlers available to service it.  This is
/// invoked on the same Thread which calls inject (before inject
/// returns) so it must return very quickly.
virtual void clogged(In& in) {
    locking (this) {
        reached_capacity = 0; // rc=0
        add_threads(); // 增加线程
    }
}
void add_threads() {
    // Requires lock on this
    while (num_threads < min_threads ||
           (num_threads < max_threads && inq.extra_capacity() < 0))
    {
        inq.waiter_pending();
        ++num_threads;
        my_thread *th = new my_thread(*this);
        threads.insert(th);

        th->start();
    }
}

然后我们看看异步队列返回false和和waiter_pending的操作是如何的。从上面看到waiter_pending就是 增加线程的数量,所以这里waiters可以认为是可以使用的线程数目。而waiters>=q.size()表示工作线程数目 大于task数量,那么这个时候可以认为没有必要增加线程。不过这里我们没有分析extra_capacity这个意思。

/// Adds an item to the back of the queue.  Returns true if the item
/// will be immediately dispatched.
bool push(const value_type& v) {
    locking(m) {
        q.push(v);
        c.signal();
        return waiters >= q.size();
    }
}

void waiter_pending() {
    locking(m)
        ++waiters; // waiters是已经开辟的线程数目
}

如果阅读完成回收线程就会发现extra_capacity意思就是当前空余线程数目。

3.2 回收线程

然后我们还是回来看看导致线程减少的这部分逻辑

while (true) {
    In in;
    if (!stage.inq.wait(in, &pending))
        break;

    stage.handle(in);
}

如果异步队列在wait返回false的话那么就会取消线程。阅读wait这个部分代码的话就会发现只有两个地方是返回false

/// Waits for the queue to become non-empty, then atomically
/// retrieves an item from the queue, sets out, and returns true; or
/// returns false if the queue has been closed.
bool wait(T& out, bool *pending = 0) {
    locking(m) {
        if (pending && *pending) {
            --waiters;
            *pending = false;
        }

        while (1) {
            if (waiters_starve) { // 一种情况是存在waiters_starve.
                --waiters_starve;

                // Because starve decremented waiters, and we
                // decremented it again in this loop...
                ++waiters;

                return false;
            }

            if (!q.empty()) {
                out = q.front();
                q.pop();
                return true;
            }

            if (closed) // 另外一种就是这个异步队列关闭
                return false;

            // Can only get to this point if the queue is empty
            // but has not been closed.

            ++waiters;
            c.wait(m);
            --waiters;
        }
    }
}

我们什么时候设置waiters_starve的呢?是在starve这个函数里面

/// Starves one waiter, causing its "wait" to return false.
void starve() {
    locking (m) {
        ++waiters_starve;
        --waiters;
        c.signal();
    }
}

所以可以认为调用一次starve的话那么就会回收一个线程。然后我们看看starve是在什么时候触发的。 在seda下面存在seda_clock_thread这个类,这个类单独起一个线程然后定时(默认5s)会调用SEDAStage::scan方法。

/// Periodic scan to see whether to kill a Thread.
void scan() {
    locking (this) {
        if (reached_capacity > 5 && num_threads > min_threads) {
            inq.starve();
            --num_threads;
        }
        ++reached_capacity;
    }
}

也就是说线程的删减是定时触发的。如果reached_capacity>5并且当前线程数目大小最小线程数目的话,那么就会一直回收线程。 而reached_capacity回在clogged这个部分清空。

3.3 逻辑总结

如果当前可用线程数目小于queue tasks数目的时候,那么就会添加线程并且将rc置零。然后后端会启动线程每隔5s会将rc++. 如果rc>=6的话并且当前线程个数大于最小线程个数的话,那么就会销毁一个线程。可以看出回收线程至少要达到30s以后才会操作, 并且在这30s内没有任何添加线程的动作,如果一旦有的那么会重新计算。一旦开始回收线程之后,以后每隔5s就会回收一个。 可以看到回收线程是一个非常平滑的过程,并且一旦增加线程的话计时又要等上30s,考虑处理性能同时考虑了线程本身的overhead.

comments powered by Disqus