kylin

Table of Contents

kylin是baidu in-house的异步编程框架,提供CPU,Network以及Disk异步操作接口,并且内置许多常用编程组件包括定时器和内存池等。

1 share

公共组件代码

1.1 atomic

最主要实现了atomic add/swap/cas三个操作。

// return old value
static inline int atomic_add(volatile int *count, int add) {
    __asm__ __volatile__(
        "lock xadd %0, (%1);"
        : "=a"(add)
        : "r"(count), "a"(add)
        : "memory");
    return add;
}

// return old value
static inline int atomic_swap(volatile void *lockword, int value) {
    __asm__ __volatile__(
        "lock xchg %0, (%1);"
        : "=a"(value)
        : "r"(lockword), "a"(value)
        : "memory");
    return value;
}

// return old value
// 语义是这样的
// 如果*lockword==comperand,那么*lockword=exchange
// 否则不进行任何操作
// 返回原始的*lockword

// 对于cmpxchg x y的语义是这样的
// 如果y==%%eax,那么x->y.否则不变。然后y(原始)->%%eax
static inline int atomic_comp_swap(volatile void *lockword,
                                   int exchange,
                                   int comperand)
{
    __asm__ __volatile__(
        "lock cmpxchg %1, (%2)"
        :"=a"(comperand)
        :"d"(exchange), "r"(lockword), "a"(comperand));
    return comperand;
}

然后再上面封装了一系列原子操作。封装的一系列原子操作还是比较好理解的。

#define AtomicGetValue(x)    (atomic_comp_swap(&(x), 0, 0))
#define AtomicSetValue(x, v)    (atomic_swap(&(x), (v)))
#define AtomicSetValueIf(x, v, ifn)(atomic_comp_swap(&(x), (v), ifn))
#define AtomicDec(c)    (atomic_add(&(c), -1) - 1)
#define AtomicInc(c)    (atomic_add(&(c), 1) + 1)

1.2 spinlock

spinlock直接使用atomic提供的原子操作来实现,理解起来倒不是很麻烦

static inline void spin_lock(volatile int *lock) {
    int l;
    int i = 10;
    int id = thread_getid();
    //l==0的话说明原来lock==0然后被置为id
    //l==id的话说明原来lock==id那么就不必在进行加锁操作
    for (l=atomic_comp_swap(lock, id, 0);
         l!=0 && l!=id;
         l=atomic_comp_swap(lock, id, 0)
         ) {
        if (i --) {
            nop();
        }
        else {
            // 进行10次nop之后如果没有得到锁的话
            // 那么就直接relinquish CPU
            // #define thread_yield sched_yield
            i = 10;
            thread_yield();
        }
    }
}

// 返回值可以知道之前lock是否锁在自己这里
// 如果为false的话表示自己并没有锁
static inline bool spin_unlock(volatile int *lock) {
    int id = thread_getid();
    return id == atomic_comp_swap(lock, 0, id);
}

static inline bool spin_trylock(volatile int *lock) {
    int id = thread_getid();
    int owner = atomic_comp_swap(lock, id, 0);
    return (owner==0 || owner==id);
}

在spinlock.h下面有一个token实现。token语义非常简单,如果token==0的话那么这个令牌没有被任何人获得, 如果token!=0的话,那么令牌被token标记的对象获取了。token可以是pid,也可以是tid.

static inline int token_acquire(volatile int *token, int id) {
    return atomic_comp_swap(token, id, 0);
}

static inline int token_release(volatile int *token, int id) {
    return atomic_comp_swap(token, 0, id);
}

static inline int token_transfer(volatile int *token, int oldid, int newid) {
    return atomic_comp_swap(token, newid, oldid);
}

static inline int token_set(volatile int *token, int id) {
    return atomic_comp_swap(token, id, *token);
}

1.3 cycle

提供开销更小的计时器,使用读取CPU的time stamp counter.这个内容表示自计算机启动以来的CPU运行周期。

static inline uint64 rdtsc() {
    unsigned int lo, hi;
    /* We cannot use "=A", since this would use %rax on x86_64 */
    __asm__ __volatile__ (
        "rdtsc"
        : "=a" (lo), "=d" (hi));
    return (uint64)hi << 32 | lo;
}

得到周期之后我们必须转换称为时间(s)。周期转换称为时间就是除CPU的主频。得到CPU主频的话没有什么特别好的办法, 一种简单的方法是通过等待1s然后得到tsc差。对于Linux操作系统的话可以通过读取proc文件系统获得

[zhangyan@tc-cm-et18.tc.baidu.com]$ cat /proc/cpuinfo
processor       : 0
vendor_id       : GenuineIntel
cpu family      : 6
model           : 12
model name      : Intel(R) Xeon(R) CPU           E5620  @ 2.40GHz
stepping        : 2
cpu MHz         : 2400.186
cache size      : 256 KB
physical id     : 0
siblings        : 16
core id         : 0
cpu cores       : 16
fpu             : yes
fpu_exception   : yes
cpuid level     : 11
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm syscall nx lm pni monitor ds_cpl est tm2 cx16 xtpr
bogomips        : 4803.76
clflush size    : 64
cache_alignment : 64
address sizes   : 40 bits physical, 48 bits virtual
power management:

1.4 support

从这里面我们可以学习到如何进行系统调用,阅读一下<asm/unistd.h>可以找到系统调用号,然后使用syscall来发起。

#include <unistd.h>
#include <sys/syscall.h>
#include <cstdio>
int main() {
    printf("%lu\n",syscall(__NR_gettid));
    return 0;
}

1.5 futex

关于futex的话可以看看下面这些链接

尤其是最后一篇文章可以好好看看,讲到了关于如何使用futex.futex使用需要用户态和内核态的配合,用户态处理一些uncontented case, 而对于contented case的话交给内核态处理。在实际应用上发现大部分情况都是uncontented case都可以在用户态解决而不用陷入内核态。 如果想要深入了解的话,看看pthread里面同步组件的实现。

这里我们简单地介绍一下kylin里面使用futex实现的功能,先看看futex结构

struct futex {
    volatile int lock; // futex shared address
    volatile int count;
};

1.5.1 sema

可以认为是操作系统里面的PV实现.count就是资源数目,lock始终==0.理解起来并不会很麻烦。

static inline int futex_sema_down(struct futex* pf, struct timespec* timeout, bool interruptable) {
    // 首先在用户态尝试取资源
    // 如果n>0的话,说明资源OK,那么就不需要陷入内核态进行wait.
    int n = atomic_add(&pf->count, -1);
    if (n <= 0) {
  retry:
        if (0 == sys_futex(&pf->lock, FUTEX_WAIT, 0, timeout)) {
            return 0;
        }
        switch (errno) {
            case ETIMEDOUT:
                atomic_add(&pf->count, 1);
                return ETIMEDOUT;
            case EINTR:
                if (!interruptable)
                    goto retry;
                atomic_add(&pf->count, 1);
                return EINTR;
            default:
                RaiseError(IMPOSSIBLE__Can_not_lock_in_futex_sema_down);
        }
    }
    return 0;
}
static inline int futex_sema_up(struct futex* pf) {
    int retry;
    // 首先在用户态释放资源
    // 如果n<0的话,说明存在等待资源的waiters,我们必须陷入内核态wakeup.
    int n = atomic_add(&pf->count, 1);
    if (n < 0) {
        retry = 10;
        // 这个地方写得非常仔细需要重试
        while (1 != (n=sys_futex(&pf->lock, FUTEX_WAKE, 1, NULL))) {
            /* it means the downer decreases the count but not yet start waiting
             *   --- may be interrupted near the retry label in the above function;
             * so we have to wait and retry.
             */
            if (retry --) {
                nop();
            }
            else {
                retry = 10;
                thread_yield();
            }
        }
        return n;
    }
    return 0;
}

1.5.2 cond

这里cond和pthread_cond是有差别的,这里的cond没有和任何mutex相关。kylin这里认为count==0的时候,那么condition才被满足。

static inline int futex_cond_wait(struct futex* pf, struct timespec* timeout, bool interruptable) {
    /* I dont know whether it is a bug of linux kernel.
     * Sometimes, sys_futex(.., FUTEX_WAIT, ..) returns 0, but the condition is not satisfied.
     * So we have to check the condition again after return.
     */
    while (0 < AtomicGetValue(pf->count)) {
        sys_futex(&pf->lock, FUTEX_WAIT, 0, timeout);
        switch (errno) {
            case ETIMEDOUT:
                return ETIMEDOUT;
            case EINTR:
                if (interruptable) {
                    return EINTR;
                }
            default:
                break;
        }
    }
    return 0;
}

static inline int futex_cond_signal(struct futex* pf) {
    int n = atomic_add(&pf->count, -1);
    if (1 == n) {
        pf->lock = 1; // 一旦触发之后,那么就不能够再进行wait了。
        mfence_c();
        return sys_futex(&pf->lock, FUTEX_WAKE, 65535, NULL);// I hope 65535 is enough to wake up all
    }
    return 0;
}

1.5.3 event

这里的event名字取得也相当的奇怪。这里count实际上有两个状态,>=0以及<0(LARGE_ENOUGH_NEGATIVE).对于count>=0的状态时候, 可以认为当前是没有signaled的需要wait,如果count为<0(LARGE_ENOUGH_NEGATIVE)的时候是有signal的状态的不需要wait。

static inline int futex_event_wait(struct futex* pf, struct timespec* timeout, bool interruptable) {
    // 如果不是signaled状态的话
    int n = atomic_add(&pf->count, 1);
    if (0 <= n) {
  retry:
        if (0 == sys_futex(&pf->lock, FUTEX_WAIT, 0, timeout))
            return 0;

        switch (errno) {
            case ETIMEDOUT:
                atomic_add(&pf->count, -1);
                return ETIMEDOUT;
            case EINTR:
                if (!interruptable)
                    goto retry;
                atomic_add(&pf->count, -1);
                return EINTR;
            default:
                RaiseError(IMPOSSIBLE__Can_not_lock_in_futex_sema_down);
        }
    }
    else {  // else signaled
        AtomicSetValue(pf->count, LARGE_ENOUGH_NEGATIVE);
    }
    return 0;
}

static inline int futex_event_signal(struct futex* pf, bool reset) {
    int m, n, retry;
    // 看看当前是否signaled
    // 如果没有signal的话,那么需要wakeup这些waiters.
    n = AtomicSetValue(pf->count, reset ? 0 : LARGE_ENOUGH_NEGATIVE);
    if (0 < n) {
        retry = 10;
        m = n;
        do {
            n -= sys_futex(&pf->lock, FUTEX_WAKE, n, NULL);
            if (0 == n)
                return m;
            if (retry --) {
                nop();
            }
            else {
                retry = 10;
                thread_yield();
            }
        } while (1);
    }
    return 0;
}

static inline void futex_event_reset(struct futex* pf) {
    int n, retry = 10;
    do {
        n = AtomicSetValueIf(pf->count, 0, LARGE_ENOUGH_NEGATIVE);
        if (0<=n || LARGE_ENOUGH_NEGATIVE==n) {
            return;
        }
        if (retry --) {
            nop();
        }
        else {
            retry = 10;
            thread_yield();
        }
    } while (1);
}

2 kylin

异步框架代码

2.1 Async

kylin对于用户来说首先需要了解的概念就在Async.h文件里面,主要是下面两个类

typedef void (*JOB_PROC)(Job*);

// 对于Job这个内容我们稍后在ThreadPool部分会有详细分析
struct Job {
    DLINK link; // 使用link的话可以将Job在JobQ中串联起来可以很方便地取消
    JOB_PROC fProc; //线程池里面包含JobQ,每取一个Job出来之后就执行fProc.
};

class CAsyncClient;
struct AsyncContext : Job {
    APF_ERROR nErrCode; // 发起调用之后返回的error_code
    int nAction; // 发起什么调用
    CAsyncClient *pClient; // 应该使用什么client来处理
};

class CAsyncClient
{
protected:
    // m_nId仅仅是一个编号,每次创建一个AsyncClient都会全局+1
    // m_nHostId非常重要,使用这个可以将Job控制丢到哪个线程执行
    int m_nId, m_nHostId;
    volatile int m_nRef;
    CAsyncClient(CAsyncClient* pHost);
    CAsyncClient();
    virtual ~CAsyncClient();
public:
    int GetId() const { return m_nId; }
    int GetAsyncId() const { return m_nHostId; }
    int GetHostThreadId() const;
    bool IsInHostThread() const;
    void SetHost(CAsyncClient* pHost);
    virtual int AddRef() {
        return AtomicInc(m_nRef);
    }
    virtual int Release() {
        return AtomicDec(m_nRef);
    }
    virtual int GetRef() {
        return AtomicGetValue(m_nRef);
    }
    virtual void OnCompletion(AsyncContext* pCtx) = 0; // 用户需要重写这个过程
};

对于用户来说使用过程大致是这样的:

  • 创建一个CAsyncClient client实例.当然是我们自己需要继承CAsyncClient重写自己的类。
  • 创建一个AsyncContext ctx(或者是集成AsyncContext).然后将ctx和client绑定。
  • 发起调用op,传入这个ctx,为了方便理解包装成为Task(op,ctx)放入线程池。可能会设置nAction字段。
  • 线程池取出Task,结合ctx调用op.将op返回值放入APF_ERROR里面。
  • 然后根据ctx关联的client,调用client的OnCompletion方法。
  • 调用OnCompletion方法的话会根据ctx里面的标记,可以直接在工作线程调用,也可以丢入CPU线程调用。

可以看到在实现时候,最好一个client就绑定几个相关的ctx最方便了。这里有一个地方需要特别关注就是引用计数。因为C++本身没有GC实现,所以我们必须自己来管理内存分配和释放。 因为client可以一次多个调用,而在OnCompletion里面根本不知道谁先完成谁后完成,也就不能够确定释放责任了。通过引用计数可以很好地解决这个问题。 如果我们直接继承CAsyncClient的话,内部是有引用计数实现的,非常方便我们只需要如何适当地使用就OK了。关于如何适当使用,谢谢sunxiao同学在这里的建议。

  • 一旦发起一次异步调用,那么首先AddRef().当然需要确保这个调用内部没有帮助我们AddRef.
  • 我们不需要显示地DecRef(),因为这个事情在线程池fProc里面调用了Release.

2.2 ThreadPool

2.2.1 Overview

线程池很简单,取出一个Job出来执行就多了。但是为了更好地理解kylin有必要看看线程池接口/实现。

typedef void (*THREAD_INIT_PROC)(int type, int id); // id表示这个线程的逻辑编号

class CThreadPool
{
    bool m_bShareQ; // 是否所有线程共享一个Q
    int m_nWorkers, m_nMaxWorkers; // 当前线程数和最大线程数
    volatile int m_nJobs; // 当前有多少个Jobs
    thread_t *m_hWorkerThreads; // 每个线程的thread结构
    WorkerContext *m_pContexts; // 每个线程的context
    int m_nType; // 什么类型线程池,TT_EXEC,TT_NETWORK,TT_DISK
    THREAD_INIT_PROC m_fInit; // 线程初始化回调函数

    int _AddWorker(int nAdd); // 增加多少个工作线程
    int _DelWorker(int nDel, bool bFinal); // 取消多少个工作线程
  public:
    CThreadPool(int type, int nMaxWorkers, bool bShareQ);
    virtual ~CThreadPool();
    int Start(int nWorkers, THREAD_INIT_PROC fInit=NULL);
    void Stop();
    void QueueJob(Job* pJob, int nWhich);
    void QueueEmergentJob(Job* pJob, int nWhich);
    bool CancelJob(Job* pJob, int nWhich);
};

对于线程池部分的话我们比较关心这么几件事情:

  • 如何增加删除线程的
  • 线程是如何进行工作的
  • 如何往线程里面增加取消任务

2.2.2 How Thread Works

了解线程是怎么工作的,可以看看线程执行的函数是怎么定义的

static void*
WorkerProc(void* pData)
{
    WorkerContext* pCtx = (WorkerContext*)pData;
    JobQ* pJobQ = pCtx->pJobQ;
    Job* pJob;

    TRACE4("%s worker#%d started...\n", ThreadType2Str(pCtx->type), pCtx->id);
    pCtx->thread_id = thread_getid();
    if (pCtx->fInit) { // 如果有初始化函数的话那么执行初始化函数
        pCtx->fInit(pCtx->type, pCtx->id);
    }
    while (1) {
        pJob = pJobQ->pop_front(); // 每次得到一个Job
        ASSERT_NOT_EQUAL((Job*)NULL, pJob);
        if (pJob->fProc != 0) { // 如果是普通Job的话那么是调用里面的Job::fProc过程
            pCtx->bDoing = true;
            pJob->fProc(pJob);
            pCtx->bDoing = false;
        }
        else { // 否则是控制Job,主要是用于结束线程使用的
            ControlJob* pCtl = (ControlJob*)pJob;
            if (!pCtl->fProc(pCtl, pCtx)) {
                break;
            }
        }
    }
    TRACE4("%s worker#%d stopped.\n", ThreadType2Str(pCtx->type), pCtx->id);
    return NULL;
}

普通的Job会在每个Man里面单独提到,我们看看控制Job是怎么定义的。在ThreadPool里面就有一个TermianationJob.

struct TerminationJob : ControlJob {
    int id;
};

static bool
TerminateWorker(ControlJob* pCtl, WorkerContext* pCtx)
{
    TerminationJob* pT = (TerminationJob*)pCtl;
    if (pT->id!=-1 && pT->id!=pCtx->id) { // 如果因为共享队列而没有让对应线程得到Job的话,那么重新放入这个Job.
        pCtx->pJobQ->push_back((Job*)pCtl); // should be shared queue
        thread_yield();                     // re-enqueue this job until the owner consumes it
        return true;
    }
    return false;
}

通过这种方式来通知线程主动退出。理论上因为shared Queue可能会造成所有永远不会退出但是实际应该不会。

2.2.3 AddWorker & DelWorker

AddWorker非常简单

int CThreadPool::_AddWorker(int nAdd)
{
    int i;

    for (i=0; i<nAdd && m_nWorkers<m_nMaxWorkers; i++) {
        m_pContexts[m_nWorkers].fInit = m_fInit;
        if (m_pContexts[m_nWorkers].pJobQ == NULL) {
            m_pContexts[m_nWorkers].pJobQ = new JobQ; // 会为每一个WorkerContext分配一个JobQ.对于共享Q的话在初始化就分配好了。
        }
        if (0 != thread_create(&m_hWorkerThreads[m_nWorkers], NULL, WorkerProc, &m_pContexts[m_nWorkers])) { // 然后启动线程即可
            PERROR("thread_create");
            break;
        }
        m_nWorkers ++;
    }
    return i;
}

DelWorker因为有ControlJob的辅助所以可以很好地解决,只需要在每个线程后面增加一个TerminationJob即可

int CThreadPool::_DelWorker(int nDel, bool bFinal)
{
    TerminationJob *pTerminations = new TerminationJob[nDel];
    int i;

    TRACE4("%s start terminating %d workers...\n", ThreadType2Str(m_nType), nDel);
    for (i=0; i<nDel && m_nWorkers>0; i++) {
        m_nWorkers --;
        DLINK_INITIALIZE(&pTerminations[m_nWorkers].link);
        pTerminations[m_nWorkers].fZero = 0;
        pTerminations[m_nWorkers].fProc = TerminateWorker;
        pTerminations[m_nWorkers].id = bFinal ? -1 : m_nWorkers;
        m_pContexts[m_nWorkers].pJobQ->push_back((Job*)&pTerminations[m_nWorkers]);
    }
    for (int j=m_nWorkers; j<i+m_nWorkers; j++) {
        TRACE4("%s wait for worker #%d.\n", ThreadType2Str(m_nType), j);
        thread_join(m_hWorkerThreads[j], NULL);
    }
    TRACE4("%s end terminating workers.\n", ThreadType2Str(m_nType));
    delete[] pTerminations;
    return i;
}

2.2.4 QueueJob & CancelJob

相对来说QueueJob也更加简单一些,直接投递到某个线程对应的WorkerContext里面即可。

void QueueJob(Job* pJob, int nWhich) {
     int nJobs = atomic_add(&m_nJobs, 1);
    if (-1 == nWhich) {
        nWhich = nJobs % m_nWorkers;
    }
    atomic_add(&m_pContexts[nWhich].nJobs, 1);
    m_pContexts[nWhich].pJobQ->push_back(pJob);
}

而CancelJob则是通过加锁替换这个Job来完成的,还是比较精巧的

static void
DoNothing(Job* pJob)
{
    free(pJob);
}

bool CThreadPool::CancelJob(Job* pJob, int nWhich)
{
    Job* p = ZeroAlloc<Job>(); // 分配一个Job,而DoNothing就是将其释放掉
    p->fProc = DoNothing;
    if (m_pContexts[nWhich].pJobQ->replace(pJob, p)) { // replace这个工作是一个加锁完成的
        return true;
    }
    free(p); // 如果没有Cancel的话那么返回失败但是也会释放掉内存
    return false;
}

2.3 TranBuf

TranBuf.h CTranBufPool是一个内存分配器。对于很多系统来说,合理地使用资源是非常必要的。

作者linsd对于内存分配器看法是这样的:

要得到稳定的高吞吐,对内存的合理使用是必要条件。是否用Ring Buffer倒不一定,简单的buffer pool效果也差不多。另外,为了应付极限情况,还需要为buffer请求分级,当资源不足时优先给紧急请求。也可设定高低几条watermark,让各种复杂条件下的资源使用变得平顺。

了解一下真实系统里面定制化的内存分配器是非常有帮助的(相对应地来说 TCMalloc 是通用内存分配器).

2.3.1 Overview

首先看看CTranBufPool的数据结构,看看里面每个字段含义和作用.对于TranBuf来说的话内部 本质还是一个sample allocator,也是按照固定的BlockSize来进行分配的。构造函数可以看到水位线三个阈值都是0.

class CTranBufPool : public CBufPoolV {
  struct Handle { // 每个BlockSize字节内存内存由一个Handle管理.
    DLINK link; // 分配出来之后多个Handle组成环形双向链表.
    char* pBuffer; // 一个BlockSize的内存.
    Handle* pRealHdl; // 真实Handler.这个会在后面解释.
    int nRef; // 引用计数.
    int nConsBuf; // 对于自己引用的pBuffer后面还有多少个连续内存.
  };
  typedef TLinkedList<Handle> FreeList; //
  typedef std::map<char*, Handle*> BufferMap; // buffer和Handle映射.

  FreeList m_FreeList;
  BufferMap m_BufferMap;

  // m_nBlockSize 每个sample object即BlockSize
  // m_nBufferSize 1次连续开辟多少字节.
  // m_nBlockBase log2(BlockSize)
  int m_nBlockSize, m_nBufferSize, m_nBlockBase;

  // m_nAlloc 一次开辟多少个BlockSize.其中m_nBufferSize=nAlloc*m_nBlockSize
  // m_nMaxBuffers 最多分配多少个Blcok
  // m_nBuffers 当前分配了多少个Block
  // m_nWaterMarks 分为3个水位线
  int m_nAlloc, m_nMaxBuffers, m_nBuffers, m_nWaterMarks[3];

  // m_nMin. 一开始至少分配m_nMin*nAlloc个Block
  // m_nMax 最多分配m_nMax*nAlloc哥block.其中m_nMaxBuffers=m_nMax*nAlloc.
  int m_nMin, m_nMax;
};

  CTranBufPool(const char* name, int nCategory) : CBufPoolV(name, nCategory) {
    m_nBuffers = 0;
    m_nBlockSize = m_nBufferSize = m_nAlloc = m_nMaxBuffers = m_nMin = 0;
    m_nWaterMarks[0] = m_nWaterMarks[1] = m_nWaterMarks[2] = 0;

可以看到TranBuf分配方式是每次分配nAlloc个Block(这个过程在后面叫做AllocOnce).每个Block是BlockSize字节. 然后至少分配m_nMin*nAlloc(首先调用m_nMin个AllocOnce过程),最多分配m_nMax*nAlloc个Block.每个内存 不够的话都会调用AllocOnce这个过程。

这里稍微解释一下RealHdl这个字段的意思。对于单个Block分配出来的内存块,RealHdl==this.但是如果是 连续跨越多个Block内存快的话,那么每个Block对应的Handle里面RealHdl对应的是首地址的Handle.这样做的好处就是, 如果希望对这个内存块增加或者是减少引用计数的话,只是指引到一个Handle,对里面字段修改引用计数。否则的话, 需要遍历每个Block对应的Handle修改引用技术。

2.3.2 Create

大部分Create代码都是在设置参数,最后调用m_nMin次AllocOnce来分配初始的内存块。

bool Create(int nBlockSize, int nAlloc, int nMin, int nMax, double fRatio1, double fRatio2) {
  m_nUnitSize = nBlockSize;
  m_nBlockSize = nBlockSize;
  m_nBlockBase = Log_2(nBlockSize);
  if (-1 == m_nBlockBase) {
    TRACE0("Fatal: invalid block size of %d\n", nBlockSize);
    return false;
  }
  m_nAlloc = nAlloc;
  m_nMaxBuffers = nMax * nAlloc;
  m_nBufferSize = m_nBlockSize * m_nAlloc;
  m_nBuffers = 0;
  m_nMax = nMax;
  m_nMin = nMin;
  if (0 != fRatio1 && 0 != fRatio2) {
    m_nWaterMarks[0] = (int)((double)m_nMaxBuffers * fRatio1);
    m_nWaterMarks[1] = (int)((double)m_nMaxBuffers * fRatio2);
    m_nWaterMarks[2] = m_nMaxBuffers - 1;
  }
  for (int i = 0; i < m_nMin; i++) {
    if (!AllocOnce()) // 注意这里没有必要回滚,每次成功都会记录状态,在Destroy里面会释放掉。
      return false;
  }
  return true;
}

2.3.3 AllocOnce

之前说过AllocOnce是分配一个连续内存块,每个Block大小是m_nBlockSize,而个数是nAlloc. 同时还需要分配nAlloc个Handle.每个Handle管理一个Block.

bool AllocOnce() {
  char* pBuffer = (char*)AlignAlloc(m_nBlockSize, m_nBufferSize);
  Handle* pHdl = (Handle*)ZeroAlloc(m_nAlloc * sizeof(Handle));
  if (pBuffer && pHdl) {
    m_BufferMap.insert(BufferMap::value_type(pBuffer, pHdl)); // 记录下这个连续块的内存地址和Handle地址.
    // 在Destroy时候有用.
    m_nBuffers += m_nAlloc;
    pBuffer += m_nBufferSize - m_nBlockSize;
    pHdl += m_nAlloc - 1;

    for (int i = 0; i < m_nAlloc; i++) { // 然后将我所有的Block加入到链表里面去.
      pHdl->pBuffer = pBuffer;
      pHdl->nRef = 0;
      pHdl->nConsBuf = i + 1;
      pHdl->pRealHdl = pHdl;
      m_FreeList.push_back(pHdl); // 对于ConsBuf大的Handle放在链表最后.
      // 从后面内存分配策略就可以发现,对于分配连续Handle的话都是从最后开始的。

      pBuffer -= m_nBlockSize;
      pHdl --;
    }
    return true;
  }
  if (pBuffer)
    free(pBuffer);
  if (pHdl)
    free(pHdl);
  return false;
}

2.3.4 GetHandle

GetHandle是通过传入buffer首地址来确定管理这个buffer的Handle.但是注意不是RealHdl. 如果需要对这个内存做引用计数的话,应该是对RealHdl做引用计数。可以看看下面的AddRef实现。

Handle* GetHandle(char* pBuffer) {
  BufferMap::iterator it = m_BufferMap.upper_bound(pBuffer);
  if (it != m_BufferMap.begin()) {
    it --;

    char* pHead = it->first;
    ASSERT(pHead <= pBuffer);
    if (pBuffer < pHead + m_nBufferSize) {
      int n = (pBuffer - pHead) >> m_nBlockBase;
      Handle* pHdl = it->second + n;
      ASSERT(pHdl->pBuffer == pHead + (((uint32)n) << m_nBlockBase));
      return pHdl;
    }
  }
  return NULL;
}

2.3.5 AddRef

对某块内存进行引用计数。并且强大的是这个内存地址不必是分配的首地址,可以是连续内存内部任意地址。

int AddRef(char* p, bool bCheck = false) {
  Handle* pHdl = GetHandle(p);
  if (NULL == pHdl) {
    if (!bCheck) {
      return -1;
    }
    RaiseError(Invalid_Block);
  }

  int n = ++ pHdl->pRealHdl->nRef;
  ASSERT(2 <= n);
  return n;
}

2.3.6 Destroy

Destroy是将AllocOnce分配的内存和Handle全部回收。因为得到了所有分配内存和Handle的起始地址 保存在map里面所以释放并不麻烦.

void Destroy() {
  m_FreeList.Init();
  m_nBuffers = 0; // 将分配计数清零.

  BufferMap::iterator it;
  for (it = m_BufferMap.begin(); it != m_BufferMap.end(); it++) {
    free(it->first);
    free(it->second);
  }
  m_BufferMap.clear();
}

2.3.7 Allocate

分配内存。可以从参数里面看出来语义是说分配多少个Block.nPriority参数是说使用哪个水位线。 如果超过水位线的话,那么会使用相应的策略来处理(打印日志)。

// 从freelist里面分配一个block出来.
#define _ALLOC_TRAN_BUF(p, how)                     \
  p = m_FreeList.how();                           \
  ASSERT(DLINK_IS_STANDALONE(&p->link));          \
  ASSERT(0 == p->nRef);                           \
  ASSERT(p->pRealHdl == p);                       \
  p->nRef = 1

  char* Allocate(uint32 nPriority, int count = 1) {
    int n;
    ASSERT(0 != count);
    // 会尝试分配两次。第一次不进行AllocOnce.如果第一次失败的话那么第二次会尝试。
    for (int i = 0; i < 2; i++) {
      n = (int)m_FreeList.size();
      // 如果当前分配内存大于water mark的话会打印日志,但是为了过快的打印这里控制了打印间隔
      // 从这里可以看到这个是非多线程的。从后面BufHandle使用来看确实是这样的。
      if (m_nBuffers - n > m_nWaterMarks[nPriority]) {
        if (nPriority != 0) {
          static time_t last = 0;
          time_t now = time(NULL);
          if (now - last >= 30) {   // avoid too frequent print
            int n1 = m_nMaxBuffers - m_nBuffers + n;
            int n2 = m_nMaxBuffers - m_nWaterMarks[nPriority];
            TRACE0("Warning: available tran buf (#%d) touches watermark(#%d, %.f%%)\n",
                   n1, n2, (double)(n1 * 100) / m_nMaxBuffers);
            last = now;
          }
        }
        return NULL;
      }
      if (n >= count) {  // 如果free list里面内容>=count的话,但是有可能没有连续内存用来分配。
        Handle* pHdl, *pTmp;
        if (1 == count) { // 如果分配1个的话,那么直接从前面分配
          _ALLOC_TRAN_BUF(pHdl, pop_front);
          return pHdl->pBuffer;
        }
        // 否则会从后面分配,因为后面Consecutive Buffer的概率会更高。
        // Big block are formed by multiple consecutive blocks.
        // We try from the tail of free list, which brings higher probability.
        _ALLOC_TRAN_BUF(pHdl, pop_back);
        int i = 1;
        if (pHdl->nConsBuf >= count) { // 看看最后的Handle的consectutive number是否足够.
          for ( ; i < count; i++) { // 并且看看是否被占用(通过引用计数判断).这里没有细看链表的组织。
            pTmp = pHdl + i;
            UNLIKELY_IF (0 != pTmp->nRef) {
              break;
            }
            m_FreeList.remove(pTmp);
            DLINK_INSERT_PREV(&pHdl->link, &pTmp->link);
            pTmp->pRealHdl = pHdl;
            pTmp->nRef = 1;
          }
        }
        if (i == count) { // 如果分配OK的话,那么返回
          return pHdl->pBuffer;
        } else { // 否则的话那么需要进行回滚.
          for (int j = 0; j < i; j++) {
            pTmp = pHdl + j;
            DLINK_INITIALIZE(&pTmp->link);
            pTmp->pRealHdl = pTmp;
            pTmp->nRef = 0;
            m_FreeList.push_front(pTmp);
          }
        }
      }
      // 如果分配内存超限或者是AllocOnce分配失败的话,那么直接返回。
      if (m_nBuffers >= m_nMaxBuffers || !AllocOnce()) {
        return NULL;
      }
    }
    return NULL;
  }

2.3.8 Free

#ifdef  _DEBUG
#define _FREE_TRAN_BUF(p, how)                              \
  memset(p->pBuffer, 0xCC, m_nBlockSize);             \
  m_FreeList.how(p)
#else
#define _FREE_TRAN_BUF(p, how)                              \
  m_FreeList.how(p)
#endif

  int Free(char* p, bool bCheck = false) {
    Handle* pHdl = GetHandle(p);
    if (NULL == pHdl) {
      if (bCheck) {
        RaiseError(Invalid_Block);
      }
      return -1;
    }

    pHdl = pHdl->pRealHdl;
    int n = -- pHdl->nRef; // 修改引用计数。
    if (0 == n) {
      Handle* pTmp = dlink_get_prev(pHdl);
      if (pTmp == pHdl) { // 如果是一个Block的话.
        ASSERT_EQUAL(pHdl->pRealHdl, pHdl);
        ASSERT_EQUAL(0, pHdl->nRef);
        _FREE_TRAN_BUF(pHdl, push_front);
        return 0;
      }
      // here comes big block
      Handle* p = pHdl; // 我们知道这个Handle组织称为环形双向链表。
      // 同样按照AllocOnce的顺序,将consecutive number大的handle放在末尾.
      do {
        pHdl = pTmp;
        pTmp = dlink_get_prev(pTmp);
        ASSERT_EQUAL(1, pHdl->nRef);
        ASSERT_EQUAL(p, pHdl->pRealHdl);
        pHdl->pRealHdl = pHdl;
        pHdl->nRef = 0;
        DLINK_INITIALIZE(&pHdl->link);
        _FREE_TRAN_BUF(pHdl, push_back);
      } while (p != pTmp);
      ASSERT_EQUAL(p, p->pRealHdl);
      ASSERT_EQUAL(0, p->nRef);
      DLINK_INITIALIZE(&p->link);
      _FREE_TRAN_BUF(p, push_back);
      return 0;
    }
    return n;
  }

2.4 BufHandle

如果说TranBuf是底层内存分配器的话,那BufHandle就是应用层的内存分配器。BufHandle底层是通过 两个TranBuf来进行分配的。BufHandle本质上是chained的形式,主要是为了节省mem copy以及适应 network IO app的。通过全局的BufHandlePool对象来分配内存。

2.4.1 OverView

首先我们看看BufHandle结构以及提供的API.

struct BufHandle {
  BufHandle* _next; // 链式指针.
  char* pBuf; // 管理的内存.
  int nBufLen;      // available buffer length 可用长度
  int nDataLen;     // occupied data length 占用长度
};

// 从[pHdl,pNext)这个区间上面回收nLen长度出来分配出去.
BufHandle* Reclaim(int nLen, BufHandle* pHdl, BufHandle* pNext);

// 设置TranBuf的参数.这个应该在Kylin调用之前就设置好,如果打算使用BufHandle的话。
void SetTranBuf(int nSmallNum, int nBigNum,
                int nSmallSize = 4096,
                float fLowMark = 0.6f,
                float fHighMark = 0.9f);

// NOTICE:这里如果不允许失败的话,那么就会直接抛出异常.
// inPool表示这个buf是否在pool里面如果是的话那么可以直接使用引用计数优化减少copy
// pBuf表示src内存地址,nLen表示src内存长度.pNext表示allocate handle之后next字段值.
// 如果不是inPool的话,那么从TranBufPool里面分配.
BufHandle* AllocateHdl(bool bInPool = false, char* pBuf = NULL,
                       int nLen = 0, BufHandle* pNext = NULL);
// 从TranBufPool里面分配允许失败.
BufHandle* AllocateHdlCanFail(int nSize = 0);
// 从big pool里面分配1个block.
BufHandle* AllocateBigHdl();
BufHandle* AllocateBigHdlCanFail();

// 释放这个Handle.
void FreeHdl(BufHandle* pHdl);
// 链式释放[pHdl,pNext)的链式里面的空间.
void ChainFreeHdl(BufHandle* pHdl, BufHandle* pNext);
// 这个名字取得不太好听,本质来说就是进行Clone
// pnLen数据长度是多少.bCopyNonTranBuf表示如果不能够做引用计数的话,是否需要copy.
BufHandle* CloneHdlAndTerminate(BufHandle* pHdl, BufHandle* pNext,
                                int* pnLen = NULL, bool bCopyNonTranBuf = true);

2.4.2 SetTranBuf

首先我们先看看CBufHandlePool的结构然后在看这个API

// 继承于TObjectPool对象池可以直接高效分配出BufHandle对象出来.
class CBufHandlePool : public TObjectPool<BufHandle> {
  volatile int m_lock; // 多线程安全.
  CTranBufPool m_TranBufPool; // tran buf pool
  CTranBufPool m_BigBufPool; // big buf pool
};

  CBufHandlePool() : TObjectPool<BufHandle>("BufHandle", BUFPOOL_C2),
    m_TranBufPool("TranBuffer", BUFPOOL_C1),
    m_BigBufPool("BigBuffer", BUFPOOL_C1) {
    m_lock = 0;
    Create(1024, 1);

    int nAlloc = s_nTranBuf;
    int nMax = 1;
    // 一次不要分配超过512M.但是为了保持内存总量允许nMax增大.
    while ((s_nBufSize / 1024) * nAlloc > 524288) { /* Max alloc: 512M */
      nAlloc >>= 1;
      nMax <<= 1;
    }
    // tranbuf设置参数.
    m_TranBufPool.Create(s_nBufSize, nAlloc, 1, nMax, s_fLowMark, s_fHighMark);
    // 可以看到big buf的block size非常大.并且watermark非常高.分配次数在[0,10]之间.
    m_BigBufPool.Create(SZ_BIG_BUF, s_nBigTranBuf, 0, 10, 0.9, 0.9);
  }

// 单例模式.
static CBufHandlePool* s_pBufHandlePool = NULL;
static CBufHandlePool* GetBufHdlPool() {
  if (NULL != s_pBufHandlePool) {
    return s_pBufHandlePool;
  } else {
    LOCK_THIS_BLOCK;
    if (NULL == s_pBufHandlePool) {
      s_pBufHandlePool = new CBufHandlePool;
    }
    return s_pBufHandlePool;
  }
}

然后来看看这些参数是来如何设置的.

int s_nTranBuf = 1024;
int s_nBufSize = 4096;
int s_nBigTranBuf = 64;
float s_fLowMark = 0.6f;
float s_fHighMark = 0.9f;

void SetTranBuf(int nSmallNum, int nBigNum, int nSmallSize, float fLowMark, float fHighMark) {
  LOCK_THIS_BLOCK;

  s_nTranBuf = nSmallNum; // tran buf应该每次alloc多少个block.
  s_nBigTranBuf = nBigNum; // big tran buf每次应该allocate多少个block.
  s_nBufSize = nSmallSize; // tran buf的blocksize.
  s_fLowMark = fLowMark;
  s_fHighMark = fHighMark;
}

2.4.3 DoAllocate

这个是底层确保一定分配成功API(如果失败抛异常).来看看实现.使用hang住当前操作等待其他线程归还内存.

// 从什么pool里面进行分配,尝试多少次分配.
BufHandle* DoAllocate(CTranBufPool* pPool, int nRetry) {
  BufHandle* pHdl;

  for (int i = 0; i < nRetry; i++) {
    LOCK;
    pHdl = TObjectPool<BufHandle>::Allocate(); // 首先从对象池里面分配BufHandle对象.
    pHdl->pBuf = pPool->Allocate(i > 0 ? 2 : 1); // 然后从tran buf pool里面分配.
    // 注意这里第一次按照water mark1来分配,之后按照water mark2来分配.
    if (NULL == pHdl->pBuf) { // 如果分配失败的话,那么返回对象池.
      TObjectPool<BufHandle>::Free(pHdl);
      pHdl = NULL;
    }
    UNLOCK;
    if (NULL != pHdl) // 如果成功直接返回.
      return pHdl;
    if (i > 1) {
      TRACE0("No enough memory, sleep %d\n", i + 1);
    }
    sleep(1); // 否则会hang住等待释放.
  }
  RaiseError(TODO_NO_ENOUGH_MEMORY); // 如果没有分配成功那么就会抛出异常.
  return NULL;
}

2.4.4 DoAllocateCanFail

底层不一定保证分配成功,可能返回NULL表示失败.只是尝试一次分配.

BufHandle* DoAllocateCanFail(CTranBufPool* pPool, int nSize) {
  BufHandle* pHdl;
  int nBlockSize = pPool->GetBlockSize();
  ASSERT(0 != nSize);

  LOCK;
  pHdl = TObjectPool<BufHandle>::Allocate();
  // 以water mark0为标记.
  if (nSize == nBlockSize) {
    pHdl->pBuf = pPool->Allocate(0);
  } else {
    pHdl->pBuf = pPool->Allocate(0, (nSize + nBlockSize - 1) / nBlockSize);
  }
  if (NULL == pHdl->pBuf) {
    TObjectPool<BufHandle>::Free(pHdl);
    pHdl = NULL;
  }
  UNLOCK;
  return pHdl;
}

2.4.5 _DoAddRef

对于BufHandle的引用技术和TranPool引用计数有点不同,并且平时思考的也不同。BufHandle的引用计数 只是针对头部的BufHandle增加计数而共用其他部分的BufHandle.

(NOTICE):(不过在外部调用可以看到,CloneAndTerminate实际上也还是遍历了所有的Handle做引用计数).

BufHandle* _DoAddRef(BufHandle* pHdl, BufHandle* pNext, BufHandle** * pppLast) {
  if (-1 != m_TranBufPool.AddRef(pHdl->pBuf) || -1 != m_BigBufPool.AddRef(pHdl->pBuf)) {
    BufHandle* pTmp = TObjectPool<BufHandle>::Allocate();
    pTmp->_next = pNext;
    pTmp->pBuf = pHdl->pBuf;
    pTmp->nBufLen = pHdl->nDataLen;
    pTmp->nDataLen = pHdl->nDataLen;
    *pppLast = &pTmp->_next;
    return pTmp;
  }
  return NULL;
}

2.4.6 _DoFree

只是释放单个BufHandle对象.

void _DoFree(BufHandle* pHdl) {
  if (-1 == m_TranBufPool.Free(pHdl->pBuf))
    m_BigBufPool.Free(pHdl->pBuf);
  TObjectPool<BufHandle>::Free(pHdl);
}

2.4.7 AllocateBig

从BigTranBufPool里面分配大块内存.注意对于大块内存而言的话只允许分配一个Block.

BufHandle* AllocateBig(bool bCanFail) {
  BufHandle* pHdl;

  pHdl = bCanFail
         ? DoAllocateCanFail(&m_BigBufPool, SZ_BIG_BUF)
         : DoAllocate(&m_BigBufPool, 60); // 60s的延迟.
  if (pHdl) {
    pHdl->_next = NULL;
    pHdl->nBufLen = SZ_BIG_BUF;
    pHdl->nDataLen = 0;
  }
  return pHdl;
}

2.4.8 AllocateCanFail

从TranBufPool里面分配连续内存出来.

BufHandle* AllocateCanFail(int nSize) {
  BufHandle* pHdl = DoAllocateCanFail(&m_TranBufPool, nSize);
  if (pHdl) {
    pHdl->_next = NULL;
    pHdl->nBufLen = nSize;
    pHdl->nDataLen = 0;
  }
  return pHdl;
}

2.4.9 AllocForBuf

为某个buf分配内存.把buf内容copy进来.并且设置pNext.pppLast表示最后一个节点的next字段指针(三指针比较难理解…)

BufHandle* AllocForBuf(char* pBuf, int nLen, BufHandle* pNext, BufHandle** * pppLast) {
  BufHandle* pFirst, *pHdl, **ppLast;

  pFirst = NULL;
  ppLast = &pFirst;
  while (nLen > 0) {
    pHdl = DoAllocate(&m_TranBufPool, 120); // 120s延迟.

    pHdl->nBufLen = s_nBufSize;
    pHdl->nDataLen = nLen > s_nBufSize ? s_nBufSize : nLen;
    memcpy(pHdl->pBuf, pBuf, pHdl->nDataLen);
    pBuf += pHdl->nDataLen;
    nLen -= pHdl->nDataLen;

    pHdl->_next = pNext; // 设置next字段内容
    *ppLast = pHdl;
    ppLast = &pHdl->_next; // 并且得到最后一个item的next字段指针.
    // 不过因为设置了pNext所以感觉不是特别有用.
  }
  if (pppLast) {
    *pppLast = ppLast;
  }
  return pFirst;
}

2.4.10 Allocate

// 如果是inpool的话,那么pubuf必须是pool分配出来的,
// 那么我们只是针对这个buffer做一个引用计数

// 如果不是inpool的话,nLen==0或者是pBuf==NULL,分配出一个空单元出来.
// 否则需要做一个内存copy.使用上面AllocForBuf的API.
BufHandle* Allocate(bool bInPool = false, char* pBuf = NULL,
                    int nLen = 0, BufHandle* pNext = NULL
                   ) {
  BufHandle* pHdl;

  UNLIKELY_IF (false == bInPool) {
    LOCK;
    pHdl = TObjectPool<BufHandle>::Allocate();
    if (-1 == m_TranBufPool.AddRef(pBuf))
      m_BigBufPool.AddRef(pBuf);
    UNLOCK;

    pHdl->_next = pNext;
    pHdl->pBuf = pBuf;
    pHdl->nBufLen = nLen;
    pHdl->nDataLen = nLen;
    return pHdl;
  }
  if (pBuf == NULL || nLen == 0) {
    pHdl = DoAllocate(&m_TranBufPool, 120);

    pHdl->_next = pNext;
    pHdl->nBufLen = s_nBufSize;
    pHdl->nDataLen = nLen;
    return pHdl;
  }

  return AllocForBuf(pBuf, nLen, pNext, NULL);
}

2.4.11 ChainFree

释放[pHdl,pNext)链上的所有item.

void ChainFree(BufHandle* pHdl, BufHandle* pNext) {
  BufHandle* pTmp;
  LOCK;
  for ( ; pHdl != pNext; pHdl = pTmp) {
    ASSERT(NULL != pHdl);
    pTmp = pHdl->_next;
    _DoFree(pHdl);
  }
  UNLOCK;
}

2.4.12 CloneAndTerminate

这个API的语义在之前已经解释过了,来看看代码.

BufHandle* CloneAndTerminate(BufHandle* pHdl, BufHandle* pNext,
                             int* pnLen, bool bCopyNonTranBuf
                            ) {
  BufHandle* pFirst, *pTmp, **ppLast, **ppLastTmp;
  int nLen = 0;

  pFirst = NULL;
  ppLast = &pFirst;
  LOCK;
  for ( ; pHdl != pNext; pHdl = pHdl->_next) {
    pTmp = _DoAddRef(pHdl, NULL, &ppLastTmp); // 看看是否可以在直接做引用计数.
    if (NULL == pTmp) {
      if (bCopyNonTranBuf) { // 如果需要copy出来的话.
        UNLOCK;
        pTmp = AllocForBuf(pHdl->pBuf, pHdl->nDataLen, NULL, &ppLastTmp);
        LOCK;
      } else { // 如果显示说不copy只是引用内存的话,那么只是开辟Handle对象.
        pTmp = TObjectPool<BufHandle>::Allocate();
        pTmp->pBuf = pHdl->pBuf;
        pTmp->nDataLen = pTmp->nBufLen = pHdl->nDataLen;
        pTmp->_next = NULL;
        ppLastTmp = &pTmp->_next;
      }
    }
    nLen += pHdl->nDataLen;
    *ppLast = pTmp;
    ppLast = ppLastTmp;
  }
  UNLOCK;

  if (pnLen) {
    *pnLen = nLen;
  }
  if (nLen) {
    return pFirst;
  }
  // 如果失败的话那么释放已经分配出来的.
  ChainFreeHdl(pFirst, NULL);
  return NULL;
}

2.5 Kylin

这个模块主要负责框架的启动和停止,做了一些琐碎的事情方便用户,主要是下面这两个函数

// 启动框架,使用多少个CPU,网络和磁盘线程,至少1个CPU和1个网络线程
// f表示线程初始化函数
// nTimerPrecision会影响到定时器实现.如果超时在时间精度一下的话都会通过ExecMan直接触发
// 否则都会必须通过RunTimer来进行检查
APF_ERROR InitKylin(int nExecWorkers, int nNetWorkers, int nDiskWorkers,
                    THREAD_INIT_PROC f, uint32 nTimerPrecision);
// bWait表示是否等待ExecMan的线程池正常停止,这个会在ExecMan部分提到
APF_ERROR StopKylin(bool bWait);

对于InitKylin里面事情就是启动几个Manager,还做了一件tricky事情就是将SIGPIPE信号忽略了。而StopKylin就是停止这些Manager.我们需要仔细关注的就是这些Manager的启停。

2.6 ExecMan

2.6.1 Overview

我们首先看看ExecMan的接口

#define g_pExecMan CExecMan::Instance() // 直接使用宏g_pExecMan就可以单例

class CExecMan
{
    DECLARE_SINGLETON(CExecMan) // 单例模式
    public:
    ~CExecMan();
    APF_ERROR Start(int nWorkers, THREAD_INIT_PROC fInit, uint32 nTimerPrecision);
    void Stop(bool bWait);

    // 插入一个任务
    APF_ERROR QueueExec(AsyncContext* pCtx, bool bClientReferred);
    // 插入一个紧急任务
    APF_ERROR QueueExecEmergent(AsyncContext* pCtx, bool bClientReferred);
    // todo:
    APF_ERROR ProxyExec(int nAckCode, CAsyncClient* pClient, PROXY_EXEC_PROC fProc, ProxyExecCtx* pCtx);
    // 提交一个定时器任务
    APF_ERROR DelayExec(int nAction, CAsyncClient* pClient, uint32 nMilliseconds, AsyncContext* pCtx);
    // 取消一个任务
    APF_ERROR CancelExec(AsyncContext* pCtx);
    // 检查定时器
    void RunTimer();

  private:
    CThreadPool m_ThreadPool;
    volatile int m_nCurJobs; // 在运行期间有多少Job正在被提交
};

2.6.2 Start & Stop

Start逻辑很简单,包括计算1s对应多少cycle数目以及启动线程池。

APF_ERROR CExecMan::Start(int nWorkers, THREAD_INIT_PROC fInit, uint32 nTimerPrecision)
{
    // 计算一下CPU一个tick有多少个cycle数目,这样可以通过rdstc转换成为时间
    g_nCycleStart = rdtsc();
    g_nLastTick = 0;
    g_nTickPrecision = (nTimerPrecision >= 1000) ? 1000 : nTimerPrecision;
    g_nCyclesInTick = GetCpuFreq() / (1000 / g_nTickPrecision);
    if (0 < m_ThreadPool.Start(nWorkers, fInit)) { // 启动线程池
        AtomicSetValue(m_nCurJobs, 0);
        return APFE_OK;
    }
    return APFE_SYS_ERROR;
}

Stop逻辑的话可能需要仔细理解一下

// bWait表示是否需要等待kylin的线程池正常结束,执行完成线程池里面任务为止。
// 不断修正m_nCurJobs作用是为了阻止新任务的提交。这个我们可以在QueuExec部分联合起来一起看看
void CExecMan::Stop(bool bWait)
{
    if (bWait) {
        int n;
        while (0 != (n=atomic_comp_swap(&m_nCurJobs, LARGE_ENOUGH_NEGATIVE, 0))) {
            if (LARGE_ENOUGH_NEGATIVE == n) {
                return;
            }
            Sleep(1);
        }
        m_ThreadPool.Stop();
    }
    else {
        AtomicSetValue(m_nCurJobs, LARGE_ENOUGH_NEGATIVE);
    }
}

2.6.3 QueueExec

QueueExec和QueueExecEmergent逻辑非常相似,只不过底层调用线程池的QueueJob和QueueEmergentJob.我们这里只看QueueExec.

static void
Proc(Job* pJob)
{
    AsyncContext* pCtx = (AsyncContext*)pJob;
    CAsyncClient* pClient = pCtx->pClient;

    pCtx->fProc = NULL;
    pClient->OnCompletion(pCtx);
    pClient->Release();
}

// bClientReferref表明用户是否加了引用
// 如果按照sunxiao的说明,我们这里最好永远写true,然后我们在外面调用点自己AddRef和DecRef
APF_ERROR CExecMan::QueueExec(AsyncContext* pCtx, bool bClientReferred)
{
    VERIFY_OR_RETURN(NULL != pCtx, APFE_INVALID_ARGS);
    VERIFY_OR_RETURN(NULL != pCtx->pClient, APFE_INVALID_ARGS);

    // 如果atomic +1 <0的话,那么说明这个时候m_nCurJobs已经被置过LARGE_ENOUGH_NEGATIVE了
    // 当然我们是有假设m_nCurJobs不会非常快地复位,可以认为这个是成立的
    if (atomic_add(&m_nCurJobs, 1) >= 0) {
        // TODO: if the number of workers is dynamic, we may need to lock and re-dispatch exisiting events...
        if (!bClientReferred) {
            pCtx->pClient->AddRef();
        }
        pCtx->fProc = Proc; // 置ctx的fProc为Proc
        // 然后根据client的AsyncId来决定指派到哪一个线程工作
        m_ThreadPool.QueueJob((Job*)pCtx, pCtx->pClient->GetAsyncId() % m_ThreadPool.GetWorkerCount());
        atomic_add(&m_nCurJobs, -1); // 将当前正在提交的Jobs个数-1.
        return APFE_OK;
    }
    if (bClientReferred) {
        pCtx->pClient->Release();
    }
    // 那么将m_nCurJobs重置
    AtomicSetValue(m_nCurJobs, LARGE_ENOUGH_NEGATIVE);
    if (IsKylinRunning()) {
        TRACE0("Fatal error: Exec workers are not started\n");
    }
    return APFE_NO_WORKER;
}

我们这里可以看到m_nCurJobs在QueueExec和Stop之间的配合。然后我们稍微看看Proc这个过程,对于CPU任务直接调用OnCompletion然后调用Release.

2.6.4 Timer

定时器任务加入是DelayExec,检查触发是RunTimer.如果查看CallGraph的话会发现RunTimer都是在网络部分调用的,我们在网络部分看看触发的时机。 DelayExec里面的逻辑会根据定时时间来判断如何实现,如果定时时间超过g_nTickPrecision,那么会将超时时间加入一个map里面去,然后让RunTimer去触发。 否则会加入线程池里面去。对于加入到map里面的fProc有一个特殊的标记(JOB_PROC)2.在CancelExec时候会认识这个特殊标记,将事件从map中删除。

APF_ERROR CExecMan::DelayExec(int nAction, CAsyncClient* pClient, uint32 nMilliseconds, AsyncContext* pCtx)
{
    VERIFY_OR_RETURN(NULL != pClient, APFE_INVALID_ARGS);
    VERIFY_OR_RETURN(NULL != pCtx, APFE_INVALID_ARGS);

    pCtx->nAction = nAction;
    pCtx->pClient = pClient;
    pCtx->fProc = (JOB_PROC)2;

    if (g_nTickPrecision <= nMilliseconds) {
        pClient->AddRef();

        s_Lock.Lock();
        /* milliseconds -> ticks */
        nMilliseconds = g_nLastTick + nMilliseconds / g_nTickPrecision;
        pCtx->nErrCode = nMilliseconds;
        s_TimerMap.insert(nMilliseconds, pCtx);
        s_Lock.Unlock();
        return APFE_OK;
    }

    APF_ERROR err;
    s_Lock.Lock();
    err = QueueExec(pCtx, false);
    s_Lock.Unlock();
    return err;
}

然后我看看看RunTimer这个部分。这个部分非常简单,就是根据当前时间判断map里面哪些定时器需要进行触发,然后将触发逻辑作为Job丢入CPU线程池。 我们这里不看RunTimer具体代码,反而倒是对外面的一些小细节比较感兴趣。我们不希望RunTimer被多个实例调用,只要有一个实例调用就OK,使用CToken完成。 当然可以使用mutex+try_lock来实现但是开销应该会更大。

void CExecMan::RunTimer()
{
    static CToken token;
    UNLIKELY_IF (!token.TryAcquire(1)) {
        return;
    }
    // ...
    token.Release(1);
}

2.6.5 Example

我们这里给的例子非常简单,但是希望有启发性.我们从1开始进行打印,每打印1个数字就认为当前任务结束,一直无限打印。 但是我们同时会启动一个定时器,只允许我们做1.2s钟时间的打印。如果我们在1.2s内打印数字个数超过了100个的话,那么我们重启一个定时器1.2s, 而这次打印数字个数阈值为200个之后每次翻倍,直到1.2s内没有打印我们所希望个数的话程序退出。在主线程100ms来检查ExecMan的RunTimer.

#include <cstdio>
#include <vector>
#include <time.h>
#include "stdafx.h"
#include "Kylin.h"

static volatile int worker=16;
static const int PRINT=0;
static const int TIMEOUT=1;
static const int TIMEOUT_MS=1200;

class XAsyncClient:public CAsyncClient{
  public:
    AsyncContext print_ctx;
    AsyncContext delay_ctx;
    int id;
    int current_number;
    int threshold;
    int last_working_number;
    bool stop; // 一旦stop那么立刻后面内容都不打印了
    XAsyncClient(int id_):
            id(id_),
            current_number(1),
            threshold(100),
            last_working_number(0),
            stop(false){
        InitAsyncContext(&print_ctx);
        InitAsyncContext(&delay_ctx);
        print_ctx.pClient=this;
        delay_ctx.pClient=this;
    }
    int Release(){ // Release通常都是这样写的
        int n=CAsyncClient::Release();
        if(n==0){
            delete this;
        }
        return n;
    }
    void Start(){ // 启动时候我们发起两个Job
        print_ctx.nAction=PRINT;
        CAsyncClient::AddRef();
        g_pExecMan->QueueExec(&print_ctx,true);
        CAsyncClient::AddRef();
        g_pExecMan->DelayExec(TIMEOUT,this,TIMEOUT_MS,&delay_ctx);
    }
    void Print(){
        fprintf(stderr,"(%d)xref:%d,current:%d\n",id,CAsyncClient::GetRef(),
                current_number);
    }
    virtual void OnCompletion(AsyncContext* ctx){
        switch(ctx->nAction){ // 分别处理这两个类型Job
            case PRINT:
                if(stop){
                    break;
                }
                fprintf(stderr,"(%d)%d\n",id,current_number);
                current_number++;
                if((current_number-last_working_number)>=threshold){
                    // update
                    last_working_number=current_number;
                    threshold*=2;
                    // canel timer.
                    fprintf(stderr,"(%d)==============================restart timer==============================\n",id);
                    g_pExecMan->CancelExec(&delay_ctx);
                    g_pExecMan->DelayExec(TIMEOUT,this,TIMEOUT_MS,&delay_ctx);
                }
                CAsyncClient::AddRef();
                g_pExecMan->QueueExec(&print_ctx,true);
                break;
            case TIMEOUT:
                fprintf(stderr,"(%d)********************quit********************\n",id);
                atomic_add(&worker,-1);
                stop=true;
                break;
            default:
                assert(0);
        }
    }
};

int main(){
    // use 4 exec threads.
    InitKylin(4,0,0);
    // 100ms
    const struct timespec spec={0,100*1000000};
    const int worker_num=worker;
    std::vector< XAsyncClient* > vec;
    for(int i=0;i<worker_num;i++){
        XAsyncClient* client=new XAsyncClient(i);
        vec.push_back(client);
        client->Start();
    }
    while(1){
        nanosleep(&spec,NULL);
        //Sleep(1);
        if(AtomicGetValue(worker)==0){
            StopKylin(true);
            break;
        }else{ // 主线程我们每隔100ms检查一次超时情况
            g_pExecMan->RunTimer();
        }
    }
    for(int i=0;i<worker_num;i++){
        XAsyncClient* client=vec[i];
        client->Print(); // 退出时候打印一下信息
        delete client;
    }
    return 0;
}

2.7 DiskMan

2.7.1 Overview

我们首先看看和磁盘相关的两个比较重要的类。因为磁盘操作不像CPU操作一样不需要任何辅助数据结构,磁盘操作需要一些信息比如fd等,磁盘操作需要一个特殊的磁盘Context。 然后每次发起磁盘操作使用另外一个结构Request.这里名字上和原来的CPU事件并不太一样,我们可能需要习惯一下。实际上如果我们需要映射到CPU事件里面的话,这两个Context应该结合在一起。 只不过这里DiskContext不是经常变动的部分,而DiskRequest是经常变动的部分所以分离开了。

// 这个是磁盘操作相关的Context
struct DiskContext {
    int fd;
    int diskno;// which disk
    CAsyncClient *pClient;
    uint64 nCurOff, nRead, nWrite;
    char* pPath;// file path
    int nFlag;// file open flag
};

// 这个是一次发起的请求
struct DiskRequest {
    union { // 这里使用这种方式纯粹是为了写起来方便
        AsyncContext async;
        Job job;
    };
    /* !!the first element must be AsyncContext */
    void *buf; // 读写放到什么地方
    int request; // 读写多少字节数据
    int xfered; //当前实际读写了多少数据
    uint64 off; // 在什么偏移上读写
    DiskContext *pCtx;
};

然后在看看DiskMan接口

#define g_pDiskMan CDiskMan::Instance() // 直接使用宏g_pDiskMan就可以单例

class CDiskMan
{
    DECLARE_SINGLETON(CDiskMan) // 单例模式
    public:
    ~CDiskMan();

    APF_ERROR Start(int nDisks, THREAD_INIT_PROC fInit);
    void Stop();

    APF_ERROR Associate(int diskno, int fd, CAsyncClient* pClient, DiskContext* pContext);
    APF_ERROR Associate(int diskno, char* pPath, int nFlag, CAsyncClient* pClient, DiskContext* pContext);
    APF_ERROR Deassociate(DiskContext* pContext);

    void Read(DiskContext* pContext, void* pBuf, int count, DiskRequest* pReq);
    void Write(DiskContext* pContext, void* pBuf, int count, DiskRequest* pReq);

  private:
    CThreadPool m_ThreadPool;
    bool m_bStarted;
};

2.7.2 Start & Stop

启动停止逻辑非常简单,就是让线程池启动和停止

APF_ERROR CDiskMan::Start(int nDisks, THREAD_INIT_PROC fInit)
{
    ASSERT(nDisks <= MAX_NR_DISKS);
    ASSERT(!IsStarted());
    // TODO::: if 0 then check the number of disks
    if (m_ThreadPool.Start(nDisks, fInit) > 0) {
        m_bStarted = true;
        return APFE_OK;
    }
    return APFE_SYS_ERROR;
}

void CDiskMan::Stop()
{
    if (m_bStarted) {
        m_bStarted = false;
        m_ThreadPool.Stop();
    }
}

2.7.3 Associate & Deassociate

逻辑非常简单,就是进行一下DiskContext和CAsyncClient初始化的工作。关于DiskContext里面各个字段含义的话,都是在Read/Write时候解释的。 关于这里最重点的绑定内容就是diskno.diskno非常作用类似于CPU事件里面的AsyncId.相同AsyncId可以分摊到同一个CPU线程这件可以免去加锁开销, 而diskno可以让多个DiskContext分摊到同一个Disk线程,不同线程绑定不同的磁盘驱动器,这样可以让同一个磁盘驱动器仅仅为几个文件服务。

APF_ERROR CDiskMan::Associate(int diskno, char* pPath, int nFlag,
                              CAsyncClient* pClient, DiskContext* pContext)
{
    pContext->fd = -1;
    pContext->diskno = diskno;
    pContext->pClient = pClient;
    pContext->nCurOff = pContext->nRead = pContext->nWrite = 0;
    pContext->pPath = pPath;
    pContext->nFlag = nFlag;
    pClient->AddRef();
    return APFE_OK;
}

APF_ERROR CDiskMan::Deassociate(DiskContext* pContext)
{
    if (pContext->pPath && pContext->fd!=-1) {
        close(pContext->fd);
    }
    pContext->pClient->Release();
    return APFE_OK;
}

2.7.4 Read & Write

文件的Read/Write非常简单,因为本身就是一个阻塞的过程,发起一次就可以保证读取所有内容了,所以不像网络一样需要多次发起。

void CDiskMan::Read(DiskContext* pContext, void* pBuf, int count, DiskRequest* pReq)
{
    pReq->async.nAction = AA_READ; // 设置nAction,然后QueueTask,Task中回调就是ReadOp
    QUEUE_TASK(pContext, pReq, ReadOp, pBuf, count);
}

void CDiskMan::Write(DiskContext* pContext, void* pBuf, int count, DiskRequest* pReq)
{
    pReq->async.nAction = AA_WRITE; // 设置nAction,然后QueueTask,Task中回调就是WriteOp
    QUEUE_TASK(pContext, pReq, WriteOp, pBuf, count);
}

// 可以看到这里pClient已经帮我们AddRef了,所以我们在实际编写App不需要再次AddRef
#define QUEUE_TASK(pContext, pReq, f, pBuf, count)                  \
    pContext->pClient->AddRef();                                    \
    pReq->async.pClient = pContext->pClient;                        \
    pReq->job.fProc = f;                                            \
    pReq->buf = pBuf;                                               \
    pReq->request = count;                                          \
    pReq->xfered = 0;                                               \
    pReq->pCtx = pContext;                                          \
    m_ThreadPool.QueueJob(&pReq->job, pContext->diskno)

从上面分析的话,所有重要的工作都分摊在了ReadOp和WriteOp上面。我们需要做的是Dig下去看看两个是怎么工作的。但是很不幸,两个函数里面内容都是使用了宏DiskOp. DiskOp(a,b,c)其中a表示对应的系统调用叫什么名字,b表示这个Job,c表示读写(没有使用).

static void
ReadOp(Job* pJob)
{
    DISK_OP(read, pJob, 0);
}

static void
WriteOp(Job* pJob)
{
    DISK_OP(write, pJob, 1);
}

继续Dig看看DISKOP是怎么工作的

// 完成之后设置ErrCode,并且加入CPU线程池。用户最终处理的话需要强制转换DiskRequest.
#define NotifyClient(err, req)    {                                     \
        req->async.nErrCode = err;                                      \
        g_pExecMan->QueueExec((AsyncContext*)req, true);                \
    }

// 1.可以看到如果fd==-1的话会自动打开文件
// 2.判断一下发起的off和context是否一致,不一致的话使用pread/pwrite,然后修改off
// 3.读取完成之后使用NotifyClient通知App
#define DISK_OP(op, j, rw)                                              \
    DiskRequest* pReq = CONTAINING_RECORD(j, DiskRequest, job);         \
    DiskContext* pCtx = pReq->pCtx;                                     \
    UNLIKELY_IF (-1 == pCtx->fd) {                                      \
        pCtx->fd = open(pCtx->pPath, pCtx->nFlag, 0644);                \
        UNLIKELY_IF (-1 == pCtx->fd) {                                  \
            NotifyClient(errno, pReq);                                  \
            return;                                                     \
        }                                                               \
    }                                                                   \
    uint64 cost = rdtsc();                                              \
    int len;                                                            \
    if (pReq->off != pCtx->nCurOff) {                                   \
        len = p ## op(pCtx->fd, pReq->buf, pReq->request, pReq->off);   \
        pCtx->nCurOff = pReq->off;                                      \
    }                                                                   \
    else {                                                              \
        len = op(pCtx->fd, pReq->buf, pReq->request);                   \
    }                                                                   \
    if (len >= 0) {                                                     \
        cost = rdtsc() - cost;                                          \
        int which = (pCtx->diskno<<1) + rw;                             \
        g_nDiskStats[which] += len;                                     \
        g_nDiskCosts[which] += cost;                                    \
        pCtx->nCurOff += len;                                           \
        pReq->off += len;                                               \
        pReq->xfered = len;                                             \
        NotifyClient(0, pReq);                                          \
    }                                                                   \
    else {                                                              \
        NotifyClient(errno, pReq);                                      \
    }

2.7.5 Example

例子非常简单就是我们首先发起一个磁盘操作写文件然后在将去读取出来。

#include <cstdio>
#include <vector>
#include <string>
#include <time.h>
#include "stdafx.h"
#include "Kylin.h"

static const int worker_num=8;
static volatile int worker=worker_num;
static const char* fname_prefix="hello";
static const char* content="world";
static const int READ=0;
static const int WRITE=1;
static const int disk_thread_num=4;

class XDiskRequest:public DiskRequest{
  public:
    int nAction; // what kind of operation we init.
};

class XAsyncClient:public CAsyncClient{
  public:
    int id;
    std::string name;
    DiskContext disk_ctx;
    XDiskRequest disk_req;
    XAsyncClient(int id_):
            id(id_){
        // make filename.
        char tmp[128];
        snprintf(tmp,sizeof(tmp),"%s_%d",fname_prefix,id);
        name=tmp;
        g_pDiskMan->Associate(id%disk_thread_num,const_cast<char*>(name.c_str()),O_RDWR | O_CREAT,this,&disk_ctx);
    }
    ~XAsyncClient(){
        g_pDiskMan->Deassociate(&disk_ctx);
    }
    void Start(){
        disk_req.nAction=WRITE;
        char* s=strdup(content);
        // ctx off=0.write from the beginning
        g_pDiskMan->Write(&disk_ctx,s,strlen(s)+1,&disk_req);
    }
    void Print(){
        fprintf(stderr,"(%d)xref:%d\n",id,CAsyncClient::GetRef());
    }
    virtual void OnCompletion(AsyncContext* ctx){
        XDiskRequest* req=(XDiskRequest*)ctx;
        if(req->nAction==WRITE){
            assert(req->xfered==req->request);
            // free written buffer.
            free(req->buf);
            // begin to read.
            disk_req.nAction=READ;
            disk_req.off=0; // read from beginning
            char* s=(char*)malloc(req->request);
            g_pDiskMan->Read(&disk_ctx,s,req->request,&disk_req);
        }else if(req->nAction==READ){
            assert(req->xfered==req->request);
            fprintf(stderr,"(%d)%s\n",id,req->buf);
            // free read buffer.
            free(req->buf);
            atomic_add(&worker,-1);
        }
    }
};

int main(){
    // use 4 disk threads.
    InitKylin(1,1,disk_thread_num);
    std::vector< XAsyncClient* > vec;
    for(int i=0;i<worker_num;i++){
        XAsyncClient* client=new XAsyncClient(i);
        vec.push_back(client);
        client->Start();
    }
    // 100ms.
    const struct timespec timeout={0,100*1000000};
    while(1){
        nanosleep(&timeout,NULL);
        if(AtomicGetValue(worker)==0){
            StopKylin(true);
            break;
        }
    }
    for(int i=0;i<worker_num;i++){
        XAsyncClient* client=vec[i];
        client->Print();
        delete client;
    }
    return 0;
}

2.8 NetworkMan

2.8.1 Overview

和网络相关的也有两个比较重要的类。同样和DiskMan相同,NetworkMan也提供了NetContext和NetRequest.

// 网络请求
struct NetRequest {
    union {
        AsyncContext async;
        DLINK link;
    };
    /* !! the first element must be AsyncContext */
    union {
        BufHandle hdl;
        struct {
            BufHandle *pHdl;    // not used by read
            void* buf;
            int len;            // buffer len
            int request;        // request len
        };
    };
    int xfered; // 已经读取了多少个字节
    uint32 ip;                  // for UDP
    uint16 port;                // for UDP
};

// Socket相关状态
enum SocketState {
    SS_VOID = 0,
    SS_LISTENING_0,
    SS_LISTENING,
    SS_CONNECTING_0,
    SS_CONNECTING,
    SS_CONNECTED_0,
    SS_CONNECTED,
    SS_ERROR,
    SS_SHUTDOWN,
};

// Socket Flag
enum SocketFlag {
    SF_DIRECT_CALLBACK  = 0x1, // 处理完成之后回调函数直接在Network线程执行而不丢到CPU线程
    SF_PERMANENT        = 0x2, // todo:???
    SF_UDP              = 0x4, // 使用UDP协议
    SF_DONT_EXHAUST     = 0x8, // todo:???
};

// 网络相关操作的Context
struct NetContext {
    SOCKET s; // 网络socket
    SocketState state; // socket状态

    DLINK link;                 // to link all active sockets
    CLockedInt tWrite, tRead; // todo:???
    TranQueue qRead, qWrite; // 读写请求队列,push_back和pop_front需要加锁但是不用等待。
    NetRequest *pReadReq, *pWriteReq; // 当前读写请求
    BufHandle wHdl; // 写BufHandle,StartWrite里面多次写的话当前BufHandle就保存在这里。
    // nDelayRead表示是否已经发生了Delay操作,不允许多次发起Delay操作
    // nEnabled表示当前Context是否可用
    volatile int nDelayRead, nEnabled;
    uint32 nTimeout; // 超时时间
    uint64 tTimeoutCycle; // 超时时间转换成为的cycle,类似于一个绝对的超时时间

    CAsyncClient *pClient; // 关联的client
    CEPoller* pPoller; // 底层poller
    uint32 flag; // socket flag

    union {
        struct {                    // for connect
            uint32 ip;
            uint16 port;
        };
        uint32 backlog;             // for listen  // listen的bakclog
    };
};

然后我们看看NetworkMan的接口

#define g_pNetworkMan       CNetworkMan::Instance() // 单例

typedef TLockedQueue<NetContext, TStlList<NetContext*> > NetContexts;
class CNetworkMan
{
    DECLARE_SINGLETON(CNetworkMan) // 单例
    public:
    ~CNetworkMan();

    APF_ERROR Start(int nWorkers, THREAD_INIT_PROC fInit);
    void Stop();

    APF_ERROR Associate(NetContext* pCtx);
    APF_ERROR Deassociate(NetContext* pCtx);

    void Listen(NetContext* pCtx, int backlog);
    void Connect(NetContext* pCtx, uint32 ip, uint16 port, void *pWriteBuf,
                 int nWriteCount, uint32 timeout, NetRequest* pReq);
    void Monitor(NetContext* pCtx);
    bool Delay(NetContext* pCtx, uint32 nMilliseconds);

    void Read(NetContext* pCtx, void* pBuf, int len, int request, NetRequest* pReq);
    void Write(NetContext* pCtx, void* pBuf, int count, NetRequest* pReq);

    void Read(NetContext* pCtx, NetRequest* pReq);
    void Write(NetContext* pCtx, NetRequest* pReq);

  private:
    CThreadPool m_ThreadPool;
    volatile int m_bStarted;
    NetContexts m_NetContexts; // 管理的NetContext
};

虽然这个部分叫做NetworkMan,但是里面比较重要的逻辑部分都放在了EPoller里面来完成的,对于NetworkMan只不过是一个发起者的角色罢了。 对于NetworkMan部分的话还需要关注的是SocketState这个东西,因为对于EPoller来说的话只能够发现Socket究竟是可读还是可写, 我们必须根据SocketState来判断下一步应该进行什么操作。在最后的的话我们会给出SocketState的每个状态解释。

2.8.2 Start & Stop

过程大致是这样的,将EPoller.Run包装在一个MyJob对象里面,然后将MyJob丢入到线程池里面进行运行。一旦线程池取到MyJob, 调用的函数是EPoller.run方法。也就是说每个队列里面始终只有1个Job,所以在Stop时候我们只需要将EPoller.Close()就可以了。

APF_ERROR CNetworkMan::Start(int nWorkers, THREAD_INIT_PROC fInit)
{
    int n = m_ThreadPool.Start(nWorkers, fInit);
    if (n > 0) {
        for (int i=0; i<n; i++) { // 这里s_Jobs是静态对象
            s_Jobs[i].poller.Create(256);
            s_Jobs[i].job.fProc = Polling;
            s_Jobs[i].count = 0;
            DLINK_INITIALIZE(&s_Jobs[i].job.link);
            m_ThreadPool.QueueJob(&s_Jobs[i].job, i);
        }
        AtomicSetValue(m_bStarted, 1);
        return APFE_OK;
    }
    return APFE_SYS_ERROR;
}

void CNetworkMan::Stop()
{
    if (1 == atomic_comp_swap(&m_bStarted, -1, 1)) {
        //TODO: stop all pollers
        int n = m_ThreadPool.GetWorkerCount();
        for (int i=0; i<n; i++) {
            s_Jobs[i].poller.Close(); // 只需要Close即可
        }
        //m_ThreadPool.Stop();
    }
}

然后我们看看MyJob以及对应的Polling是如何实现的

struct MyJob {
    Job job;
    volatile int count;
    CEPoller poller;
};
static MyJob s_Jobs[MAX_NR_NETWORK_WORKERS];
static void
Polling(Job* pJob)
{
    MyJob* pMyJob = CONTAINING_RECORD(pJob, MyJob, job);
    pMyJob->poller.Run(); // 直接调用poller.Run()即可
}

2.8.3 Associate & Deassociate

Associate和Deassociate逻辑非常简单,无非就是将NetContext和网络线程池里面的EPoller绑定起来,将NetContext里面的fd给epoll来管理。

APF_ERROR CNetworkMan::Associate(NetContext* pCtx)
{
    APF_ERROR err = APFE_NOT_CREATED;
    ASSERT(pCtx->state > SS_VOID);

    // Note: we have to be careful here, if the number of pollers is dynamic
    // 选择一个关联fd比较少的Poller来绑定
    int x1=AtomicGetValue(s_Jobs[0].count), m=0, n;
    for (int i=1; i<m_ThreadPool.GetWorkerCount(); i++) {
        n = AtomicGetValue(s_Jobs[i].count);
        if (n < x1) {
            x1 = n;
            m = i;
        }
    }
    //int m = atomic_add(&n, 1) % m_ThreadPool.GetWorkerCount();
    pCtx->pPoller = &s_Jobs[m].poller;
    atomic_add(&s_Jobs[m].count, 1);

    // 选择好这个Poller之后,然后调用Poller->Associate和这个NetContext进行关联
    m_NetContexts.push_back(pCtx);
    if (APFE_OK != (err=pCtx->pPoller->Associate(pCtx))) {
        m_NetContexts.remove(pCtx);
    }
    return err;
}

APF_ERROR CNetworkMan::Deassociate(NetContext* pCtx)
{
    APF_ERROR err;
    MyJob* pJob;

    if (NULL != pCtx->pPoller) {
        if (APFE_OK == (err=pCtx->pPoller->Deassociate(pCtx))) { // 解除关联
            m_NetContexts.remove(pCtx);
            pJob = CONTAINING_RECORD(pCtx->pPoller, MyJob, poller);
            atomic_add(&pJob->count, -1);
        }
        return err;
    }
    m_NetContexts.remove(pCtx);
    return APFE_OK;
}

2.8.4 Listen

发起Listen操作

void CNetworkMan::Listen(NetContext* pCtx, int backlog)
{
    pCtx->backlog = backlog;
    pCtx->pClient->AddRef(); // 这里AddRef了
    pCtx->state = SS_LISTENING_0; // 注意状态
    Associate(pCtx);
    // TODO: error with Associate
}

2.8.5 Connect

发起Connect操作,注意这里还填写了WriteReq,timeout以及nAction=AA_CONNECT.对于pReq的话应该是在connect之后发起的写操作。

void CNetworkMan::Connect(NetContext* pCtx, uint32 ip, uint16 port,
                          void *pBuf, int count, uint32 timeout, NetRequest* pReq)
{
    ASSERT(NULL != pReq);

    pCtx->ip = ip;
    pCtx->port = port;

    pReq->async.nAction = AA_CONNECT;
    pReq->pHdl = NULL;
    pReq->buf = pBuf;
    pReq->request = count;
    pReq->xfered = 0;
    pCtx->pWriteReq = pReq;
    pCtx->nTimeout = timeout;

    pCtx->pClient->AddRef();
    pCtx->state = SS_CONNECTING_0;
    Associate(pCtx);
    // TODO: error with Associate
}

2.8.6 Monitor

发起Monitor操作。所谓Monitor操作的话是指创建Socket成功之后的操作。代码这里的意图是创建服务端的Socket成功之后准备监听的状态。

void CNetworkMan::Monitor(NetContext* pCtx)
{
    pCtx->pClient->AddRef();
    pCtx->state = SS_CONNECTED_0;
    Associate(pCtx);
    // TODO: error with Associate
}

2.8.7 Delay

发起一个读超时的操作

bool CNetworkMan::Delay(NetContext* pCtx, uint32 nMilliseconds)
{
    bool bRet = false;
    int bDelay = AtomicSetValue(pCtx->nDelayRead, 1); // 不能够重复发起
    if (0 == bDelay) {
        pCtx->pClient->AddRef();
        pCtx->pPoller->Delay(pCtx, nMilliseconds);
        bRet = true;
    }
    return bRet;
}

2.8.8 Read & Write

Read/Write过程非常简单,就是将Request封装好之后交给EPoller来处理。所以这里可以知道大部分的事情都是在Poller里面完成的。

void CNetworkMan::Read(NetContext* pCtx, NetRequest* pReq)
{
    pReq->xfered = 0;
    pCtx->pPoller->Read(pCtx, pReq);
}
void CNetworkMan::Write(NetContext* pCtx, NetRequest* pReq)
{
    pReq->xfered = 0;
    pCtx->pPoller->Write(pCtx, pReq);
}

2.8.9 SocketState

  • SS_VOID = 0, // 初始状态
  • SS_LISTENING_0, // 发起listen操作
  • SS_LISTENING, // 执行listen之后
  • SS_CONNECTING_0, // 发起connect操作
  • SS_CONNECTING, // 执行connect之后,下一步是SS_CONNECTED,但是下次发起write操作
  • SS_CONNECTED_0, // 启动monitor操作,下一步是SS_CONNECTED,但是下次发起read操作
  • SS_CONNECTED, // 连接建立成功
  • SS_ERROR, // socket错误
  • SS_SHUTDOWN, // 对端已经关闭

2.9 EPoller

2.9.1 Overview

对于EPoller来说的话里面每个接口似乎都很重要,所以有必要对每一个接口都进行分析。首先看看EPoller的完整接口

class CEPoller
{
    int m_fd; // epoll fd
    uint64 m_nRead, m_nWrite; // 读写字节数统计
    typedef TMultiMap<uint64, NetContext> DelayedList;
    DelayedList m_DelayedList; // 定时列表
    CSpinLock m_DListLock;  // 定时列表锁

    bool StartWrite(NetContext* pCtx, bool bForce);
    bool StartRead(NetContext* pCtx, bool bUser);
    void DoWithDelayed();
    bool DoConnect(NetContext* pCtx);

  public:
    CEPoller() { m_fd = 0; m_nRead = m_nWrite = 0; }
    ~CEPoller() { Close(); }

    APF_ERROR Create(int size);
    void Close();

    APF_ERROR Associate(NetContext* pCtx);
    APF_ERROR Deassociate(NetContext* pCtx);
    APF_ERROR Run();

    bool CancelDelay(NetContext* pCtx);
    void Delay(NetContext* pCtx, uint32 nMilliseconds);
    void Read(NetContext* pCtx, NetRequest* pReq);
    void Write(NetContext* pCtx, NetRequest* pReq);
};

2.9.2 Create & Close

Create和Close无非就是创建epoll fd

APF_ERROR CEPoller::Create(int size)
{
    m_fd = sys_epoll_create(size);
    if (0 < m_fd)
        return APFE_OK;
    PERROR("epoll_create");
    return GetLastError();
}

void CEPoller::Close()
{
    // TODO::: cancel epoll_waiting...
    if (m_fd) {
        close(m_fd);
        m_fd = 0;
    }
}

2.9.3 Associate & Deassociate

将NetContext的fd放入epoll中进行关联.

APF_ERROR CEPoller::Associate(NetContext* pCtx)
{
    struct epoll_event ev;
    ASSERT(pCtx->state > SS_VOID);

    SetNonBlock(pCtx->s); // 首先设置成为非阻塞模式
    ev.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLET;
    ev.data.ptr = pCtx;

    //TRACE0("before CEPoller::Associate\n");
    pCtx->pClient->AddRef(); // 注意这里AddRef
    TRACE1("Associate %x -> %s\n", pCtx->s, pCtx->pClient->GetName());
    if (0 == sys_epoll_ctl(m_fd, EPOLL_CTL_ADD, pCtx->s, &ev)) { // 加入到epoll里面
        return APFE_OK;
    }
    PERROR("epoll_ctl in Associate");
    pCtx->pClient->Release();
    return GetLastError();
}

APF_ERROR CEPoller::Deassociate(NetContext* pCtx)
{
    if (pCtx->s != INVALID_SOCKET) {
        if (0 == sys_epoll_ctl(m_fd, EPOLL_CTL_DEL, pCtx->s, NULL)) { // 从epoll删除
            TRACE1("Deassociate %x\n", pCtx->s);
            pCtx->s = INVALID_SOCKET;
            return APFE_OK;
        }
        return GetLastError();
    }
    return APFE_ALREADY_FREED;
}

2.9.4 DoConnect

DoConnect是真实地进行connect操作

bool CEPoller::DoConnect(NetContext* pCtx)
{
    SOCKADDR_IN sa;
    ZeroMemory(&sa, sizeof(SOCKADDR_IN));
    sa.sin_port = htons(pCtx->port);
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = pCtx->ip;

    AtomicSetValue(pCtx->state, SS_CONNECTING);
    pCtx->tTimeoutCycle = 0;

    int err = connect(pCtx->s, (LPSOCKADDR)&sa, sizeof(sa));
    //TRACE0("connect = %d, %d, %s\n", err, errno, strerror(errno));
    if (EINPROGRESS == errno) {
        if (INFINITE != pCtx->nTimeout) { // 如果需要设置超时的话
            Delay(pCtx, pCtx->nTimeout); // 那么会发起一个超时操作
        }
        return true;
    }
    return 0 == err;
}

2.9.5 Delay & CancalDelay

Delay操作无非就是将超时事件到DelayedList里面,而CancalDelay是将对应的超时事件从DelayedList里面删除。

void CEPoller::Delay(NetContext* pCtx, uint32 nMilliseconds)
{
    uint64 nCycle = rdtsc();
    nCycle += (GetCpuFreq() * nMilliseconds) / 1000;

    m_DListLock.Lock();
    m_DelayedList.insert(nCycle, pCtx);
    m_DListLock.Unlock();

    pCtx->tTimeoutCycle = nCycle; // 转换成为绝对超时时间,单位是cycle.
}

bool CEPoller::CancelDelay(NetContext* pCtx)
{
    bool bRet = false;
    if (0 != pCtx->tTimeoutCycle) {
        m_DListLock.Lock();
        bRet = m_DelayedList.remove(pCtx->tTimeoutCycle, pCtx);
        m_DListLock.Unlock();
    }
    return bRet;
}

2.9.6 DoWithDelayed & DoWithError

#todo: 这两个部分都非常杂并且细节很多,没有完全看懂。另外一个疑问就是对于Delay这种超时事件的话为什么不放在CPU的超时时间里面去完成。这样似乎会更加优雅。 而现在的情况就是NetworkMan里面管理了一个定时器,而CPU线程也有一个定时器,而从代码编写者的demo来没有使用过networkman定时器。

2.9.7 Read & Write

发起Read/Write最后都是调用StartRead/StartWrite

void CEPoller::Read(NetContext* pCtx, NetRequest* pReq)
{
    pReq->async.nAction = AA_READ; // 发起的Action是AA_READ
    pCtx->pClient->AddRef();
    pCtx->qRead.push_back(pReq); // 放入read request queue.
    StartRead(pCtx, true);
}

void CEPoller::Write(NetContext* pCtx, NetRequest* pReq)
{
    pReq->async.nAction = AA_WRITE; // 发起的Action是AA_WRITE
    pCtx->pClient->AddRef();
    pCtx->qWrite.push_back(pReq); // 放入write request queue.
    StartWrite(pCtx, true);
}

首先我们看看StartRead这个过程。这里所谓的StartRead不过就是尝试阅读访问一下Context看看是否可以读出数据。调用的时机一个是刚发起Read一个是EPoller::Run里面。

bool CEPoller::StartRead(NetContext* pCtx, bool bUser)
{
    NetRequest* pReq;
    int t, n;

    while (0 == (t=pCtx->tRead.Add())) { // todo:这个地方完全没有看懂
        if (SS_CONNECTED != AtomicGetValue(pCtx->state)) {
            TRACE1("Not read due to state=%s\n", State2Str(pCtx->state));
            if (1 == pCtx->tRead.Set(0)) {
                return false;
            }
            continue;
        }
        if (NULL == pCtx->pReadReq) { // 得到第一个ReadRequest.
            pCtx->pReadReq = pCtx->qRead.pop_front();
        }
        while (pCtx->pReadReq) {
            pReq = pCtx->pReadReq;
            ASSERT(pReq->len > pReq->xfered);
            if (0 == (pCtx->flag & SF_UDP)) { // 如果是TCP的话那么直接使用recv读取
                n = recv(pCtx->s, (char*)pReq->buf+pReq->xfered, pReq->len-pReq->xfered, 0);
                if (AtomicGetValue(g_nQuickAckMode)) {
                    SetQuickAck(pCtx->s);
                }
            }
            else {
                SOCKADDR_IN sa;
                socklen_t len = sizeof(sa);
                n = recvfrom(pCtx->s, (char*)pReq->buf+pReq->xfered, pReq->len-pReq->xfered, 0, (LPSOCKADDR)&sa, &len);
                pReq->port = ntohs(sa.sin_port);
                pReq->ip = sa.sin_addr.s_addr;
            }
            //TRACE5("recv(%x, %p, %d): %d, %d(%s)\n", pCtx->s, (char*)pReq->buf+pReq->xfered,
            //      pReq->len-pReq->xfered, n, errno, strerror(errno));
            if (0>n && EAGAIN==errno) {
                pCtx->tRead.Set(0);
                return true;
            }
            else if (0 < n) {
                m_nRead += n;
                pReq->xfered += n;
                if (pReq->xfered >= pReq->request) { // 如果xfered超过request的话,那么想上进行通知
                    // 这里需要注意的就是,所谓的xfered可能会超过request,只要buffer的空间容许
                    pCtx->pReadReq = NULL;
                    NotifyClient(0, pCtx, pReq);
                    pCtx->pReadReq = pCtx->qRead.pop_front();
                }
                g_pExecMan->RunTimer(); // 并且调用RunTimer来检查超时情况
            }
            else {
                TRACE3("Error in recv(%x, %d): %d, %d(%s)\n", pCtx->s, pReq->len-pReq->xfered,
                       n, errno, strerror(errno));
                AtomicSetValueIf(pCtx->state, SS_SHUTDOWN, SS_CONNECTED);
                shutdown(pCtx->s, SHUT_RDWR);
                pCtx->tRead.Set(0);
                return false;
            }
        }
        if (1 == (t=pCtx->tRead.Set(0))) {
            return SS_CONNECTED == AtomicGetValue(pCtx->state); // app may have called shutdown...
        }
    }
    if (0 < t)
        return true;
    ASSERT(bUser);
    /* the last one in pCtx->qRead should be what we just inserted, unless
     * 1. there is concurrent read in other threads; or
     * 2. it has been notified in the failure handler of CSocketV.
     */
    pReq = pCtx->qRead.pop_back();
    if (pReq) { // todo:???
        NotifyClient(APFE_AFTER_BROKEN, pCtx, pReq);
    }
    return false;
}

StartWrite这个过程也是非常相似的,不过为了方便起见的话每个Write操作都是使用BufHandle来管理的

bool CEPoller::StartWrite(NetContext* pCtx, bool bUser)
{
    BufHandle* pHdl;
    int t, n;

    while (0 == (t=pCtx->tWrite.Add())) { // todo:完全没有看懂
        if (SS_CONNECTED != AtomicGetValue(pCtx->state)) {
            TRACE1("Not write due to state=%s\n", State2Str(pCtx->state));
            /*if (SS_ERROR == pCtx->state) {
                NetRequest* pReq;
                if (NULL != pCtx->pWriteReq) {
                    pReq = pCtx->pWriteReq;
                    pCtx->pWriteReq = NULL;
                    NotifyClient(APFE_NET_ERROR, pCtx, pReq);
                }
                for (pReq=pCtx->qWrite.pop_front(); pReq; pReq=pCtx->qWrite.pop_front()) {
                    NotifyClient(APFE_NET_ERROR, pCtx, pReq);
                }
            }*/
            if (1 == pCtx->tWrite.Set(0)) {
                return false;
            }
            continue;
        }
        n = 0;
        while (NULL != (pHdl=NotifyAndNextWrite(pCtx, n))) { // 每次取出一个合适的BufHandle出来用于写,如果已经写完成的话会在里面进行通知
            ASSERT(0 != pHdl->nDataLen);
            if (0 == (pCtx->flag & SF_UDP)) { // 如果是tcp的话那么直接send
                n = send(pCtx->s, pHdl->pBuf, pHdl->nDataLen, 0);
            }
            else {
                SOCKADDR_IN sa;
                ZeroMemory(&sa, sizeof(SOCKADDR_IN));
                sa.sin_port = htons(pCtx->pWriteReq->port);
                sa.sin_family = AF_INET;
                sa.sin_addr.s_addr = pCtx->pWriteReq->ip;
                n = sendto(pCtx->s, pHdl->pBuf, pHdl->nDataLen, 0, (LPSOCKADDR)&sa, sizeof(sa));
            }
            //TRACE5("send(%x, %d): %d, %d(%s)\n", pCtx->s, pHdl->nDataLen, n, errno, strerror(errno));
            if (0 < n) {
                pHdl->nDataLen -= n;
                ASSERT(pHdl->nDataLen >= 0);
                ASSERT(NULL != pCtx->pWriteReq);
                pCtx->pWriteReq->xfered += n;
                m_nWrite += n;
                g_pExecMan->RunTimer();
            }
            else if (EAGAIN == errno) {
                pCtx->tWrite.Set(0);
                return true;
            }
            else {
                TRACE2("Error in send(%x, %p, %d): %d, %d(%s), %s\n", pCtx->s, pHdl->pBuf,
                    pHdl->nDataLen, n, errno, strerror(errno), State2Str(pCtx->state));
                AtomicSetValueIf(pCtx->state, SS_SHUTDOWN, SS_CONNECTED);
                shutdown(pCtx->s, SHUT_RDWR);
                pCtx->tWrite.Set(0);
                return false;
            }
        }
        if (1 == (t=pCtx->tWrite.Set(0))) {
            return true;
        }
    }
    if (0 < t)
        return true;
    ASSERT(bUser);
    /* the last one in pCtx->qWrite should be what we just inserted, unless
     * 1. there is concurrent write in other threads; or
     * 2. it has been notified in the failure handler of CSocketV.
     */
    NetRequest* pReq = pCtx->qWrite.pop_back();
    if (pReq) { // todo:???
        NotifyClient(APFE_AFTER_BROKEN, pCtx, pReq);
    }
    return false;
}

这里面最主要的过程无非就是NotifyAndNextWrite.

static BufHandle*
NotifyAndNextWrite(NetContext* pCtx, int nWritten)
{
    BufHandle* pHdl = &pCtx->wHdl;
    NetRequest* pReq;

    if (0 == pHdl->nDataLen) { // 如果当前的DataLen==0的话那么就需要挑选下一个BufHandle了。
        if (NULL != (pHdl=pHdl->_next)) { // 遍历BufHandle链表确认全部写完
            pCtx->wHdl._next = pHdl->_next;
            pCtx->wHdl.pBuf = pHdl->pBuf;
            pCtx->wHdl.nDataLen = pHdl->nDataLen;
            ASSERT(pHdl->nDataLen != 0);
            return &pCtx->wHdl;
        }

        if (NULL != pCtx->pWriteReq) { // 当前的Request已经写完,那么需要一次通知
            ASSERT(0 != nWritten);
            pReq = pCtx->pWriteReq;
            pCtx->pWriteReq = NULL;
            NotifyClient(0, pCtx, pReq);
        }
        while (NULL != (pCtx->pWriteReq=pCtx->qWrite.pop_front())) { // 挑选出下一个Request.
            pHdl = &pCtx->pWriteReq->hdl;
            while (pHdl && 0==pHdl->nDataLen) {
                pHdl = pHdl->_next;
            }
            if (pHdl) {
                pCtx->wHdl._next = pHdl->_next;
                pCtx->wHdl.pBuf = pHdl->pBuf;
                pCtx->wHdl.nDataLen = pHdl->nDataLen;
                return &pCtx->wHdl;
            }
            pReq = pCtx->pWriteReq;
            pCtx->pWriteReq = NULL;
            NotifyClient(0, pCtx, pReq); // 如果所有的BufHandle为NULL的话,那么也直接通知写完
        }
        return NULL;
    }
    pHdl->pBuf += nWritten; // 如果当前BufHandle没有搞定的话,那么修改一下下一次写偏移.
    return pHdl;
}

2.9.8 NofityClient

当EPoller发现内部需要进行回调的话,那么就会通过NofityClient来通知客户进行回调。

static void
NotifyClient(int nErrCode, NetContext* pCtx, NetRequest* pReq)
{
    CAsyncClient* pClient = pCtx->pClient;

    pReq->async.nErrCode = nErrCode;
    pReq->async.pClient = pClient;
    //int nAction = pReq->async.nAction;

    TRACE5("Notify client (%p,%d) %d, %d, %p, %p\n", pClient, pClient->GetRef(), pReq->async.nAction, nErrCode, pCtx, pReq);
    if (0 != (pCtx->flag & SF_DIRECT_CALLBACK)) { // 如果flag里面有SF_DIRECT_CALLBACK,那么在本线程执行
        pClient->OnCompletion((AsyncContext*)pReq);
        pClient->Release();
    }
    else { // 否则加入CPU线程执行
        g_pExecMan->QueueExec((AsyncContext*)pReq, true);
    }
    //TRACE0("After EPoll::NotifyClient: %d\n", nAction);
}

这是NotifyClient的一个版本,另外一个版本的原型是这样的

static void NotifyClient(int nErrCode, int nAction, NetContext* pCtx);

可以看到这个地方没有和任何的NetRequest进行关联,所以在内部实现的话会直接new一个出来,用户在处理这个逻辑时候需要记得释放。

2.9.9 Run

过程比较冗长,而且里面很多细节依然并不是非常清楚,但是不妨碍大体的理解

APF_ERROR CEPoller::Run()
{
    NetContext* pCtx;
    NetRequest *pReq;
#define MAX_EPOLL_EVENT     64
    struct epoll_event events[MAX_EPOLL_EVENT];
    int nfds;
    bool bRead, bWrite, bErr;

    while (1) {
        TRACE7("epoll_wait start\n");
        nfds = sys_epoll_wait(m_fd, events, MAX_EPOLL_EVENT, g_nTickPrecision); // 进行epoll_wait
        TRACE7("epoll_wait return: nfds=%d\n", nfds);
        if (nfds >= 0) {
            for (int i=0; i<nfds; i++) {
                pCtx = (NetContext*)events[i].data.ptr;
                ASSERT(pCtx && pCtx->pClient);
                if (0 == pCtx->nEnabled) { // 如果不可用的话那么忽略
                    continue;
                }
                bRead = (events[i].events & EPOLLIN) ? true : false;
                bWrite = (events[i].events & EPOLLOUT) ? true : false;
                bErr = (events[i].events & EPOLLERR) ? true : false;

#ifdef  _DEBUG
                if (0 != bErr) {
                    TRACE4("EPOLLERR: %x(%p), state=%s\n", pCtx->s, pCtx, State2Str(pCtx->state));
                }
                if (bRead) {
                    TRACE7("EPOLLIN: %x(%p), state=%s\n", pCtx->s, pCtx, State2Str(pCtx->state));
                }
                if (bWrite) {
                    TRACE7("EPOLLOUT: %x(%p), state=%s\n", pCtx->s, pCtx, State2Str(pCtx->state));
                }
#endif
                switch (AtomicGetValue(pCtx->state)) {
                case SS_CONNECTED_0:            // support for Monitor
                    AtomicSetValue(pCtx->state, SS_CONNECTED);
                    NotifyClient(APFE_NOTIFY, AA_READ, pCtx);
                    // no break
                case SS_CONNECTED: // 如果已经连接上的话
                    if (bRead) {
                        while (pCtx->tRead.Get() > 0) {
                            thread_yield();
                        }
                        if (!StartRead(pCtx, false)) { // 发起Read操作
                            DoWithError(pCtx);
                            break;
                        }
                    }

                    if (!bErr) {
                        if (bWrite) {
                            while (pCtx->tWrite.Get() > 0) {
                                thread_yield();
                            }
                            if (!StartWrite(pCtx, false)) { // 发起Write操作
                                DoWithError(pCtx);
                                break;
                            }
                        }
                    }
                    else {
                        int bytes = 0;
                        if (0 == (pCtx->flag & SF_DONT_EXHAUST)) {
                            ioctl(pCtx->s, FIONREAD, &bytes);
                        }
                        if (0 == bytes) { // 如果在缓存里面没有数据的话,否则会先触发READ事件
                            AtomicSetValueIf(pCtx->state, SS_SHUTDOWN, SS_CONNECTED); // 那么将状态置为Shutdown
                            DoWithError(pCtx); // 交给错误处理
                        }
                    }
                    break;

                case SS_CONNECTING_0:
                    if (DoConnect(pCtx)) // 发起connect
                        break;
                    bErr = true;

                case SS_CONNECTING:
                    CancelDelay(pCtx); // 如果是正在连接的话那么取消ReConnect超时时间,Connect上之后的话那么立刻触发写事件
                    if (!bErr && bWrite) {
                        pReq = pCtx->pWriteReq;
                        pCtx->pWriteReq = NULL;
                        ASSERT(pReq != pCtx->qWrite.get_front());

                        if (0==pReq->request && NULL==pReq->pHdl) {
                            AtomicSetValue(pCtx->state, SS_CONNECTED);
                            NotifyClient(0, pCtx, pReq); // 如果没有写请求的话那么触发回调
                        }
                        else {
                            pCtx->qWrite.push_front(pReq);
                            AtomicSetValue(pCtx->state, SS_CONNECTED);
                        }
                        if (!StartWrite(pCtx, false)) { // 如果有写请求那么立刻进行Write
                            DoWithError(pCtx);
                        }
                    }
                    else if (bErr) {
                        DoWithError(pCtx);
                    }
                    break;

                case SS_LISTENING_0:
                    DoListen(pCtx);
                    break;

                case SS_LISTENING:
                    // TODO: how to deal with listen failure?
                    if (!bErr && bRead) {
                        pCtx->pClient->AddRef();
                        NotifyClient(APFE_OK, AA_LISTEN, pCtx); // AA_LISTEN完毕,那么相当于下一步我们需要进行accept.
                    }
                    break;

                case SS_SHUTDOWN:
                    DoWithError(pCtx);
                    break;

                default:
                    break;
                }
            }

            DoWithDelayed();
            g_pExecMan->RunTimer();
        }
        else if (EINTR != GetLastError()) {
            if (IsKylinRunning())
                PERROR("Error in epoll_wait");
            return GetLastError();
        }
    }
    return APFE_OK;
}

可以说看下来的话很多细节都是头昏脑胀的,以及各种情况返回什么样的错误码。估计只有写代码的人才知道里面为什么这么做,里面到底存在什么陷阱。 但是所幸的是写代码的人为我们提供了一个简单的接口可以使用,而且里面设置了很多默认的处理。大部分情况下面我们只需要使用默认处理即可,这个默认类就是Socket.

2.10 Socket

2.10.1 Overview

Socket包含了如何配合使用NetworkMan以及EPoller两个部分,并且里面提供了很多默认的封装。对于我们编写程序的话大部分时候都使用这个类即可。还是首先看看接口

// socket类型
enum SockType {
    ST_NONE = 0,
    ST_CREATED,
    ST_LISTEN,
    ST_CONNECTED,
    ST_ACCEPTED,
    ST_ERROR
};

class CSocketV : public CAsyncClient
{
  protected:
    SOCKET m_s; // socket句柄
    uint16 m_port; // 工作在什么端口上,如果是connection的话表示客户端的端口
    NetContext m_Ctx; // context
    NetRequest m_Req; // request
    volatile int m_type; // socket类型
    AsyncContext* m_pConnCtx; // 没有使用

    CSocketV() {
        m_s = INVALID_SOCKET;
        m_port = 0;
        m_type = ST_NONE;
        m_pConnCtx = NULL;
        InitAsyncContext(&m_Req.async);
    }
    virtual ~CSocketV() {
        Close();
    }

    virtual void OnCompletion(AsyncContext* pContext);

    // false means the connection is to be deleted
    virtual bool OnConnected(APF_ERROR nErrCode) { return 0==nErrCode; }
    virtual void OnListened(SOCKET s, SOCKADDR_IN* pSockAddrIn) { closesocket(s); }

    virtual void OnWritten(APF_ERROR nErrCode, NetRequest* pReq) {}
    virtual void OnRead(APF_ERROR nErrCode, NetRequest* pReq) {}

    virtual void OnBroken(APF_ERROR nErrCode) = 0;
    bool Close();

  public:
    virtual int Release() {
        int n = CAsyncClient::Release();
        if (n == 0) {
            delete this;
        }
        return n;
    }
    APF_ERROR Create(bool bTCP, uint16 port=0, uint32 ip=0, uint32 flag=0); // Check SocketFlag in NetworkMan.h
    APF_ERROR Create(SOCKET s, uint16 port, uint32 flag=0);

    void Listen(int backlog=64);
    void Connect(uint32 ip, uint16 port, uint32 timeout=INFINITE/*in Milliseconds*/);
    void ConnectAndWrite(uint32 ip, uint16 port, void* pBuf, int count, uint32 timeout=INFINITE/*in Milliseconds*/);
    void ReConnect(uint32 nMilliseconds);
    bool Shutdown(bool bLingerOff=true);

    void Read(void* pBuf, int count, int request, NetRequest* pReq);
    void Read(void* pBuf, int count, NetRequest* pReq);
    void Write(void* pBuf, int count, NetRequest* pReq);
    void Write(BufHandle* pHdl, NetRequest* pReq);

#ifdef _UNITTEST
    NetContext* GetCtx() { return &m_Ctx; }
#endif
};

2.10.2 Create

对于create的话我们看稍微复杂的一个实现,就是头参数是bTCP的原型实现。注意这里ip=0,port=0的话那么这样对于client来说的话就会随机选择一个端口。

APF_ERROR CSocketV::Create(bool bTCP, uint16 port, uint32 ip, uint32 flag)
{
    APF_ERROR err;
    ASSERT(!IsCreated());
    int retry = 0;
    SOCKADDR_IN sa;

retry1:
    m_s = WSASocket(AF_INET, bTCP ? SOCK_STREAM : SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); // 构造socket.
    if (INVALID_SOCKET != m_s) {
        atomic_add(&s_nSockets, 1);
        SetReuseAddr(m_s); // 并且设置reuseadd.
    }
    else {
        err = WSAGetLastError();
        TRACE0("WSASocket() error: %d(%s)\n", err, strerror(err));
        if (retry++ < 10) { // 创建socket可能会因为句柄上线而创建失败,这里会进行重试。
            // it should be 24: too many open files, we may need to sleep and retry
            Sleep(500);
            TRACE0("Retry %d\n", retry);
            goto retry1;
        }
        else {
            TRACE0("Total sockets: %d\n", AtomicGetValue(s_nSockets));
            return err;
        }
    }

    retry = 0;
retry2:
    ZeroMemory(&sa, sizeof(SOCKADDR_IN));
    sa.sin_port = htons(port);
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = ip;

    UNLIKELY_IF (SOCKET_ERROR == bind(m_s, (LPSOCKADDR)&sa, sizeof(sa))) { // 然后进行绑定,其实对于client可有可无但是没有任何影响
        err = WSAGetLastError();
        TRACE0("Error in bind(%s, %d): %s\n", bTCP?"TCP":"UDP", port, strerror(err));

        retry ++;
        if (retry<3 || (0==port && retry<10)) { // todo:这里重试原因不太清楚
            Sleep(500);
            TRACE0("Retry %d\n", retry);
            goto retry2;
        }

        // fail to bind
        closesocket(m_s);
        m_s = INVALID_SOCKET;
        return err;
    }
    if (0 == port) { // 如果使用0端口来创建的话那么可以通过getsockname来得到最终绑定到的端口
        socklen_t namelen = sizeof(sa);
        getsockname(m_s, (LPSOCKADDR)&sa, &namelen);
        port = ntohs(sa.sin_port);
    }

    InitNetContext(&m_Ctx, m_s, this, flag);
    m_Ctx.ip = ip;
    m_Ctx.port = port;

    SetName("Socket(%x)", m_s);
    TRACE4("%s %s created, port=%d\n", m_name, bTCP?"TCP":"UDP", port);
    m_port = port;
    AtomicSetValue(m_type, ST_CREATED); // 这是成为ST_CRERATED类型
    if (!bTCP) {
        m_Ctx.flag |= SF_UDP;
        g_pNetworkMan->Monitor(&m_Ctx);
    }
    return APFE_OK;
}

后来我觉得这个Create原型我们也还是值得看看的,因为这个函数原型通常是在server进行accept之后触发的create

APF_ERROR CSocketV::Create(SOCKET s, uint16 port, uint32 flag)
{
    ASSERT(!IsCreated());

    s_nSockets ++;
    m_s = s;
    m_port = port;
    InitNetContext(&m_Ctx, m_s, this, flag);
    m_Ctx.port = port;

    AtomicSetValue(m_type, ST_CREATED);
    SetName("Socket(%x)", m_s);
    g_pNetworkMan->Monitor(&m_Ctx); // 立即启动monitor.monitor状态在run里面的话就会通过(APFE_NOTIFY,AA_READ)来通知server应该发起读操作了.
    // Monitor无非就是通知服务器刚连接上来的客户端可读了。
    return APFE_OK;
}

2.10.3 Close

bool CSocketV::Close()
{
    if (INVALID_SOCKET != m_s) { // 可以多次调用Close
        g_pNetworkMan->Deassociate(&m_Ctx); // 将这个NetContext从NetworkMan解除关联
        closesocket(m_s);
        m_s = INVALID_SOCKET;
        atomic_add(&s_nSockets, -1);

        // 这个地方是这么猜测的,普通类型socket的话仅仅是为了单连接通信存在的
        // 如果进行close的话完全可以直接丢弃,而对于listen来说的话,如果一旦直接关闭那么服务就不能够维持
        // 不过话说回来个人感觉这个地方还是挺non-sense的
        if (ST_LISTEN == AtomicSetValue(m_type, ST_NONE)) {
            Release();
        }
        return true;
    }
    return false;
}

2.10.4 Shutdown

bool CSocketV::Shutdown(bool bLingerOff)
{
    int state = AtomicSetValue(m_Ctx.state, SS_SHUTDOWN);
    if (INVALID_SOCKET != m_s) {
        TRACE2("Shutdown socket(%x), LingerOff=%d\n", m_s, bLingerOff);
        if (bLingerOff) { // 我们不care这个linger部分
            SetLingerOff(m_s);
        }
        shutdown(m_s, SHUT_RDWR);
        if (SS_CONNECTING_0==state && !g_pNetworkMan->Delay(&m_Ctx, 0)) { // 这个地方这么处理原因
            // 是因为如果我们当前是这样状态的话,并且已经发起了一个Delay事件的话
            // 那么肯定会被通知到然后调用NotifyClient,所以这我们需要首先AddRef.
            AddRef();
        }
        return true;
    }
    return false;
}

2.10.5 Listen

void CSocketV::Listen(int backlog)
{
    ASSERT(IsCreated());

    AtomicSetValue(m_type, ST_LISTEN); // 设置成为ST_LISTEN状态
    g_pNetworkMan->Listen(&m_Ctx, backlog); // 然后发起listen
}

2.10.6 Connect

Connect相关函数有Connect,ConnectAndWrite以及ReConnect,都非常简单通过NetworkMan发起操作

void CSocketV::Connect(uint32 ip, uint16 port, uint32 timeout)
{
    //TRACE5("Connecting(%x) %s:%d...\n", m_s, in_ntoa(ip), port);
    ASSERT(IsCreated());
    g_pNetworkMan->Connect(&m_Ctx, ip, port, NULL, 0, timeout, &m_Req);
}

void CSocketV::ConnectAndWrite(uint32 ip, uint16 port, void* pBuf, int count, uint32 timeout)
{
    //TRACE5("Connecting(%x) %s:%d...\n", m_s, in_ntoa(ip), port);
    ASSERT(IsCreated());
    g_pNetworkMan->Connect(&m_Ctx, ip, port, pBuf, count, timeout, &m_Req);
}

void CSocketV::ReConnect(uint32 nMilliseconds)
{
    m_Ctx.pWriteReq = &m_Req;
    m_Ctx.nDelayRead = 0;
    m_Ctx.tRead.Init();
    m_Ctx.tWrite.Init();
    m_Ctx.state = SS_CONNECTING_0;
    g_pNetworkMan->Delay(&m_Ctx, nMilliseconds); // 只有这种SS_CONNECTING_0状态才会发起Delay事件
}

2.10.7 Read & Write

Read/Write函数也非常简单无非就是包装一些NetRequest然后交给NetworkMan来发起

void CSocketV::Read(void* pBuf, int count, int request, NetRequest* pReq)
{
    g_pNetworkMan->Read(&m_Ctx, pBuf, count, request, pReq);
}

void CSocketV::Write(void* pBuf, int count, NetRequest* pReq)
{
    g_pNetworkMan->Write(&m_Ctx, pBuf, count, pReq);
}

2.10.8 OnCompletion

其实对于Socket部分最主要的内容还是在这里。上面那些函数无非是做一个Wrapper,而Socket本质上还是一个CAsyncClient要处理各种事件回调。 我们需要仔细看看OnCompletion的过程,因为大部分我们所需要扩展的就是这个部分了。但是如果仔细阅读这个部分的话,还是会发现有很多细节的东西的。

void CSocketV::OnCompletion(AsyncContext* pContext)
{
    NetRequest* pReq = (NetRequest*)pContext;
    int nAction = pContext->nAction;
    switch (nAction) {
    case AA_READ: // 如果我们发起的是READ的话,那么触发OnRead回调
        // 可能存在的ErrCode包括
        // APFE_AFTER_BROKEN todo:how to handle
        // APFE_NET_ERROR todo:how to handle
        // APFE_NOTIFY 需要我们自己释放pReq,因为这个是new出来的.通常我们有新的client连接上来应该发起读操作了
        // APFE_OK 发起的读操作没有任何错误
        OnRead(pContext->nErrCode, pReq);
        break;

    case AA_WRITE: // 如果我们发起的是WRITE操作的话,那么触发OnWrite回调
        // 可能存在的ErrCode包括
        // APFE_AFTER_BROKEN todo:how to handle
        // APFE_NET_ERROR todo:how to handle
        // APFE_OK 发起的写操作没有任何错误
        OnWritten(pContext->nErrCode, pReq);
        break;

    case AA_LISTEN: // 如果我们发起的是LISTEN操作的话
        if (0 == pContext->nErrCode) { // listen调用没有任何问题
            while (1) {
                SOCKADDR_IN sa;
                socklen_t saLen = sizeof(sa);

                TRACE7("To accept(%x)\n", m_s);
                SOCKET s = accept(m_s, (LPSOCKADDR)&sa, &saLen);
                if (-1 != s) {
                    TRACE1("Accepted(%x) %s:%d = %x\n", m_s, in_ntoa(sa.sin_addr.s_addr), ntohs(sa.sin_port), s);
                    OnListened(s, &sa); // 对于每一个accept都触发OnListened回调
                }
                else {
                    TRACE7("Error in accept(%x)\n", m_s);
                    break;
                }
            }
        }
        delete pReq;
        break;

    case AA_CONNECT: // 如果我们发起的是CONNECT操作的话
        AtomicSetValue(m_type, ST_CONNECTED);
        TRACE1("OnConnected(%x): %d, %p\n", m_s, pContext->nErrCode, pContext);
        if (0 == pContext->nErrCode) {
            OnConnected(0); // 并且没有任何错误,那么触发OnConnected(0)
            break;
        }
        else if (OnConnected(pContext->nErrCode)) { // 如果OnConnected(err)返回true的话那么忽略
            break;
        }

        pContext->nErrCode = APFE_CONNECT_ERROR; // 如果认为connect这样是一个错误的话,那么后面的错误都是APFE_CONNECT_ERROR
        Release(); // todo:这个地方为什么需要Release
        // 原因是因为作者假设我们编写方式都是在初始化开始AddRef一下,然后对于OnBorken这类错误的话通常需要析构CAsyncClient,所以Release让我们可以销毁
        // goes here

    case AA_BROKEN: // 连接断开,可能是客户端的正常关闭
        // 但是错误码可能有下面这些
        // APFE_CONNECT_ERROR 连接出现错误
        // APFE_TIMEDOUT 连接出现超时
        // APFE_SYS_ERROR todo:???
        // APFE_NET_ERROR todo:???
        TRACE5("CSocket::OnBroken(%x): %d\n", m_s, pContext->nErrCode);
        AtomicSetValue(m_type, ST_ERROR);
        ASSERT(NULL == m_Ctx.pReadReq);
        ASSERT(NULL == m_Ctx.pWriteReq);

        m_Ctx.tWrite.Set(LARGE_ENOUGH_NEGATIVE);
        m_Ctx.tRead.Set(LARGE_ENOUGH_NEGATIVE);
        // 如果提交了这些请求的话,那么都会重新触发一次OnWritten/OnRead并且以对应的错误号通知
        {
            NetRequest* pReq1;
            while (NULL != (pReq1=m_Ctx.qWrite.pop_front())) {
                OnWritten(pContext->nErrCode, pReq1);
                Release();
            }
            while (NULL != (pReq1=m_Ctx.qRead.pop_front())) {
                OnRead(pContext->nErrCode, pReq1);
                Release();
            }
        }
        OnBroken(pContext->nErrCode);
        // AA_CONNECT also goes here
        if (nAction == AA_BROKEN)
            delete pReq;
        break;
    }
}

2.10.9 Summary

个人觉得kylin的网络异步部分就是spaghetti,所有的nAction以及nErrCode都是相互缠绕.我们可以假设我们的场景类似于RPC然后考虑一下应该如何处理。

  • OnConnected(APF_ERROR nErrCode).如果nErrCode!=APFE_OK的话那么Close,否则认为连接成功.错误交给OnBorken处理
  • OnListened(SOCKET s, SOCKADDR_IN* pSockAddrIn).建立连接即可。
  • OnWritten(APF_ERROR nErrCode, NetRequest* pReq).如果nErrCode!=APFE_OK的话,那么Close连接。
  • OnRead(APF_ERROR nErrCode, NetRequest* pReq).如果nErrCode=APFE_NOTIFY的话那么发起Read,如果=APFE_OK的话那么处理,其他情况下面Close连接。
  • OnBroken(APF_ERROR nErrCode).直接Close即可,但是可以根据nErrCode来判断原因.可能是客户端正常关闭.通常最后需要Release一下,释放这个连接。

关于具体使用的话可以仔细参考demo/echo.

3 demo

3.1 echo

echo这个就是客户端连接上之后不断发送1K数据而服务器就直接回复这1K数据

3.1.1 client.cc

/***************************************************************************
 *
 * Copyright (c) Baidu.com, Inc. All Rights Reserved
 *
 **************************************************************************/


/**
 * @author zhangyan04(@baidu.com)
 * @brief
 *
 */

#include <cstdio>
#include <vector>
#include <string>
#include <time.h>
#include "stdafx.h"
#include "Kylin.h"
#include "Socket.h"

#define ERR(fmt,...) fprintf(stderr,fmt,##__VA_ARGS__)
#define DBG(fmt,...) fprintf(stdout,fmt,##__VA_ARGS__)
//#define DBG(fmt,...)

static const int connect_timeout_ms=5*1000;
static const int CONNECT_TIMEOUT=AA_USER_BEGIN;
static const int read_timeout_ms=1200*1000;
static const int READ_TIMEOUT=AA_USER_BEGIN+1;
static const int worker_num=8;
static volatile int worker=worker_num;
static const int packet_size=1024;

class XAsyncClient:public CSocketV {
  public:
    int id;
    AsyncContext delay_ctx;
    char packet[packet_size];
    XAsyncClient(int id_):id(id_){
        AddRef();
        InitAsyncContext(&delay_ctx);
        memset(packet,0xcc,packet_size);
    }
    ~XAsyncClient(){
    }
    void Print(){
        ERR("(%d)xref:%d\n",id,CAsyncClient::GetRef());
    }
    void Close(){
        CSocketV::Close();
        atomic_add(&worker,-1);
    }
    void StartConnnect(){
        APF_ERROR err=CSocketV::Create(true);
        if(err==APFE_OK){
            Connect(in_aton("127.0.0.1"),19870);
            // we don't need to AddRef() right here.
            g_pExecMan->DelayExec(CONNECT_TIMEOUT,this,connect_timeout_ms,&delay_ctx);
        }else{
            ERR("Create(%s)\n",APFError2Str(err));
        }
    }
    virtual void OnBroken(APF_ERROR nErrCode){
        ERR("OnBroken(%s)\n",APFError2Str(nErrCode));
        Close();
        Release();
    }
    virtual bool OnConnected(APF_ERROR nErrCode){
        g_pExecMan->CancelExec(&delay_ctx);
        if(nErrCode==APFE_OK){
            DBG("APFE_OK OnConnected ready\n");
            Write(packet,packet_size,&m_Req);
            return true;
        }else{
            ERR("OnConnected(%s)\n",APFError2Str(nErrCode));
            return false;
        }
    }
    virtual void OnWritten(APF_ERROR nErrCode,NetRequest* req){
        if(nErrCode==APFE_OK){
            DBG("APFE_OK OnWritten(%d)\n",req->xfered);
            Read(req->buf,req->xfered,&m_Req);
            // we don't need to AddRef() right here.
            g_pExecMan->DelayExec(READ_TIMEOUT,this,read_timeout_ms,&delay_ctx);
        }else{
            ERR("OnWritten(%s)\n",APFError2Str(nErrCode));
            Close();
        }
}
    virtual void OnRead(APF_ERROR nErrCode,NetRequest* req){
        g_pExecMan->CancelExec(&delay_ctx);
        if(nErrCode==APFE_OK){
            DBG("APFE_OK OnRead(%d)\n",req->xfered);
            Write(req->buf,req->xfered,&m_Req);
        }else{
            ERR("OnRead(%s)\n",APFError2Str(nErrCode));
            Close();
        }
    }
    virtual void OnCompletion(AsyncContext* ctx){
        switch(ctx->nAction){
            case CONNECT_TIMEOUT:
                ERR("connect timeout(%d ms)\n",connect_timeout_ms);
                Close();
                break;
            case READ_TIMEOUT:
                ERR("read timeout(%d ms)\n",read_timeout_ms);
                Close();
                break;
            default:
                CSocketV::OnCompletion(ctx);
                break;
        }
    }
};

int main(){
    InitKylin(2,2,0);
    std::vector< XAsyncClient* > vec;
    for(int i=0;i<worker_num;i++){
        XAsyncClient* client=new XAsyncClient(i);
        vec.push_back(client);
        client->StartConnnect();
    }
    // 100ms.
    const struct timespec timeout={0,100*1000000};
    while(1){
        nanosleep(&timeout,NULL);
        if(AtomicGetValue(worker)==0){
            StopKylin(true);
            break;
        }
    }
    return 0;
}

3.1.2 server.cc

/***************************************************************************
 *
 * Copyright (c) Baidu.com, Inc. All Rights Reserved
 *
 **************************************************************************/


/**
 * @author zhangyan04(@baidu.com)
 * @brief
 *
 */

#include <cstdio>
#include <vector>
#include <string>
#include <time.h>
#include <signal.h>
#include "stdafx.h"
#include "Kylin.h"
#include "Socket.h"

#define ERR(fmt,...) fprintf(stderr,fmt,##__VA_ARGS__)
#define DBG(fmt,...) fprintf(stdout,fmt,##__VA_ARGS__)
//#define DBG(fmt,...)

static volatile int client_id=-1;
static const int packet_size=1024;
static volatile int stop=0;
void signal_handler(int signo){
    AtomicSetValue(stop,1);
}

class XAsyncClient:public CSocketV {
  public:
    int id;
    AsyncContext delay_ctx;
    char packet[packet_size];
    XAsyncClient(int id_):id(id_){
        AddRef();
        InitAsyncContext(&delay_ctx);
        memset(packet,0xcc,packet_size);
    }
    ~XAsyncClient(){
    }
    void Print(){
        fprintf(stderr,"(%d)xref:%d\n",id,CAsyncClient::GetRef());
    }
    void StartServe(){
        APF_ERROR err=CSocketV::Create(true,19870);
        if(err==APFE_OK){
            Listen();
        }else{
            ERR("Create(%s)\n",APFError2Str(err));
        }
    }
    void OnListened(SOCKET s,SOCKADDR_IN* pSockAddrIn){
        XAsyncClient* client=new XAsyncClient(atomic_add(&client_id,1));
        APF_ERROR err=client->Create(s,pSockAddrIn->sin_port);
        if(err==APFE_OK){
            ;
        }else{
            ERR("Create(%s)\n",APFError2Str(err));
            Release();
        }
    }
    virtual void OnBroken(APF_ERROR nErrCode){
        ERR("OnBroken(%s)\n",APFError2Str(nErrCode));
        Close();
        Release(); // we have to call 'Release' right here to free connection.
    }
    virtual void OnWritten(APF_ERROR nErrCode,NetRequest* req){
        switch(nErrCode){
            case APFE_OK:
                DBG("APFE_OK OnWritten(%d)\n",req->xfered);
                Read(req->buf,req->xfered,&m_Req);
                break;
            default:
                ERR("OnWritten(%s)\n",APFError2Str(nErrCode));
                Close();
                break;
        }
    }
    virtual void OnRead(APF_ERROR nErrCode,NetRequest* req){
        switch(nErrCode){
            case APFE_NOTIFY:
                delete req;
                DBG("APFE_NOTIFY OnRead ready\n");
                Read(packet,packet_size,&m_Req);
                break;
            case APFE_OK:
                DBG("APFE_OK OnRead(%d)\n",req->xfered);
                Write(req->buf,req->xfered,&m_Req);
                break;
            default:
                ERR("OnRead(%s)\n",APFError2Str(nErrCode));
                Close();
                break;
        }
    }
};

int main(){
    signal(SIGINT,signal_handler);
    InitKylin(2,2,0);
    XAsyncClient client(0);
    client.StartServe();
    // 100ms.
    const struct timespec timeout={0,100*1000000};
    while(1){
        nanosleep(&timeout,NULL);
        if(AtomicGetValue(stop)==1){
            StopKylin(true);
            break;
        }
    }
    return 0;
}

3.2 file-request

file-request希望可以结合ExecMan,NetworkMan和DiskMan三个Manager写一个简单的demo,功能非常简单就是客户端每次发送一个文件名,服务端得到之后分析出文件名然后将内容返回给client. 消息格式都是头部带上了4个字节的长度,然后后面跟上数据。和之前的echo不同的是我们这里需要采取分段读取。

3.2.1 client.cc

/***************************************************************************
 *
 * Copyright (c) Baidu.com, Inc. All Rights Reserved
 *
 **************************************************************************/


/**
 * @author zhangyan04(@baidu.com)
 * @brief
 *
 */

#include <cstdio>
#include <vector>
#include <string>
#include <time.h>
#include "stdafx.h"
#include "Kylin.h"
#include "Socket.h"

#define ERR(fmt,...) fprintf(stderr,fmt,##__VA_ARGS__)
#define DBG(fmt,...) fprintf(stdout,fmt,##__VA_ARGS__)
//#define DBG(fmt,...)

static const int connect_timeout_ms=5*1000;
static const int CONNECT_TIMEOUT=AA_USER_BEGIN;
static const int worker_num=1;
static volatile int worker=worker_num;
static const char* filename="./file-server.cc";

class XAsyncClient:public CSocketV {
  public:
    int id;
    AsyncContext delay_ctx;
    enum status_t{
        READ_BOEY_LEN,
        READ_BODY_CONTENT,
    };
    status_t status;
    int read_body_len;
    XAsyncClient(int id_):id(id_){
        AddRef();
        InitAsyncContext(&delay_ctx);
    }
    ~XAsyncClient(){
    }
    void Print(){
        ERR("(%d)xref:%d\n",id,CAsyncClient::GetRef());
    }
    void Close(){
        CSocketV::Close();
        atomic_add(&worker,-1);
    }
    void StartConnnect(){
        APF_ERROR err=CSocketV::Create(true);
        if(err==APFE_OK){
            Connect(in_aton("127.0.0.1"),19870);
            // we don't need to AddRef() right here.
            g_pExecMan->DelayExec(CONNECT_TIMEOUT,this,connect_timeout_ms,&delay_ctx);
        }else{
            ERR("Create(%s)\n",APFError2Str(err));
        }
    }
    virtual void OnBroken(APF_ERROR nErrCode){
        ERR("OnBroken(%s)\n",APFError2Str(nErrCode));
        Close();
        Release();
    }
    int MakeFileRequest(const char* fname,char** request){
        int len=strlen(fname)+1; // include trailing '\0'
        char* tmp=(char*)malloc(len+4);
        *(int*)tmp=len;
        strcpy(tmp+sizeof(int),fname);
        *request=tmp;
        return len+4;
    }
    virtual bool OnConnected(APF_ERROR nErrCode){
        g_pExecMan->CancelExec(&delay_ctx);
        if(nErrCode==APFE_OK){
            DBG("OnConnected\n");
            char* request=NULL;
            int request_len=MakeFileRequest(filename,&request);
            DBG("MakeFileRequest (%s)(%d)\n",filename,request_len);
            Write(request,request_len,&m_Req);
            return true;
        }else{
            ERR("OnConnected(%s)\n",APFError2Str(nErrCode));
            return false;
        }
    }
    virtual void OnWritten(APF_ERROR nErrCode,NetRequest* req){
        free(req->buf);
        if(nErrCode==APFE_OK){
            DBG("ReadBodyLen\n");
            status=READ_BOEY_LEN;
            Read(&read_body_len,sizeof(read_body_len),&m_Req);
        }else{
            ERR("OnWritten(%s)\n",APFError2Str(nErrCode));
            Close();
        }
    }
    virtual void OnRead(APF_ERROR nErrCode,NetRequest* req){
        if(nErrCode==APFE_OK){
            if(status==READ_BOEY_LEN){
                DBG("ReadBodyLen OK(%d)\n",read_body_len);
                char* tmp=(char*)malloc(read_body_len);
                status=READ_BODY_CONTENT;
                Read(tmp,read_body_len,&m_Req);
            }else if(status==READ_BODY_CONTENT){
                free(req->buf);
                DBG("ReadBodyContent OK\n");
                char* request=NULL;
                int request_len=MakeFileRequest(filename,&request);
                DBG("MakeFileRequest (%s)(%d)\n",filename,request_len);
                Write(request,request_len,&m_Req);
            }
        }else{
            ERR("OnRead(%s)\n",APFError2Str(nErrCode));
            Close();
        }
    }
    virtual void OnCompletion(AsyncContext* ctx){
        switch(ctx->nAction){
            case CONNECT_TIMEOUT:
                ERR("connect timeout(%d ms)\n",connect_timeout_ms);
                Close();
                break;
            default:
                CSocketV::OnCompletion(ctx);
                break;
        }
    }
};

int main(){
    InitKylin(2,2,0);
    std::vector< XAsyncClient* > vec;
    for(int i=0;i<worker_num;i++){
        XAsyncClient* client=new XAsyncClient(i);
        vec.push_back(client);
        client->StartConnnect();
    }
    // 100ms.
    const struct timespec timeout={0,100*1000000};
    while(1){
        nanosleep(&timeout,NULL);
        if(AtomicGetValue(worker)==0){
            StopKylin(true);
            break;
        }
    }
    return 0;
}

3.2.2 server.cc

/***************************************************************************
 *
 * Copyright (c) Baidu.com, Inc. All Rights Reserved
 *
 **************************************************************************/


/**
 * @author zhangyan04(@baidu.com)
 * @brief
 *
 */

#include <cstdio>
#include <vector>
#include <string>
#include <time.h>
#include <signal.h>
#include "stdafx.h"
#include "Kylin.h"
#include "Socket.h"

#define ERR(fmt,...) fprintf(stderr,fmt,##__VA_ARGS__)
#define DBG(fmt,...) fprintf(stdout,fmt,##__VA_ARGS__)
//#define DBG(fmt,...)

static volatile int client_id=-1;
static volatile int stop=0;
static const int disk_thread_num=4;

void signal_handler(int signo){
    AtomicSetValue(stop,1);
}

class XAsyncClient:public CSocketV {
  public:
    int id;
    AsyncContext delay_ctx;
    DiskContext disk_ctx;
    DiskRequest disk_req;
    enum status_t{
        READ_BODY_LEN,
        READ_BODY_CONTENT,
    };
    status_t status;
    int read_body_len;
    char* reply;
    int reply_len;
    XAsyncClient(int id_):id(id_){
        AddRef();
        InitAsyncContext(&delay_ctx);
    }
    ~XAsyncClient(){
    }
    void Print(){
        fprintf(stderr,"(%d)xref:%d\n",id,CAsyncClient::GetRef());
    }
    void StartServe(){
        APF_ERROR err=CSocketV::Create(true,19870);
        if(err==APFE_OK){
            Listen();
        }else{
            ERR("Create(%s)\n",APFError2Str(err));
        }
    }
    void OnListened(SOCKET s,SOCKADDR_IN* pSockAddrIn){
        DBG("OnListened\n");
        XAsyncClient* client=new XAsyncClient(atomic_add(&client_id,1));
        APF_ERROR err=client->Create(s,pSockAddrIn->sin_port);
        if(err==APFE_OK){
            ;
        }else{
            ERR("Create(%s)\n",APFError2Str(err));
            Release();
        }
    }
    virtual void OnBroken(APF_ERROR nErrCode){
        ERR("OnBroken(%s)\n",APFError2Str(nErrCode));
        Close();
        Release(); // we have to call 'Release' right here to free connection.
    }
    virtual void OnWritten(APF_ERROR nErrCode,NetRequest* req){
        free(req->buf);
        switch(nErrCode){
            case APFE_OK:
                DBG("OnWritten\n");
                status=READ_BODY_LEN;
                Read(&read_body_len,sizeof(read_body_len),&m_Req);
                break;
            default:
                ERR("OnWritten(%s)\n",APFError2Str(nErrCode));
                Close();
                break;
        }
    }
    virtual void OnRead(APF_ERROR nErrCode,NetRequest* req){
        switch(nErrCode){
            case APFE_NOTIFY:
                delete req;
                DBG("OnRead APFE_NOTIFY\n");
                status=READ_BODY_LEN;
                Read(&read_body_len,sizeof(read_body_len),&m_Req);
                break;
            case APFE_OK:
                if(status==READ_BODY_LEN){
                    DBG("ReadBodyLen OK(%d)\n",read_body_len);
                    char* request=(char*)malloc(read_body_len);
                    status=READ_BODY_CONTENT;
                    Read(request,read_body_len,&m_Req);
                }else if(status==READ_BODY_CONTENT){
                    DBG("ReadBodyContent OK(%s)\n",req->buf);
                    int fd=open((char*)req->buf,O_RDONLY,0666);
                    free(req->buf);

                    // init disk read.
                    struct stat st_buf;
                    fstat(fd,&st_buf);
                    reply_len=sizeof(int)+st_buf.st_size;
                    reply=(char*)malloc(reply_len);
                    *(int*)reply=(int)st_buf.st_size;
                    g_pDiskMan->Associate(id%disk_thread_num,fd,this,&disk_ctx);
                    DBG("ReadDiskContent (%d)\n",reply_len-sizeof(int));
                    disk_req.off=0; // read from beginning.
                    g_pDiskMan->Read(&disk_ctx,reply+sizeof(int),reply_len-sizeof(int),&disk_req);
                }
                break;
            default:
                ERR("OnRead(%s)\n",APFError2Str(nErrCode));
                Close();
                break;
        }
    }
    virtual void OnCompletion(AsyncContext* ctx){
        if(ctx==&(disk_req.async)){
            assert(ctx->nAction==AA_READ); // disk read done.
            DBG("request %d, xfered %d %s\n",disk_req.request,disk_req.xfered,APFError2Str(ctx->nErrCode));
            assert(disk_req.request==disk_req.xfered);
            g_pDiskMan->Deassociate(&disk_ctx);
            close(disk_ctx.fd);
            Write(reply,reply_len,&m_Req);
        }else{
            CSocketV::OnCompletion(ctx);
        }
    }
};

int main(){
    signal(SIGINT,signal_handler);
    InitKylin(2,2,disk_thread_num);
    XAsyncClient client(0);
    client.StartServe();
    // 100ms.
    const struct timespec timeout={0,100*1000000};
    while(1){
        nanosleep(&timeout,NULL);
        if(AtomicGetValue(stop)==1){
            StopKylin(true);
            break;
        }
    }
    return 0;
}
comments powered by Disqus