hpserver

Table of Contents

http://code.google.com/p/hpserver/

虽然主页上介绍"HPServer is a free, open-source light-weighted framework for concurrent networking software. ",但是实际上hpserver完成事情是相当有限的。 类似于libevent,没有管理线程,内部也没有异步队列的实现。所以基本上可以认为就是一个libevent的简单实现。hpserver的特点有下面这些:

hpserver的类设计个人感觉有点麻烦,而且在信号处理方面类设计并不优雅。hpserver引入很多概念,了解这些概念倒是非常有帮助。hpserver设计的时候将 event handler,event item,handle(fd或者是信号编号)分开了,但是阅读代码就会发现这些东西都是1:1:1进行绑定的。对于IO来说还可以接受,但是对于信号处理就非常悲剧了, 因为对于每一种信号必须产生一个EventHandler实例。不过还好我们关注的信号还是比较有限的,所以开辟的EventHandler还不算太多。这个从samples/signal-handler.cpp就可以看出来。

1 Reactor

class CReactor
{
    IReactor_Imp *m_pImp; // real implementation.内部有一个Reactor实现,主要是负责进行poll方面的工作
    std::vector<DTEV_ITEM *> m_vecActiveList[MAX_EV_PRIORITY]; // current active events in each loop.当前活跃事件,注意这里类型是DTEV_ITEM*
    int m_activeHandlers; // 当前有多少个活跃handlers.
    IEventScheduler *m_pScheduler; // 调度策略,就是对于一系列活跃事件处理顺序如何
    CDefScheduler    m_defScheduler; // 默认调度策略
    CDemuxTable   *m_tabIoHandlers; // DemuxTable.用来将fd到DETV_ITEM映射
    CSignalSet    *m_setSigHandlers; // SignalSet,内部是EventHandler到DTEV_ITEM的映射
    CMinHeap      *m_timerHeap; // 最小堆,用来实现定时器
    CSignalInfo   *m_sigInfo; // 信号量和DTEV_ITEM的映射
    struct timeval m_time; // last dispatch return time.上次disptach之前的时间
    struct timeval m_timeCache; // caches dispatch return time.m_timeCache.
    volatile bool m_bRunning, m_bNeedStop; // 是否在运行,是否需要停止
};

2 IReactor

class IReactor_Imp
{
    // initialize I/O method and set event factory
    virtual int Initialize(CReactor *pReactor) = 0;
    virtual int Finalize() = 0;
    virtual int Dispatch(struct timeval *tv) = 0; // 这里的tv表示进行poll的超时时间
    // get kernel mechanism
    virtual const tchar *GetMethod() = 0;

    // register event read/write
    // @event: EV_READ or EV_WRITE
    virtual int RegisterEvent(const DTEV_ITEM *pEvItem, short event) = 0; // event item关注event.
    // unregister event read/write
    // @event: EV_READ or EV_WRITE
    virtual int UnregisterEvent(const DTEV_ITEM *pEvItem, short event) = 0; // event item不关注event.
};

对于这里面Register和Unregsiter内部实现来说会根据pEvItem以前是否在Poller里面注册了来决定是Add,Mod还是Del.从接口上来说还是比较好用的。 在include目录下面有一个IReactor_Imp的实现,有select和epoll两个版本实现。对于我们来说没有必要仔细阅读,了解到这个接口就足够了。

3 DTEV_ITEM

struct DTEV_ITEM // event item structure
{
    const void *pHandler; // event handler/completion handler the handle binded to.绑定到一个Handler上面
    HL_SOCKET      handle; // hande of the event item, in linux it's the same as the index in DTEV_ITEM array
    short events, evActive; // 我们关注的events以及活跃的events.
    short nsigs; // for signals.信号触发了多少次,这个对于pending信号来说有效
    short flags; // event flags.这个item是否取消了
    // for timer event management
    int   heapIndex; // index in timer min-heap.这个item在heap中的索引,对于定时器也会绑定一个event item.
    struct timeval timeout; // next timeout value.下面一个超时时间,绝对时间
    struct timeval period; // period value.周期触发时间
};

4 EventScheduler

class IEventScheduler
{
  public:
    // schedule all active events
    // stored in pVecActiveList[size], actually size is MAX_EV_PRIORITY
    // call CReactor::ScheduleActiveEvent() to run an active event
    virtual void ScheduleActiveEvents(std::vector<DTEV_ITEM *> *pVecActiveList, int size) = 0;
  protected:
    CReactor *m_pReactor;
};

参数就是按照优先级区分的活动链表,然后hpserver内部实现了一个默认的Scheduler非常简单,就是按照优先级然后遍历活动event即可。

void CDefScheduler::ScheduleActiveEvents(std::vector<DTEV_ITEM *> *pVecActiveList, int size){
    // iterate all active lists and process based on scheduler
    for(int i = size-1; i >= 0; --i){ // 按照优先级下来
        if(!pVecActiveList[i].empty()){
            for(int n = 0; n < pVecActiveList[i].size(); ++n){ // 遍历Event然后交给Reactor来执行
                m_pReactor->ScheduleActiveEvent(pVecActiveList[i][n]); // 内部应该就是调用绑定的EventHandler的回调函数
            }
            pVecActiveList[i].clear();
        }
    }
}

这个过程调用的时机,应该是在Reactor每次循环收集到活动事情之后,然后使用这个Scheduler来决定活动事件处理的顺序是怎么样的。

5 DemuxTable

我们注册fd和对应的event handler的时候,那么event item在什么地方开辟呢?这个event item就是在DemuxTable上面开辟。DemuxTable就是一个 将fd映射到event item的地方并且提供每个fd相关的event item.对于内部实现的话非常简单,因为在Linux上面fd都是连续的,所以我们可以使用 数组来映射,而在Windows上面fd不是连续的那么使用std::map来进行映射。这里我们关心Linux上面的,大致看看是怎么样的。

#define ELE_SET_SIZE 4096
#define ELE_SET_SIZE_SHIFT 12
#define ELE_SET_ARRAY_NUM 256
class CDemuxTable
{
    struct ELE_SET
    {
        DTEV_ITEM *item_array;
    };
    ELE_SET m_pTable[ELE_SET_ARRAY_NUM];
};

实现上来看是一个二维数组,然后可以动态地进行分配。每一个DTEV_ITEM开辟的数组内容为4096个,所以fd上限在1048576=1M=1024K.不过对于现在 的网络服务来说的话,通常连接数还不会达到这个数量级别,所以完全OK.

6 SignalSet

对于信号来说,并不是将fd和DTEV_ITEM进行绑定,而是将EventHandler和DTEV_ITEM进行绑定。映射关系就维护在SignalSet里面

class CSignalSet
{
  public:
    typedef std::map<IEventHandler*, DTEV_ITEM> SEH_MAP;
    typedef std::pair<IEventHandler*, DTEV_ITEM> SEH_PAIR;
    inline DTEV_ITEM *GetAt(IEventHandler *pHandler);
    inline int Insert(IEventHandler *pHandler);
    inline void Remove(IEventHandler *pHandler);
  private:
    SEH_MAP  m_mapHandlers;
};

7 SignalInfo

对于我们关心的信号都往这里面注册即可。我们注册的对象是DTEV_ITEM,但是我们可以根据DTEV_ITEM找到对应的handler,然后 找到对应的信号量,然后注册我们关注这个信号。然后SignalInfo里面有ProcessSignals能够遍历捕获到的信号,然后feed_event调用 reactor的active_event函数。SignalInfo应该会自己安装自己的信号处理函数,然后再自己的信号处理函数里进行标记,然后统一交给Reactor处理。

class CSignalInfo
{
  public:
    int ProcessSignal();
  private:
    // 使用这个信号处理函数
    static void sig_handler(int signum);
#ifdef HAVE_SIGACTION
    struct sigaction* m_sigHandler_old[NSIG];
#else
    sighandler_t m_sigHandler_old[NSIG];
#endif
    std::list<DTEV_ITEM*> m_signalHandler[NSIG];
    sig_atomic_t  m_isigCaught[NSIG];
    HL_SOCKET m_socketPair[2]; // 这个是一个管道,通常这个方式可以通知其他线程信号触发了。
};

我们看看sig_handler里面做了什么事情

void CSignalInfo::sig_handler(int signum)
{
    //record occur counts of signum
    if(signum>=0 && signum<NSIG)
    {
        if(g_sigReactor != NULL)
        {
            CSignalInfo *sigInfo = g_sigReactor->GetSigInfo(); // 全局只有一个信号reactor.
            if(sigInfo != NULL)
            {
                sigInfo->m_isigCaught[signum]++; // 触发信号的话那么做一个标记
                char buf[2] = "s";
                send(sigInfo->m_socketPair[0], buf, 1, 0); // 向pipe里面写入内容,其他线程可以被通知到
            }
        }
    }
}

8 How it works

我们主要关注Reactor的RunForever.RunForever里面会一遍一遍地调用EventLoopOnce这个过程,每次调用的时候都会检查是否需要退出。 在EventLoopOnce里面会根据定时器最小堆计算出到下一个定时器触发需要等待多长时间tv,然后调用Poller的Dispatch(tv)方法。 在Dispatch方法里面会使用epoll等待io并且等待tv时间,然后调用SignalInfo::ProcessSignals处理信号,然后调用Reactor::ExpireTimerEvents来处理超时事件, 然后调用ActiveEvent将所有可读可写事件加入m_vecActiveList里面来,然后调用scheduler进行调度。对于ScheduleActiveEvent这个函数, 无非就是分析每个event item是否取消,如果没有取消的话那么调用绑定的handler对应的回调函数。整个过程就是这样的。

comments powered by Disqus