1 Preface

libmill is a coroutine library for C on Unix platforms, started by Martin Sustrik. It implements goroutine-style coroutines and also supports channels, following the idea of "share data by communicating, rather than communicate by sharing data".

I found it interesting, so I spent a weekend reading it. The code is clean and tight, and I ran into quite a few tricks that made my eyes light up. Here are a few examples.

1 A generic linked list and its iterator

offsetof computes the offset of a member within a struct. So if we know a struct's type, a member's name, and that member's address, we can compute the address of the struct itself:

#define mill_cont(ptr, type, member) \
        (ptr ? ((type*) (((char*) ptr) - offsetof(type, member))) : NULL)

Based on this, a generic linked list can be built. How?

struct list_item {
    struct list_item * next;
};

struct my_struct {
    void * data; 
    struct list_item iter;   /* embedded by value, so mill_cont can recover my_struct */
};

We build the chain out of list_item nodes and embed a list_item member named iter in our own my_struct, using it as the iterator hook. When we want a linked list of my_struct we actually build a list of list_items, and traversing the my_struct list means walking that list_item chain. Suppose traversal has reached some node struct list_item *item; combining it with the macro above, mill_cont(item, struct my_struct, iter) recovers the address of the enclosing struct, and from there we can access its data.
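A self-contained sketch of the pattern (names follow the snippet above; the list_item must be embedded by value for mill_cont to recover the enclosing struct):

#include <stddef.h>
#include <stdio.h>

#define mill_cont(ptr, type, member) \
        (ptr ? ((type*) (((char*) ptr) - offsetof(type, member))) : NULL)

struct list_item { struct list_item *next; };
struct my_struct { int data; struct list_item iter; };

int main(void) {
    struct my_struct a = {1, {NULL}}, b = {2, {NULL}};
    a.iter.next = &b.iter;                      /* chain a -> b */
    struct list_item *it;
    for(it = &a.iter; it; it = it->next) {
        /* recover the enclosing my_struct from the embedded list_item */
        struct my_struct *s = mill_cont(it, struct my_struct, iter);
        printf("%d\n", s->data);                /* prints 1, then 2 */
    }
    return 0;
}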

Martin Sustrik's approach here is essentially the same as the generic-list macros in the Linux kernel; it just feels more natural to use and easier to accept.

2 Adjusting the stack pointer (allocating coroutine stacks)

Stack space is allocated at two points in time: compile time and run time. When the required size is known at compile time, the compiler emits the corresponding instruction, e.g. sub $16, %rsp. When the size is only known at run time, e.g. int n = rand() % 16; int buf[n];, the space has to be allocated at run time. How? On Linux the library function alloca extends the current stack frame, but note that it performs no bounds checking at all. After such an allocation the stack top changes, i.e. the register %rsp moves. It is exactly this side effect that lets go(func) make a newly allocated memory region act as the stack frame the given function runs on. This way every coroutine gets its own stack, and once the coroutine context is saved as well, switching between coroutines becomes straightforward.

#define mill_go_(fn) \
    do {\
        void *mill_sp;\
        mill_ctx ctx = mill_getctx_();\
        if(!mill_setjmp_(ctx)) {\
            mill_sp = mill_prologue_(MILL_HERE_);\
            int mill_anchor[mill_unoptimisable1_];\
            mill_unoptimisable2_ = &mill_anchor;\
            char mill_filler[(char*)&mill_anchor - (char*)(mill_sp)];\
            mill_unoptimisable2_ = &mill_filler;\
            fn;\
            mill_epilogue_();\
        }\
    } while(0)
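To see the side effect being exploited, here is a small standalone sketch (illustrative only; the exact frame layout is compiler-dependent): a variable-length array, just like mill_filler above, forces a run-time %rsp adjustment, so locals declared after it end up below it in memory.

#include <stdio.h>
#include <stdlib.h>

int main(void) {
    int before;                  /* allocated before the VLA */
    int n = rand() % 16 + 1;
    char vla[n];                 /* run-time allocation: %rsp moves down */
    int after;                   /* typically ends up below the VLA */
    printf("&before=%p vla=%p &after=%p\n",
           (void*)&before, (void*)vla, (void*)&after);
    return 0;
}

In mill_go_ the size of mill_filler is computed as the distance from the current frame (mill_anchor) down to mill_sp, so once the array exists, %rsp lands exactly on the new coroutine's stack and fn runs there.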

3 Other delights

It doesn't take many such moments; one, two or three are already a joy …

Beyond understanding how this goroutine-style coroutine library is implemented, I saw plenty of good design ideas. Reading a master's code really does feel like "a single conversation worth ten years of study".

A closing thought: the road ahead is long and winding, yet I will keep searching, high and low!

2 Introduction

2.1 libmill overview

libmill is a coroutine library for C. Downloads and documentation can be found here: libmill. Its source code is hosted on GitHub: libmill-source.

2.2 libmill vs goroutine

libmill was ported from the goroutine model; its API is friendly and very close to Go's, as the figure below shows.

libmill_vs_goroutine

Although the two APIs look alike, the implementations differ substantially, so saying "libmill is a goroutine-style coroutine library" only refers to the API.

  • In libmill, all coroutine scheduling happens on the current thread. In other words, if a single-threaded program uses libmill coroutines and a coroutine makes a blocking system call, the whole process blocks.
  • Goroutines are spread across multiple OS threads. A blocking system call made inside one goroutine only blocks the thread it runs on; the other threads of the process keep running.

    Note: Linux has had two thread libraries. The earlier one is LinuxThreads; what is generally used nowadays is the Native POSIX Threads Library (NPTL), i.e. the pthread library. LinuxThreads built threads on clone() without CLONE_THREAD, so each thread appeared as a separate process with its own pid and a manager thread was required; NPTL also creates one LWP per thread, but it uses sys_clone() with the CLONE_THREAD option, so the thread (still essentially an LWP) shares the same process id.

That was a quick introduction to libmill; the detailed walkthrough begins below.

3 Coroutine

3.1 libmill

3.1.1 ABI version

// ABI: application binary interface
#define MILL_VERSION_CURRENT 19         // major version
#define MILL_VERSION_REVISION 1         // revision
#define MILL_VERSION_AGE 1              // number of past versions still supported

3.1.2 Symbol visibility

libmill is built as a shared library. A shared library should hide its implementation details and expose only its interface, which raises the question of "symbol visibility" (see the quote below). The project's Makefile passes the compiler option -fvisibility=hidden so symbols are hidden by default, and the interfaces meant for external use are individually marked with the visibility attribute __attribute__((visibility("default"))).

Functions with default visibility have a global scope and can be called from other shared objects. Functions with hidden visibility have a local scope and cannot be called from other shared objects. Visibility can be controlled by using either compiler options or visibility attributes. More on symbol visibility: see the linked reference.

#if !defined __GNUC__ && !defined __clang__
#error "Unsupported compiler!"
#endif

#if defined MILL_NO_EXPORTS
#   define MILL_EXPORT
#else
#   if defined _WIN32
#      ......
#   else
#      if defined __SUNPRO_C
#          ......
#      elif (defined __GNUC__ && __GNUC__ >= 4) || defined __INTEL_COMPILER || defined __clang__
#          define MILL_EXPORT __attribute__ ((visibility("default")))
#      else
#          define MILL_EXPORT
#      endif
#   endif
#endif

If a function name is preceded by the macro MILL_EXPORT, that function has default visibility and can be called from code outside libmill.so. Two examples:

// mill_concat_ has the hidden visibility set in the Makefile: usable only inside this .so, invisible outside
#define mill_concat_(x,y) x##y

// mill_now_ and mill_mfork_ have default visibility: exported and callable from outside the .so
MILL_EXPORT int64_t mill_now_(void);
MILL_EXPORT pid_t mill_mfork_(void);

libmill.h exports a large number of function names this way; for reasons of space they are not all listed here.

3.1.3 Timer precision

Fetching the system time with gettimeofday is relatively expensive. In scenarios that need the current time frequently, the fetched time should be cached, and to preserve precision the cache refresh interval has to be controlled carefully.

So when should the cached system time be refreshed? The rdtsc (read timestamp counter) instruction takes only a few clock cycles to execute and returns the number of cycles elapsed since the system started. Based on the CPU frequency one can pick a cycle-count threshold: only when the difference between two consecutive rdtsc readings exceeds that threshold is gettimeofday called again to refresh the cached time. This is exactly how libmill's timer is implemented.
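A minimal sketch of the idea (the threshold and helper names are illustrative, not libmill's actual code):

#include <stdint.h>
#include <sys/time.h>

/* Read the timestamp counter (x86-64). */
static inline uint64_t rdtsc(void) {
    uint32_t lo, hi;
    __asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi));
    return ((uint64_t)hi << 32) | lo;
}

/* Cached wall-clock time in ms, refreshed only after enough cycles pass. */
static int64_t cached_now(void) {
    static uint64_t last_tsc = 0;
    static int64_t last_ms = 0;
    uint64_t tsc = rdtsc();
    /* ~1M cycles is well under 1 ms on current CPUs (illustrative threshold). */
    if(last_ms == 0 || tsc - last_tsc > 1000000ULL) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        last_ms = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
        last_tsc = tsc;
    }
    return last_ms;
}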

Below is a brief description of the rdtsc instruction; for details see: wiki rdtsc

The Time Stamp Counter (TSC) is a 64-bit register present on all x86 processors since the Pentium. It counts the number of cycles since reset. The instruction RDTSC returns the TSC in EDX:EAX. In x86-64 mode, RDTSC also clears the higher 32 bits of RAX and RDX. Its opcode is 0F 31.[1] Pentium competitors such as the Cyrix 6x86 did not always have a TSC and may consider RDTSC an illegal instruction. Cyrix included a Time Stamp Counter in their MII.

This header contains a lot of code; only the key parts are listed here. See the source for more detail.

3.1.4 I/O events watched by mill_fdwait

// the read/write/error events that mill_fdwait watches
#define MILL_FDW_IN_ 1
#define MILL_FDW_OUT_ 2
#define MILL_FDW_ERR_ 4

3.1.5 Saving and switching the coroutine context

3.1.5.1 The coroutine context

#if defined __x86_64__
typedef uint64_t *mill_ctx;
#else
typedef sigjmp_buf *mill_ctx;
#endif

3.1.5.2 Saving the coroutine context

// x86_64 implementation of saving the coroutine context
#if defined(__x86_64__)
......
// Save the running coroutine's context (dump the processor's register state into the memory at ctx for later use).
//
// Implementing this as a macro avoids the cost of creating and destroying a call frame, making coroutine switches cheaper.
// This is Linux gcc inline assembly; its parts are the instruction template, the outputs, the inputs and the clobbers.
// The value of ctx is passed in register rdx, and the final value of rax is assigned to the variable ret.
// The code zeroes rax and stores rbx, rbp, r12, rsp, r13, r14, r15, rcx, rdi, rsi in sequence into memory starting at ctx.
#define mill_setjmp_(ctx) ({\
    int ret;\
    asm("lea     LJMPRET%=(%%rip), %%rcx\n\t"\  //==>返回地址(LJMPRET标号处)送入%%rcx
        "xor     %%rax, %%rax\n\t"\
        "mov     %%rbx, (%%rdx)\n\t"\
        "mov     %%rbp, 8(%%rdx)\n\t"\
        "mov     %%r12, 16(%%rdx)\n\t"\
        "mov     %%rsp, 24(%%rdx)\n\t"\
        "mov     %%r13, 32(%%rdx)\n\t"\
        "mov     %%r14, 40(%%rdx)\n\t"\
        "mov     %%r15, 48(%%rdx)\n\t"\
        "mov     %%rcx, 56(%%rdx)\n\t"\         //==>rcx又存储到56(%%rdx)
        "mov     %%rdi, 64(%%rdx)\n\t"\
        "mov     %%rsi, 72(%%rdx)\n\t"\
        "LJMPRET%=:\n\t"\
        : "=a" (ret)\
        : "d" (ctx)\
        : "memory", "rcx", "r8", "r9", "r10", "r11",\
          "xmm0", "xmm1", "xmm2", "xmm3", "xmm4", "xmm5", "xmm6", "xmm7",\
          "xmm8", "xmm9", "xmm10", "xmm11", "xmm12", "xmm13", "xmm14", "xmm15"\
          MILL_CLOBBER\
          );\
    ret;\
})

3.1.5.3 Restoring the coroutine context

// x86_64 implementation of restoring the coroutine context

#if defined(__x86_64__)
......
// Restore the context from ctx into the processor (load the hardware context previously saved starting at ctx).
//
// To resume a coroutine cr, fetch the context cr->ctx it saved before being suspended, then mill_longjmp(ctx):
// ctx is loaded into rax, the saved values are loaded back into the registers by offset addressing from rax,
// and finally execution resumes at the saved return address.
#define mill_longjmp_(ctx) \
    asm("movq   (%%rax), %%rbx\n\t"\
	    "movq   8(%%rax), %%rbp\n\t"\
	    "movq   16(%%rax), %%r12\n\t"\
	    "movq   24(%%rax), %%rdx\n\t"\
	    "movq   32(%%rax), %%r13\n\t"\
	    "movq   40(%%rax), %%r14\n\t"\
	    "mov    %%rdx, %%rsp\n\t"\
	    "movq   48(%%rax), %%r15\n\t"\
	    "movq   56(%%rax), %%rdx\n\t"\     //==>56(%%rax)中地址为返回地址,送入%%rdx
	    "movq   64(%%rax), %%rdi\n\t"\
	    "movq   72(%%rax), %%rsi\n\t"\
	    "jmp    *%%rdx\n\t"\
        : : "a" (ctx) : "rdx" \
    )
#else
// non-x86_64 platforms fall back to sigsetjmp/siglongjmp for context switching
#define mill_setjmp_(ctx) \
    sigsetjmp(*ctx, 0)
#define mill_longjmp_(ctx) \
    siglongjmp(*ctx, 1)
#endif

3.1.6 The go(func) implementation

go(func) suspends the current coroutine, runs the given function func in a newly created coroutine, destroys that coroutine once func finishes, and then reschedules other coroutines (including the current one).
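A minimal usage sketch with the public API (the worker coroutine is made up for illustration):

#include <stdio.h>
#include <libmill.h>

coroutine void worker(int id) {
    printf("worker %d running\n", id);
    yield();                      /* give other coroutines a chance */
    printf("worker %d done\n", id);
}

int main(void) {
    go(worker(1));                /* each runs on a freshly allocated stack */
    go(worker(2));
    msleep(now() + 100);          /* let the workers finish */
    return 0;
}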

It took me quite a while to understand the implementation of mill_go_(fn). Thanks are due to libmill contributor raedwulf: after a short exchange he immediately saw the source of my confusion and pointed out the four key lines I should not have ignored!

...

int mill_anchor[mill_unoptimisable1_];\
mill_unoptimisable2_ = &mill_anchor;\
char mill_filler[(char*)&mill_anchor - (char*)(mill_sp)];\
mill_unoptimisable2_ = &mill_filler;\

...

These four lines are genuinely puzzling. The question even collected several downvotes on Stack Overflow, and a colleague found it baffling too. No wonder raedwulf jokingly calls it black magic around the C language!

// the implementation of go()
#define mill_go_(fn) \
    do {\
        void *mill_sp;\
        // Fetch and save the context of the currently running coroutine right away: we are about to re-point the stack
        mill_ctx ctx = mill_getctx_();\
        if(!mill_setjmp_(ctx)) {\
            // Allocate the memory for the coroutine about to be created; returns the initial stack top (== bottom) of its stack part
            mill_sp = mill_prologue_(MILL_HERE_);\
            // The 4 lines below puzzled me for a long time; the root cause was not knowing Linux C, gcc and
            // stack frame allocation well enough.
            // - frame space whose size is known at compile time is allocated at compile time, via sub <size>, %rsp;
            // - frame space whose size is only known at run time is allocated at run time, via alloca(size);
            // Notes:
            // - with gcc on x86_64, alloca essentially does sub <size>, %rsp plus some alignment work; it allocates
            //   within the current frame and returns the start address, but never checks for stack overflow;
            // - pointer arithmetic is unsigned, so a small address minus a large one wraps around the whole
            //   virtual address space;
            // - the mill_filler array is allocated the way alloca allocates; once it exists, rsp has been moved
            //   down to the memory mill_sp points at;
            // - the new coroutine then runs with mill_sp as its stack top; when the current coroutine is resumed
            //   later it is completely unaware of the so-called mill_filler, because its context was saved before
            //   the stack adjustment;
            int mill_anchor[mill_unoptimisable1_];\
            mill_unoptimisable2_ = &mill_anchor;\
            char mill_filler[(char*)&mill_anchor - (char*)(mill_sp)];\
            mill_unoptimisable2_ = &mill_filler;\
            // call fn on the new coroutine's stack
            fn;\
            // once fn returns, free the coroutine's memory and mill_suspend to hand the cpu to other coroutines
            mill_epilogue_();\
        }\
    } while(0)

3.1.7 The chan implementation

3.1.7.1 Core data structures

// only a declaration here; the definition lives in chan.h (see section 3.3.1)
struct mill_chan_;

typedef struct{
    void *f1; 
    void *f2; 
    void *f3; 
    void *f4; 
    void *f5; 
    void *f6; 
    int f7; 
    int f8; 
    int f9;
} mill_clause_;

#define MILL_CLAUSELEN_ (sizeof(mill_clause_))

3.1.7.2 Sending data to a chan

// send a value of the given type into the channel
#define mill_chs__(channel, type, value) \
    do {\
        type mill_val = (value);\
        mill_chs_((channel), &mill_val, sizeof(type), MILL_HERE_);\
    } while(0)

3.1.7.3 Receiving data from a chan

// receive a value of the given type from the channel
#define mill_chr__(channel, type) \
    (*(type*)mill_chr_((channel), sizeof(type), MILL_HERE_))

3.1.7.4 Marking a chan as done

// mark the channel as done, with a value of the given type
#define mill_chdone__(channel, type, value) \
    do {\
        type mill_val = (value);\
        mill_chdone_((channel), &mill_val, sizeof(type), MILL_HERE_);\
    } while(0)

3.1.8 The choose clauses

3.1.8.1 Clause initialization

#define mill_choose_init__ \
    {\
        mill_choose_init_(MILL_HERE_);\
        int mill_idx = -2;\
        while(1) {\
            if(mill_idx != -2) {\
                if(0)

3.1.8.2 The read-ready clause (in)

#define mill_choose_in__(chan, type, name, idx) \
                    break;\
                }\
                goto mill_concat_(mill_label, idx);\
            }\
            char mill_concat_(mill_clause, idx)[MILL_CLAUSELEN_];\
            mill_choose_in_(\
                &mill_concat_(mill_clause, idx)[0],\
                (chan),\
                sizeof(type),\
                idx);\
            if(0) {\
                type name;\
                mill_concat_(mill_label, idx):\
                if(mill_idx == idx) {\
                    name = *(type*)mill_choose_val_(sizeof(type));\
                    goto mill_concat_(mill_dummylabel, idx);\
                    mill_concat_(mill_dummylabel, idx)

3.1.8.3 The write-ready clause (out)

#define mill_choose_out__(chan, type, val, idx) \
                    break;\
                }\
                goto mill_concat_(mill_label, idx);\
            }\
            char mill_concat_(mill_clause, idx)[MILL_CLAUSELEN_];\
            type mill_concat_(mill_val, idx) = (val);\
            mill_choose_out_(\
                &mill_concat_(mill_clause, idx)[0],\
                (chan),\
                &mill_concat_(mill_val, idx),\
                sizeof(type),\
                idx);\
            if(0) {\
                mill_concat_(mill_label, idx):\
                if(mill_idx == idx) {\
                    goto mill_concat_(mill_dummylabel, idx);\
                    mill_concat_(mill_dummylabel, idx)

3.1.8.4 The deadline clause

#define mill_choose_deadline__(ddline, idx) \
                    break;\
                }\
                goto mill_concat_(mill_label, idx);\
            }\
            mill_choose_deadline_(ddline);\
            if(0) {\
                mill_concat_(mill_label, idx):\
                if(mill_idx == -1) {\
                    goto mill_concat_(mill_dummylabel, idx);\
                    mill_concat_(mill_dummylabel, idx)

3.1.8.5 The otherwise clause

#define mill_choose_otherwise__(idx) \
                    break;\
                }\
                goto mill_concat_(mill_label, idx);\
            }\
            mill_choose_otherwise_();\
            if(0) {\
                mill_concat_(mill_label, idx):\
                if(mill_idx == -1) {\
                    goto mill_concat_(mill_dummylabel, idx);\
                    mill_concat_(mill_dummylabel, idx)

3.1.8.6 Closing the statement

#define mill_choose_end__ \
                    break;\
                }\
            }\
            mill_idx = mill_choose_wait_();\
        }
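Put together, these macros expand from libmill's public choose syntax. A minimal usage sketch (channels and values made up for illustration):

#include <stdio.h>
#include <libmill.h>

int main(void) {
    chan ch1 = chmake(int, 1);
    chan ch2 = chmake(int, 1);
    chs(ch1, int, 42);            /* make ch1 readable */
    choose {                      /* expands to mill_choose_init__ */
    in(ch1, int, v):              /* expands to mill_choose_in__ */
        printf("got %d from ch1\n", v);
    in(ch2, int, w):
        printf("got %d from ch2\n", w);
    otherwise:                    /* expands to mill_choose_otherwise__ */
        printf("nothing ready\n");
    end                           /* expands to mill_choose_end__ */
    }
    chclose(ch1);
    chclose(ch2);
    return 0;
}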

3.2 stack

stack abstracts the coroutine stack. The coroutine stack can itself be seen as a trick: each concurrent task runs on its own coroutine stack, and the program context is allowed to switch back and forth among these tasks, achieving finer-grained concurrency. Each concurrent task has a coroutine stack associated with it, and stack operations inside a task work exactly as in an ordinary program: the stack grows from high addresses toward low addresses, as determined by the compiler and ABI.

3.2.1 stack.h

/* Purges all the existing cached stacks and preallocates 'count' new stacks
   of size 'stack_size'. Sets errno in case of error. */
void mill_preparestacks(int count, size_t stack_size);

/* Allocates new stack. Returns pointer to the *top* of the stack.
   For now we assume that the stack grows downwards. */
void *mill_allocstack(size_t *stack_size);

/* Deallocates a stack. The argument is pointer to the top of the stack. */
void mill_freestack(void *stack);

3.2.2 stack.c

// query the memory page size (only once; the result is cached)
static size_t mill_page_size(void) {
    static long pgsz = 0;
    if(mill_fast(pgsz))
        return (size_t)pgsz;
    pgsz = sysconf(_SC_PAGE_SIZE);
    mill_assert(pgsz > 0);
    return (size_t)pgsz;
}

// stack size; may also be overridden by the user
static size_t mill_stack_size = 256 * 1024 - 256;

// the actual (sanitised) stack size
static size_t mill_sanitised_stack_size = 0;

// compute the stack size to use
static size_t mill_get_stack_size(void) {
#if defined HAVE_POSIX_MEMALIGN && HAVE_MPROTECT
    /* If sanitisation was already done, return the precomputed size. */
    if(mill_fast(mill_sanitised_stack_size))
        return mill_sanitised_stack_size;
    mill_assert(mill_stack_size > mill_page_size());
    /* Amount of memory allocated must be multiply of the page size otherwise
       the behaviour of posix_memalign() is undefined. */
    size_t sz = (mill_stack_size + mill_page_size() - 1) &
        ~(mill_page_size() - 1);
    /* Allocate one additional guard page. */
    mill_sanitised_stack_size = sz + mill_page_size();
    return mill_sanitised_stack_size;
#else
    return mill_stack_size;
#endif
}

// maximum number of unused cached stacks
// the stack our code is currently running on can never be freed, so at least one cached stack is needed
static int mill_max_cached_stacks = 64;

// a stack of unused coroutine stacks
//
// It makes coroutine-stack allocation fast: when a coroutine is freed, its stack is pushed on top, and if a
// new coroutine is created soon after, the virtual memory pages of that stack are very likely still resident
// in RAM, so page misses become less likely and fast coroutine-stack allocation is achieved
static int mill_num_cached_stacks = 0;
static struct mill_slist mill_cached_stacks = {0};

// allocate coroutine stack memory; returns ptr + stack size, i.e. the top of the stack (stacks grow downwards)
static void *mill_allocstackmem(void) {
    void *ptr;
#if defined HAVE_POSIX_MEMALIGN && HAVE_MPROTECT
    /* Allocate the stack so that it's memory-page-aligned. */
    int rc = posix_memalign(&ptr, mill_page_size(), mill_get_stack_size());
    if(mill_slow(rc != 0)) {
        errno = rc;
        return NULL;
    }
    /* The bottom page is used as a stack guard. This way stack overflow will
       cause segfault rather than randomly overwrite the heap. */
    rc = mprotect(ptr, mill_page_size(), PROT_NONE);
    if(mill_slow(rc != 0)) {
        int err = errno;
        free(ptr);
        errno = err;
        return NULL;
    }
#else
    ptr = malloc(mill_get_stack_size());
    if(mill_slow(!ptr)) {
        errno = ENOMEM;
        return NULL;
    }
#endif
    return (void*)(((char*)ptr) + mill_get_stack_size());
}

// preallocate coroutine stacks (count stacks of stack_size bytes each)
void mill_preparestacks(int count, size_t stack_size) {
    // free all currently cached coroutine stacks
    while(1) {
        struct mill_slist_item *item = mill_slist_pop(&mill_cached_stacks);
        if(!item)
            break;
        free(((char*)(item + 1)) - mill_get_stack_size());
    }
    // no coroutine stacks are allocated now, so the stack size can be adjusted
    size_t old_stack_size = mill_stack_size;
    size_t old_sanitised_stack_size = mill_sanitised_stack_size;
    mill_stack_size = stack_size;
    mill_sanitised_stack_size = 0;
    // allocate the new coroutine stacks and cache them for later use
    int i;
    for(i = 0; i != count; ++i) {
        void *ptr = mill_allocstackmem();
        if(!ptr)
            goto error;
        struct mill_slist_item *item = ((struct mill_slist_item*)ptr) - 1;
        mill_slist_push_back(&mill_cached_stacks, item);
    }
    mill_num_cached_stacks = count;
    // make sure the stacks allocated here are never destroyed, even while unused
    mill_max_cached_stacks = count;
    errno = 0;
    return;
error:
    // if not all stacks could be allocated, allocate none (free those already allocated), restore the state and return an error
    while(1) {
        struct mill_slist_item *item = mill_slist_pop(&mill_cached_stacks);
        if(!item)
            break;
        free(((char*)(item + 1)) - mill_get_stack_size());
    }
    mill_num_cached_stacks = 0;
    mill_stack_size = old_stack_size;
    mill_sanitised_stack_size = old_sanitised_stack_size;
    errno = ENOMEM;
}

// allocate one coroutine stack (from the cache if possible, otherwise from fresh memory)
void *mill_allocstack(size_t *stack_size) {
    if(!mill_slist_empty(&mill_cached_stacks)) {
        --mill_num_cached_stacks;
        return (void*)(mill_slist_pop(&mill_cached_stacks) + 1);
    }
    void *ptr = mill_allocstackmem();
    if(!ptr)
        mill_panic("not enough memory to allocate coroutine stack");
    if(stack_size)
        *stack_size = mill_get_stack_size();
    return ptr;
}

// free a coroutine stack (the argument is the pointer to the top of the stack, as returned by mill_allocstack)
// if fewer stacks than the threshold are cached, cache this one; otherwise free one for real
void mill_freestack(void *stack) {
    /* Put the stack to the list of cached stacks. */
    struct mill_slist_item *item = ((struct mill_slist_item*)stack) - 1;
    mill_slist_push_back(&mill_cached_stacks, item);
    if(mill_num_cached_stacks < mill_max_cached_stacks) {
        ++mill_num_cached_stacks;
        return;
    }
    /* We can't deallocate the stack we are running on at the moment.
       Standard C free() is not required to work when it deallocates its
       own stack from underneath itself. Instead, we'll deallocate one of
       the unused cached stacks. */
    item = mill_slist_pop(&mill_cached_stacks);
    void *ptr = ((char*)(item + 1)) - mill_get_stack_size();
#if HAVE_POSIX_MEMALIGN && HAVE_MPROTECT
    int rc = mprotect(ptr, mill_page_size(), PROT_READ|PROT_WRITE);
    mill_assert(rc == 0);
#endif
    free(ptr);
}

3.3 chan

"Share data by communicating, don't communicate by sharing data" is the design idea behind Go's chan, and libmill's chan is built on the same idea. A chan can be thought of as a pipe.
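A minimal usage sketch of the public chan API (the producer coroutine is made up for illustration):

#include <stdio.h>
#include <libmill.h>

coroutine void producer(chan ch, int n) {
    int i;
    for(i = 0; i < n; ++i)
        chs(ch, int, i);          /* blocks until a receiver takes the value */
    chdone(ch, int, -1);          /* every receive from now on yields -1 */
    chclose(ch);
}

int main(void) {
    chan ch = chmake(int, 0);     /* unbuffered channel of ints */
    go(producer(chdup(ch), 3));   /* chdup: one reference per user */
    while(1) {
        int v = chr(ch, int);
        if(v == -1) break;
        printf("received %d\n", v);
    }
    chclose(ch);
    return 0;
}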

3.3.1 chan.h

// the choose statement is a branching construct that decides which branch to run based on the state of the chans involved
//
// every coroutine carries one choosedata structure tracking the choose operation it is currently executing
struct mill_choosedata {
    // each choose statement contains a list of clauses
    struct mill_slist clauses;
    // whether the optional otherwise clause is present: 0 no, 1 yes
    int othws;
    // the deadline given in this choose statement, or -1 if none
    int64_t ddline;
    // the number of clauses in this choose statement whose chan events are ready
    int available;
};

// a chan endpoint describes one user of a chan: an endpoint either sends messages into the chan or receives from it
//
// every chan has one sender and one receiver, so each chan contains two mill_ep members
struct mill_ep {
    // type (data sender or data receiver)
    enum {MILL_SENDER, MILL_RECEIVER} type;
    // sequence number of the choose operation being initialized
    int seqnum;
    // number of choose clauses referencing this mill_ep
    int refs;
    // number of clauses referencing this mill_ep that have been processed so far
    int tmp;
    // list of clauses still waiting for an event on this mill_ep
    struct mill_list clauses;
};

// chan
struct mill_chan_ {
    // size in bytes of one element stored in the channel
    size_t sz;
    // each chan has one sender and one receiver:
    // sender holds the list of clauses waiting to send on the chan, receiver the list of clauses waiting to receive
    struct mill_ep sender;
    struct mill_ep receiver;
    // the chan's reference count (chclose frees the resources only once it drops to 0)
    int refcount;
    // whether chdone() has been called on this chan: 0 no, 1 yes
    int done;
    // the message buffer follows the chan structure in memory:
    // - bufsz is the maximum number of messages the buffer can hold
    // - items is the number of messages currently in the buffer
    // - first is the position of the next message to be received; one extra slot past the buffer stores the value written by chdone()
    size_t bufsz;
    size_t items;
    size_t first;
    // debugging info
    struct mill_debug_chan debug;
};

// this structure represents one clause of a choose statement, e.g. in, out or otherwise
struct mill_clause {
    // item in the list of clauses waiting for an event on this->ep (iterator)
    struct mill_list_item epitem;
    // item in the list of clauses of the choose statement this clause belongs to (iterator)
    struct mill_slist_item chitem;
    // the coroutine that created this clause
    struct mill_cr *cr;
    // the chan endpoint this clause is waiting on
    struct mill_ep *ep;
    // for out clauses, val points at the value to send; for in clauses, val is NULL
    void *val;
    // when this clause fires, execution jumps to the idx-th clause
    int idx;
    // whether a peer clause matching this one exists (e.g. for a write on ch, a read on ch): 0 no, 1 yes
    int available;
    // whether this clause sits in the chan's sender or receiver list: 0 no, 1 yes
    int used;
};

// return the chan containing this endpoint
struct mill_chan_ *mill_getchan(struct mill_ep *ep);

3.3.2 chan.c

// every choose statement gets its own sequence number
static int mill_choose_seqnum = 0;
// return the chan containing ep (based on the endpoint type)
struct mill_chan_ *mill_getchan(struct mill_ep *ep) {
    switch(ep->type) {
    case MILL_SENDER:
        return mill_cont(ep, struct mill_chan_, sender);
    case MILL_RECEIVER:
        return mill_cont(ep, struct mill_chan_, receiver);
    default:
        assert(0);
    }
}
// create a chan
struct mill_chan_ *mill_chmake_(size_t sz, size_t bufsz, const char *created) {
    mill_preserve_debug();
    // the message buffer is allocated with one extra slot to hold the value submitted by chdone();
    // chdone must not write into the regular buffer: a full buffer would block chdone(), and since
    // libmill schedules on a single thread, one blocked operation would block the whole process
    struct mill_chan_ *ch = 
        (struct mill_chan_*)malloc(sizeof(struct mill_chan_) + (sz * (bufsz + 1)));
    if(!ch)
        return NULL;
    mill_register_chan(&ch->debug, created);
    // initialize the chan
    ch->sz = sz;
    ch->sender.type = MILL_SENDER;
    ch->sender.seqnum = mill_choose_seqnum;
    mill_list_init(&ch->sender.clauses);
    ch->receiver.type = MILL_RECEIVER;
    ch->receiver.seqnum = mill_choose_seqnum;
    mill_list_init(&ch->receiver.clauses);
    ch->refcount = 1;
    ch->done = 0;
    ch->bufsz = bufsz;
    ch->items = 0;
    ch->first = 0;
    mill_trace(created, "<%d>=chmake(%d)", (int)ch->debug.id, (int)bufsz);
    return ch;
}
// dup: merely increments the chan's reference count
struct mill_chan_ *mill_chdup_(struct mill_chan_ *ch, const char *current) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    mill_trace(current, "chdup(<%d>)", (int)ch->debug.id);
    ++ch->refcount;
    return ch;
}
// close the chan: decrement the reference count, and free the chan only when it reaches 0
void mill_chclose_(struct mill_chan_ *ch, const char *current) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    mill_trace(current, "chclose(<%d>)", (int)ch->debug.id);
    assert(ch->refcount > 0);
    --ch->refcount;
    if(ch->refcount)
        return;
    // closing a chan while clauses still depend on it is an error
    if(!mill_list_empty(&ch->sender.clauses) ||
          !mill_list_empty(&ch->receiver.clauses))
        mill_panic("attempt to close a channel while it is still being used");
    mill_unregister_chan(&ch->debug);
    // free the chan
    free(ch);
}
// wake up a coroutine blocked in mill_choose_wait
// 
// the coroutine blocked because its choose clauses were waiting for events, so waking it also means removing those clauses
static void mill_choose_unblock(struct mill_clause *cl) {
    struct mill_slist_item *it;
    struct mill_clause *itcl;
    for(it = mill_slist_begin(&cl->cr->choosedata.clauses); it; it = mill_slist_next(it)) {
        itcl = mill_cont(it, struct mill_clause, chitem);
        // clauses not registered in a chan's sender/receiver list are skipped;
        // registered ones are removed: they are exactly what kept the coroutine blocked
        if(!itcl->used)
            continue;
        mill_list_erase(&itcl->ep->clauses, &itcl->epitem);
    }
    // if a deadline was set, remove the corresponding timer as well
    if(cl->cr->choosedata.ddline >= 0)
        mill_timer_rm(&cl->cr->timer);
    // resume the coroutine
    mill_resume(cl->cr, cl->idx);
}
// initialize the choose statement
static void mill_choose_init(const char *current) {
    mill_set_current(&mill_running->debug, current);
    mill_slist_init(&mill_running->choosedata.clauses);
    mill_running->choosedata.othws = 0;
    mill_running->choosedata.ddline = -1;
    mill_running->choosedata.available = 0;
    ++mill_choose_seqnum;
}

void mill_choose_init_(const char *current) {
    mill_trace(current, "choose()");
    mill_running->state = MILL_CHOOSE;
    mill_choose_init(current);
}
// the choose in clause
void mill_choose_in_(void *clause, struct mill_chan_ *ch, size_t sz, int idx) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    if(mill_slow(ch->sz != sz))
        mill_panic("receive of a type not matching the channel");
    // check whether this clause's read event is ready; if so, record it with ++available
    int available = ch->done || !mill_list_empty(&ch->sender.clauses) || ch->items ? 1 : 0;
    if(available)
        ++mill_running->choosedata.available;
    // if this clause's read event is not ready but some other clause of the running choose is, return
    if(!available && mill_running->choosedata.available)
        return;
    /* Fill in the clause entry. */
    struct mill_clause *cl = (struct mill_clause*) clause;
    cl->cr = mill_running;
    cl->ep = &ch->receiver;
    cl->val = NULL;
    cl->idx = idx;
    cl->available = available;
    cl->used = 1;
    mill_slist_push_back(&mill_running->choosedata.clauses, &cl->chitem);
    if(cl->ep->seqnum == mill_choose_seqnum) {
        ++cl->ep->refs;
        return;
    }
    cl->ep->seqnum = mill_choose_seqnum;
    cl->ep->refs = 1;
    cl->ep->tmp = -1;
}

// the choose out clause
void mill_choose_out_(void *clause, struct mill_chan_ *ch, void *val, size_t sz, int idx) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    // writing to a chan after chdone has been called is forbidden
    if(mill_slow(ch->done))
        mill_panic("send to done-with channel");
    if(mill_slow(ch->sz != sz))
        mill_panic("send of a type not matching the channel");
    // check whether the chan is ready for a write
    int available = !mill_list_empty(&ch->receiver.clauses) || ch->items < ch->bufsz ? 1 : 0;
    if(available)
        ++mill_running->choosedata.available;
    // if the chan is not write-ready but some other clause of the running choose is ready, return
    if(!available && mill_running->choosedata.available)
        return;
    /* Fill in the clause entry. */
    struct mill_clause *cl = (struct mill_clause*) clause;
    cl->cr = mill_running;
    cl->ep = &ch->sender;
    cl->val = val;
    cl->available = available;
    cl->idx = idx;
    cl->used = 1;
    mill_slist_push_back(&mill_running->choosedata.clauses, &cl->chitem);
    if(cl->ep->seqnum == mill_choose_seqnum) {
        ++cl->ep->refs;
        return;
    }
    cl->ep->seqnum = mill_choose_seqnum;
    cl->ep->refs = 1;
    cl->ep->tmp = -1;
}
// timeout callback of the deadline clause: tear down all choose clauses and resume the coroutine
static void mill_choose_callback(struct mill_timer *timer) {
    struct mill_cr *cr = mill_cont(timer, struct mill_cr, timer);
    struct mill_slist_item *it;
    for(it = mill_slist_begin(&cr->choosedata.clauses); it; it = mill_slist_next(it)) {
        struct mill_clause *itcl = mill_cont(it, struct mill_clause, chitem);
        mill_assert(itcl->used);
        mill_list_erase(&itcl->ep->clauses, &itcl->epitem);
    }
    mill_resume(cr, -1);
}

// the choose deadline clause
void mill_choose_deadline_(int64_t ddline) {
    if(mill_slow(mill_running->choosedata.othws || mill_running->choosedata.ddline >= 0))
        mill_panic("multiple 'otherwise' or 'deadline' clauses in a choose statement");
    if(ddline < 0)
        return;
    mill_running->choosedata.ddline = ddline;
}

// the choose otherwise clause
void mill_choose_otherwise_(void) {
    if(mill_slow(mill_running->choosedata.othws ||
          mill_running->choosedata.ddline >= 0))
        mill_panic("multiple 'otherwise' or 'deadline' clauses in a choose statement");
    mill_running->choosedata.othws = 1;
}

// append the value val to the chan
static void mill_enqueue(struct mill_chan_ *ch, void *val) {
    // if receivers are blocked on choose in clauses on this chan, wake one to take the value (write first, then wake)
    if(!mill_list_empty(&ch->receiver.clauses)) {
        mill_assert(ch->items == 0);
        struct mill_clause *cl = mill_cont(
            mill_list_begin(&ch->receiver.clauses), struct mill_clause, epitem);
        // write the value
        memcpy(mill_valbuf(cl->cr, ch->sz), val, ch->sz);
        // wake the receiving coroutine
        mill_choose_unblock(cl);
        return;
    }
    // nobody is waiting: just buffer the value
    assert(ch->items < ch->bufsz);
    size_t pos = (ch->first + ch->items) % ch->bufsz;
    memcpy(((char*)(ch + 1)) + (pos * ch->sz) , val, ch->sz);
    ++ch->items;
}

// pop the value at the head of the chan into val
static void mill_dequeue(struct mill_chan_ *ch, void *val) {
    // take the first choose out clause registered on the chan's sender endpoint
    struct mill_clause *cl = mill_cont(
        mill_list_begin(&ch->sender.clauses), struct mill_clause, epitem);
    // the chan buffer currently holds no data
    if(!ch->items) {
        // after chdone no sender will ever send again; just copy out the value chdone appended
        if(mill_slow(ch->done)) {
            mill_assert(!cl);
            memcpy(val, ((char*)(ch + 1)) + (ch->bufsz * ch->sz), ch->sz);
            return;
        }
        // chdone has not been called: copy the value straight from the choose out clause, then wake the blocked sender
        mill_assert(cl);
        memcpy(val, cl->val, ch->sz);
        mill_choose_unblock(cl);
        return;
    }
    // the chan buffer holds data:
    // - read the value from the buffer;
    // - if a pending choose out clause cl exists, copy its value into the buffer and wake the coroutine executing it
    memcpy(val, ((char*)(ch + 1)) + (ch->first * ch->sz), ch->sz);
    ch->first = (ch->first + 1) % ch->bufsz;
    --ch->items;
    if(cl) {
        assert(ch->items < ch->bufsz);
        size_t pos = (ch->first + ch->items) % ch->bufsz;
        memcpy(((char*)(ch + 1)) + (pos * ch->sz) , cl->val, ch->sz);
        ++ch->items;
        mill_choose_unblock(cl);
    }
}

// choose wait
int mill_choose_wait_(void) {
    struct mill_choosedata *cd = &mill_running->choosedata;
    struct mill_slist_item *it;
    struct mill_clause *cl;

    // every coroutine has a choosedata structure of its own
    //
    // if some choose in/out clauses are ready, pick one and execute it
    if(cd->available > 0) {
        // with exactly one ready clause, checking cl->ep->type immediately tells us what to do;
        // with several ready clauses, pick one of the ready ones at random
        int chosen = cd->available == 1 ? 0 : (int)(random() % (cd->available));
        
        for(it = mill_slist_begin(&cd->clauses); it; it = mill_slist_next(it)) {
            cl = mill_cont(it, struct mill_clause, chitem);
            if(!cl->available)
                continue;
            if(!chosen)
                break;
            --chosen;
        }
        struct mill_chan_ *ch = mill_getchan(cl->ep);
        // 根据choose从句类型决定是向chan发送数据,还是从chan读取数据
        if(cl->ep->type == MILL_SENDER)
            mill_enqueue(ch, cl->val);
        else
            mill_dequeue(ch, mill_valbuf(cl->cr, ch->sz));
        mill_resume(mill_running, cl->idx);
        return mill_suspend();
    }

    // if no choose in/out clause is ready but an otherwise clause exists, execute it directly
    // - this effectively puts the running coroutine back on the ready queue and then suspends it
    if(cd->othws) {
        mill_resume(mill_running, -1);
        return mill_suspend();
    }

    // if a deadline clause was given, start a timer for it with the timeout callback attached
    if(cd->ddline >= 0)
        mill_timer_add(&mill_running->timer, cd->ddline, mill_choose_callback);

    // otherwise register the current coroutine with the chans being queried and wait until some clause unblocks
    for(it = mill_slist_begin(&cd->clauses); it; it = mill_slist_next(it)) {
        cl = mill_cont(it, struct mill_clause, chitem);
        if(mill_slow(cl->ep->refs > 1)) {
            if(cl->ep->tmp == -1)
                cl->ep->tmp =
                    cl->ep->refs == 1 ? 0 : (int)(random() % cl->ep->refs);
            if(cl->ep->tmp) {
                --cl->ep->tmp;
                cl->used = 0;
                continue;
            }
            cl->ep->tmp = -2;
        }
        mill_list_insert(&cl->ep->clauses, &cl->epitem, NULL);
    }
    // if several coroutines execute chdone concurrently, only one can succeed; the others must block on the line below
    return mill_suspend();
}

// return the running coroutine's chan value buffer (valbuf)
void *mill_choose_val_(size_t sz) {
    return mill_valbuf(mill_running, sz);
}

// send a value into the chan
void mill_chs_(struct mill_chan_ *ch, void *val, size_t sz,
      const char *current) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    mill_trace(current, "chs(<%d>)", (int)ch->debug.id);
    mill_choose_init(current);
    mill_running->state = MILL_CHS;
    struct mill_clause cl;
    mill_choose_out_(&cl, ch, val, sz, 0);
    mill_choose_wait_();
}

// receive a value from the chan
void *mill_chr_(struct mill_chan_ *ch, size_t sz, const char *current) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    mill_trace(current, "chr(<%d>)", (int)ch->debug.id);
    mill_running->state = MILL_CHR;
    mill_choose_init(current);
    struct mill_clause cl;
    mill_choose_in_(&cl, ch, sz, 0);
    mill_choose_wait_();
    return mill_choose_val_(sz);
}

// the chdone operation on a chan
void mill_chdone_(struct mill_chan_ *ch, void *val, size_t sz,
      const char *current) {
    if(mill_slow(!ch))
        mill_panic("null channel used");
    mill_trace(current, "chdone(<%d>)", (int)ch->debug.id);
    if(mill_slow(ch->done))
        mill_panic("chdone on already done-with channel");
    if(mill_slow(ch->sz != sz))
        mill_panic("send of a type not matching the channel");
    /* Panic if there are other senders on the same channel. */
    if(mill_slow(!mill_list_empty(&ch->sender.clauses)))
        mill_panic("send to done-with channel");
    /* Put the channel into done-with mode. */
    ch->done = 1;
    
    // append one extra element past the end of the buffer; chs can't be used to write it, since with no receiver that would block
    memcpy(((char*)(ch + 1)) + (ch->bufsz * ch->sz) , val, ch->sz);
    
    // after appending that extra element, wake up every receiver waiting on the chan
    while(!mill_list_empty(&ch->receiver.clauses)) {
        struct mill_clause *cl = mill_cont(
            mill_list_begin(&ch->receiver.clauses), struct mill_clause, epitem);
        memcpy(mill_valbuf(cl->cr, ch->sz), val, ch->sz);
        mill_choose_unblock(cl);
    }
}

3.4 cr

3.4.1 cr.h

// coroutine state
enum mill_state {
    MILL_READY,         // ready to be scheduled
    MILL_MSLEEP,        // suspended via mill_suspend, waiting for mill_resume
    MILL_FDWAIT,        // in mill_fdwait_, waiting for mill_poller_wait or a timer callback
    MILL_CHR,           // ...
    MILL_CHS,           // ...
    MILL_CHOOSE         // ...
};

/* 
   Memory layout of a coroutine:
   +----------------------------------------------------+--------+---------+
   |                                              stack | valbuf | mill_cr |
   +----------------------------------------------------+--------+---------+
   - mill_cr: general bookkeeping information of the coroutine
   - valbuf: temporary storage for values received from a chan
   - stack: an ordinary C stack, growing from high addresses toward low addresses
*/
struct mill_cr {
    // coroutine state, for debugging purposes
    enum mill_state state;

    // a coroutine that is not blocked and is waiting to run is put on the ready queue with is_ready=1;
    // otherwise is_ready=0 and it is not on the queue
    int is_ready;
    struct mill_slist_item ready;

    // if the coroutine has to honour a deadline, this timer implements the timeout callback
    struct mill_timer timer;

    // the fd whose io events the coroutine waits for in fdwait; -1 means no fd is being watched
    int fd;

    // the io events waited for in fdwait, for debugging purposes
    int events;

    // structure used while the coroutine executes a choose statement
    struct mill_choosedata choosedata;

    // the context saved when the coroutine is suspended and restored when it resumes
#if defined(__x86_64__)
    uint64_t ctx[10];
#else
    sigjmp_buf ctx;
#endif

    // when a suspended coroutine is resumed, resume's second argument is stored into cr->result;
    // when another coroutine suspends and switches to the resumed one, mill_suspend returns mill_running->result
    int result;

    // if the coroutine needs a valbuf larger than the preallocated one, it is allocated on the heap;
    // the address and size of that allocation live in these two members
    void *valbuf;
    size_t valbuf_sz;

    // coroutine-local storage (similar to thread-local storage)
    void *clsval;

#if defined MILL_VALGRIND
    /* Valgrind stack identifier. */
    int sid;
#endif

    // debugging info
    struct mill_debug_cr debug;
};

// fake coroutine corresponding to the main thread
extern struct mill_cr mill_main;

// the coroutine currently running
extern struct mill_cr *mill_running;

// suspends the currently running coroutine and switches to a different coroutine that has is_ready=1;
// once some coroutine resumes the suspended one, the result passed to resume becomes this function's return value
int mill_suspend(void);

// schedules a previously suspended coroutine cr for execution; really it just puts cr on the ready queue
void mill_resume(struct mill_cr *cr, int result);

// returns a pointer to the coroutine's temporary value buffer, guaranteed to hold at least size bytes
void *mill_valbuf(struct mill_cr *cr, size_t size);

// called in the child process after fork to stop running the coroutines inherited from the parent
void mill_cr_postfork(void);

3.4.2 cr.c

// size of the coroutine's temporary value buffer; it should be kept suitably aligned;
// it must not change while any coroutine stacks are allocated, or the coroutine memory-layout computations would break
size_t mill_valbuf_size = 128;

// valbuf for the fake main coroutine
char mill_main_valbuf[128];

volatile int mill_unoptimisable1_ = 1;
volatile void *mill_unoptimisable2_ = NULL;

// the main coroutine
struct mill_cr mill_main = {0};

// by default the running coroutine is mill_main
struct mill_cr *mill_running = &mill_main;

// queue of ready coroutines waiting to be scheduled
struct mill_slist mill_ready = {0};

// return the current coroutine's context
inline mill_ctx mill_getctx_(void) {
#if defined __x86_64__
    return mill_running->ctx;
#else
    return &mill_running->ctx;
#endif
}

// return the start address of the coroutine's valbuf
static void *mill_getvalbuf(struct mill_cr *cr, size_t size) {
    // small valbuf requests need no dynamic allocation on the heap;
    // note that the main coroutine has no stack allocated for it, but it does get a dedicated valbuf
    if(mill_fast(cr != &mill_main)) {
        if(mill_fast(size <= mill_valbuf_size))
            return (void*)(((char*)cr) - mill_valbuf_size);
    }
    else {
        if(mill_fast(size <= sizeof(mill_main_valbuf)))
            return (void*)mill_main_valbuf;
    }
    // larger valbuf requests are served from the heap; fixme!!! (the valbuf_sz <= size test below looks inverted)
    if(mill_fast(cr->valbuf && cr->valbuf_sz <= size))
        return cr->valbuf;
    void *ptr = realloc(cr->valbuf, size);
    if(!ptr)
        return NULL;
    cr->valbuf = ptr;
    cr->valbuf_sz = size;
    return cr->valbuf;
}

// prepare count coroutines in advance, initializing their stack size, valbuf and valbuf_sz
void mill_goprepare_(int count, size_t stack_size, size_t val_size) {
    if(mill_slow(mill_hascrs())) {errno = EAGAIN; return;}
    // initialize the poller
    mill_poller_init();
    if(mill_slow(errno != 0)) return;
    // round val_size up a little where possible so the valbuf stays nicely aligned
    mill_valbuf_size = (val_size + 15) & ~((size_t)0xf);
    // allocate the valbuf for the (fake) main coroutine
    if(mill_slow(!mill_getvalbuf(&mill_main, mill_valbuf_size))) {
        errno = ENOMEM;
        return;
    }
    // allocate the coroutine stacks (stack + valbuf + mill_cr together form the full memory of one coroutine)
    mill_preparestacks(count, stack_size + mill_valbuf_size + sizeof(struct mill_cr));
}
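Callers reach this through the public goprepare() wrapper; a hedged usage sketch (the numbers are illustrative):

#include <libmill.h>

int main(void) {
    /* Preallocate 100 coroutine stacks of 64 KiB each, with 128-byte valbufs,
       so that later go() calls don't have to hit the allocator. */
    goprepare(100, 64 * 1024, 128);
    /* ... spawn coroutines with go(...) ... */
    return 0;
}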

// suspend the currently running coroutine and switch to some coroutine with is_ready=1
// the suspended coroutine is revived by another coroutine calling resume(cr, result); suspend then returns result
int mill_suspend(void) {
    /* Even if process never gets idle, we have to process external events
       once in a while. The external signal may very well be a deadline or
       a user-issued command that cancels the CPU intensive operation. */
    static int counter = 0;
    if(counter >= 103) {
        mill_wait(0);
        counter = 0;
    }
    // save the running coroutine's context
    if(mill_running) {
        mill_ctx ctx = mill_getctx_();
        if (mill_setjmp_(ctx))
            return mill_running->result;
    }
    while(1) {
        // look for a runnable coroutine with is_ready=1 and resume it
        if(!mill_slist_empty(&mill_ready)) {
            ++counter;
            struct mill_slist_item *it = mill_slist_pop(&mill_ready);
            mill_running = mill_cont(it, struct mill_cr, ready);
            mill_assert(mill_running->is_ready == 1);
            mill_running->is_ready = 0;
            mill_longjmp_(mill_getctx_());
        }
        // none found: wait, possibly blocking until an external event (io readiness or timer expiry) wakes us
        mill_wait(1);
        mill_assert(!mill_slist_empty(&mill_ready));
        counter = 0;
    }
}

// resume a coroutine; each coroutine cr keeps its saved context inside itself
// really this just puts it back on the ready queue to await scheduling
inline void mill_resume(struct mill_cr *cr, int result) {
    mill_assert(!cr->is_ready);
    cr->result = result;
    cr->state = MILL_READY;
    cr->is_ready = 1;
    mill_slist_push_back(&mill_ready, &cr->ready);
}

/* mill_prologue_() and mill_epilogue_() live in the same scope with
   libdill's stack-switching black magic. As such, they are extremely
   fragile. Therefore, the optimiser is prohibited to touch them. */
#if defined __clang__
#define dill_noopt __attribute__((optnone))
#elif defined __GNUC__
#define dill_noopt __attribute__((optimize("O0")))
#else
#error "Unsupported compiler!"
#endif

// the opening half of go(): start a new coroutine; returns the pointer to its stack top
__attribute__((noinline)) dill_noopt 
void *mill_prologue_(const char *created) {
    // take a stack from the cache, or allocate one if the cache is empty
    struct mill_cr *cr = ((struct mill_cr*)mill_allocstack(NULL)) - 1;
    mill_register_cr(&cr->debug, created);
    cr->is_ready = 0;
    cr->valbuf = NULL;
    cr->valbuf_sz = 0;
    cr->clsval = NULL;
    cr->timer.expiry = -1;
    cr->fd = -1;
    cr->events = 0;
    // suspend the parent coroutine and schedule the newly created one
    mill_resume(mill_running, 0); 
    mill_running = cr;
    // return the new stack top, which sits just below the valbuf
    return (void*)(((char*)cr) - mill_valbuf_size);
}

// the closing half of go(): cleanup performed when the coroutine finishes
__attribute__((noinline)) dill_noopt
void mill_epilogue_(void) {
    mill_trace(NULL, "go() done");
    mill_unregister_cr(&mill_running->debug);
    if(mill_running->valbuf)
        free(mill_running->valbuf);
#if defined MILL_VALGRIND
    //......
#endif
    mill_freestack(mill_running + 1);
    mill_running = NULL;
    // there is no running coroutine at this point, so this mill_suspend never returns
    mill_suspend();
}

void mill_yield_(const char *current) {
    mill_trace(current, "yield()");
    mill_set_current(&mill_running->debug, current);
    // this looks suspicious, but it's fine: a coroutine may be resumed before it is suspended;
    // doing so guarantees the coroutine gets another chance to be scheduled after the suspend
    mill_resume(mill_running, 0);
    mill_suspend();
}

// return the valbuf start address
void *mill_valbuf(struct mill_cr *cr, size_t size) {
    void *ptr = mill_getvalbuf(cr, size);
    if(!ptr)
        mill_panic("not enough memory to receive from channel");
    return ptr;
}

// return the coroutine-local storage pointer
void *mill_cls_(void) {
    return mill_running->clsval;
}

// set the coroutine-local storage
void mill_setcls_(void *val) {
    mill_running->clsval = val;
}

// after fork, the child drops the queue of ready coroutines
void mill_cr_postfork(void) {
    /* Drop all coroutines in the "ready to execute" list. */
    mill_slist_init(&mill_ready);
}

3.5 mfork

// create a child process
pid_t mill_mfork_(void) {
    pid_t pid = fork();
    if(pid != 0) {
        // parent process
        return pid;
    }
    // child process: some special handling happens here,
    // reinitializing the ready queue mill_ready, the fd pollset and the timers list
    mill_cr_postfork();
    mill_poller_postfork();
    mill_timer_postfork();
    return 0;
}

4 Network

4.1 tcp

// tcp receive buffer size
// sized according to the Ethernet MTU (1500 bytes): smaller is suboptimal, and larger brings no real benefit
// it is set a bit lower to leave room for IPv4/IPv6 headers, with a few bytes to spare for IP and TCP options
#ifndef MILL_TCP_BUFLEN
#define MILL_TCP_BUFLEN (1500 - 68)
#endif

// tcp socket types
enum mill_tcptype {
   MILL_TCPLISTENER,
   MILL_TCPCONN
};

struct mill_tcpsock_ {
    enum mill_tcptype type;
};

// tcp listening socket
struct mill_tcplistener {
    struct mill_tcpsock_ sock;
    int fd;
    int port;
};

// tcp connection socket
struct mill_tcpconn {
    struct mill_tcpsock_ sock;
    int fd;
    size_t ifirst;  // start of the remaining data in the receive buffer
    size_t ilen;    // length of the remaining data in the receive buffer
    size_t olen;    // length of the pending data in the send buffer
    char ibuf[MILL_TCP_BUFLEN]; // receive buffer
    char obuf[MILL_TCP_BUFLEN]; // send buffer
    ipaddr addr;    // peer socket address
};

// tune a tcp socket (make it non-blocking, allow rapid reuse of the local address, suppress SIGPIPE where possible)
static void mill_tcptune(int s) {
    /* Make the socket non-blocking. */
    int opt = fcntl(s, F_GETFL, 0);
    if (opt == -1)
        opt = 0;
    int rc = fcntl(s, F_SETFL, opt | O_NONBLOCK);
    mill_assert(rc != -1);
    /*  Allow re-using the same local address rapidly. */
    opt = 1;
    rc = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
    mill_assert(rc == 0);
    /* If possible, prevent SIGPIPE signal when writing to the connection
        already closed by the peer. */
#ifdef SO_NOSIGPIPE
    opt = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
    mill_assert (rc == 0 || errno == EINVAL);
#endif
}

// initialize a tcp connection socket
static void tcpconn_init(struct mill_tcpconn *conn, int fd) {
    conn->sock.type = MILL_TCPCONN;
    conn->fd = fd;
    conn->ifirst = 0;
    conn->ilen = 0;
    conn->olen = 0;
}

// create a tcp listening socket (non-blocking)
struct mill_tcpsock_ *mill_tcplisten_(ipaddr addr, int backlog) {
    /* Open the listening socket. */
    int s = socket(mill_ipfamily(addr), SOCK_STREAM, 0);
    if(s == -1)
        return NULL;
    mill_tcptune(s);

    /* Start listening. */
    int rc = bind(s, (struct sockaddr*)&addr, mill_iplen(addr));
    if(rc != 0)
        return NULL;
    rc = listen(s, backlog);
    if(rc != 0)
        return NULL;

    // if addr does not carry a port, the OS assigns one during bind; fetch the assigned port here
    int port = mill_ipport(addr);
    if(!port) {
        ipaddr baddr;
        socklen_t len = sizeof(ipaddr);
        rc = getsockname(s, (struct sockaddr*)&baddr, &len);
        if(rc == -1) {
            int err = errno;
            fdclean(s);
            close(s);
            errno = err;
            return NULL;
        }
        port = mill_ipport(baddr);
    }

    /* Create the object. */
    struct mill_tcplistener *l = malloc(sizeof(struct mill_tcplistener));
    if(!l) {
        fdclean(s);
        close(s);
        errno = ENOMEM;
        return NULL;
    }
    l->sock.type = MILL_TCPLISTENER;
    l->fd = s;
    l->port = port;
    errno = 0;
    return &l->sock;
}

// get the port (the peer's port for a connection socket, the local port for a listening socket)
int mill_tcpport_(struct mill_tcpsock_ *s) {
    if(s->type == MILL_TCPCONN) {
        struct mill_tcpconn *c = (struct mill_tcpconn*)s;
        return mill_ipport(c->addr);
    }
    else if(s->type == MILL_TCPLISTENER) {
        struct mill_tcplistener *l = (struct mill_tcplistener*)s;
        return l->port;
    }
    mill_assert(0);
}

// accept a connection on a tcp listening socket (non-blocking accept; the new connection socket is tuned non-blocking)
struct mill_tcpsock_ *mill_tcpaccept_(struct mill_tcpsock_ *s, int64_t deadline) {
    if(s->type != MILL_TCPLISTENER)
        mill_panic("trying to accept on a socket that isn't listening");
    struct mill_tcplistener *l = (struct mill_tcplistener*)s;
    socklen_t addrlen;
    ipaddr addr;
    while(1) {
        /* Try to get new connection (non-blocking). */
        addrlen = sizeof(addr);
        int as = accept(l->fd, (struct sockaddr *)&addr, &addrlen);
        if (as >= 0) {
            mill_tcptune(as);
            struct mill_tcpconn *conn = malloc(sizeof(struct mill_tcpconn));
            if(!conn) {
                fdclean(as);
                close(as);
                errno = ENOMEM;
                return NULL;
            }
            tcpconn_init(conn, as);
            conn->addr = addr;
            errno = 0;
            return (tcpsock)conn;
        }
        mill_assert(as == -1);
        if(errno != EAGAIN && errno != EWOULDBLOCK)
            return NULL;
        /* Wait till new connection is available. */
        int rc = fdwait(l->fd, FDW_IN, deadline);
        if(rc == 0) {
            errno = ETIMEDOUT;
            return NULL;
        }
        if(rc & FDW_ERR)
            return NULL;
        mill_assert(rc == FDW_IN);
    }
}
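A hedged sketch of how these calls combine into a tcp echo server using the public API (port, buffer size and deadlines are illustrative):

#include <errno.h>
#include <libmill.h>

coroutine void handler(tcpsock conn) {
    char buf[64];
    size_t n = tcprecv(conn, buf, sizeof(buf), now() + 5000);
    if(n > 0)
        tcpsend(conn, buf, n, now() + 5000);  /* echo back what arrived */
    tcpflush(conn, now() + 5000);
    tcpclose(conn);
}

int main(void) {
    tcpsock ls = tcplisten(iplocal(NULL, 5555, 0), 10);
    if(!ls) return 1;
    while(1) {
        tcpsock conn = tcpaccept(ls, -1);     /* -1: no deadline */
        if(conn)
            go(handler(conn));
    }
}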

// connect a tcp socket to the given address (non-blocking)
struct mill_tcpsock_ *mill_tcpconnect_(ipaddr addr, int64_t deadline) {
    /* Open a socket. */
    int s = socket(mill_ipfamily(addr), SOCK_STREAM, 0);
    if(s == -1)
        return NULL;
    mill_tcptune(s);

    /* Connect to the remote endpoint. */
    int rc = connect(s, (struct sockaddr*)&addr, mill_iplen(addr));
    if(rc != 0) {
        mill_assert(rc == -1);
        if(errno != EINPROGRESS)
            return NULL;
        rc = fdwait(s, FDW_OUT, deadline);
        if(rc == 0) {
            errno = ETIMEDOUT;
            return NULL;
        }
        int err;
        socklen_t errsz = sizeof(err);
        rc = getsockopt(s, SOL_SOCKET, SO_ERROR, (void*)&err, &errsz);
        if(rc != 0) {
            err = errno;
            fdclean(s);
            close(s);
            errno = err;
            return NULL;
        }
        if(err != 0) {
            fdclean(s);
            close(s);
            errno = err;
            return NULL;
        }
    }

    /* Create the object. */
    struct mill_tcpconn *conn = malloc(sizeof(struct mill_tcpconn));
    if(!conn) {
        fdclean(s);
        close(s);
        errno = ENOMEM;
        return NULL;
    }
    tcpconn_init(conn, s);
    errno = 0;
    return (tcpsock)conn;
}

// send on a tcp socket (non-blocking)
size_t mill_tcpsend_(struct mill_tcpsock_ *s, const void *buf, size_t len, int64_t deadline) {
    if(s->type != MILL_TCPCONN)
        mill_panic("trying to send to an unconnected socket");
    struct mill_tcpconn *conn = (struct mill_tcpconn*)s;

    // if the free space in the send buffer can hold the data, just copy it in
    if(conn->olen + len <= MILL_TCP_BUFLEN) {
        memcpy(&conn->obuf[conn->olen], buf, len);
        conn->olen += len;
        errno = 0;
        return len;
    }

    // not enough room: flush the buffered data first
    tcpflush(s, deadline);
    if(errno != 0)
        return 0;

    // tcpflush may not send everything (e.g. it may return on timeout);
    // check again whether the buffer can now hold the data; if so, copy it in
    if(conn->olen + len <= MILL_TCP_BUFLEN) {
        memcpy(&conn->obuf[conn->olen], buf, len);
        conn->olen += len;
        errno = 0;
        return len;
    }

    // if the buffer is still too small after tcpflush, send in place, using the caller's buf as the send buffer
    char *pos = (char*)buf;
    size_t remaining = len;
    while(remaining) {
        ssize_t sz = send(conn->fd, pos, remaining, 0);
        if(sz == -1) {
            /* Operating systems are inconsistent w.r.t. returning EPIPE and
               ECONNRESET. Let's paper over it like this. */
            if(errno == EPIPE) {
                errno = ECONNRESET;
                return 0;
            }
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return 0;
            int rc = fdwait(conn->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return len - remaining;
            }
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    errno = 0;
    return len;
}

// flush the send buffer of a tcp connection socket (non-blocking)
void mill_tcpflush_(struct mill_tcpsock_ *s, int64_t deadline) {
    if(s->type != MILL_TCPCONN)
        mill_panic("trying to send to an unconnected socket");
    struct mill_tcpconn *conn = (struct mill_tcpconn*)s;
    if(!conn->olen) {
        errno = 0;
        return;
    }
    char *pos = conn->obuf;
    size_t remaining = conn->olen;
    while(remaining) {
        ssize_t sz = send(conn->fd, pos, remaining, 0);
        if(sz == -1) {
            /* Operating systems are inconsistent w.r.t. returning EPIPE and
               ECONNRESET. Let's paper over it like this. */
            if(errno == EPIPE) {
                errno = ECONNRESET;
                return;
            }
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return;
            int rc = fdwait(conn->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return;
            }
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    conn->olen = 0;
    errno = 0;
}

// receive on a tcp connection socket (non-blocking)
size_t mill_tcprecv_(struct mill_tcpsock_ *s, void *buf, size_t len, int64_t deadline) {
    if(s->type != MILL_TCPCONN)
        mill_panic("trying to receive from an unconnected socket");
    struct mill_tcpconn *conn = (struct mill_tcpconn*)s;

    // if the receive buffer already holds enough data, just return len bytes into buf
    if(conn->ilen >= len) {
        memcpy(buf, &conn->ibuf[conn->ifirst], len);
        conn->ifirst += len;
        conn->ilen -= len;
        errno = 0;
        return len;
    }

    // the buffer holds less than requested: copy out what is there, then keep receiving
    char *pos = (char*)buf;
    size_t remaining = len;
    memcpy(pos, &conn->ibuf[conn->ifirst], conn->ilen);
    pos += conn->ilen;
    remaining -= conn->ilen;
    conn->ifirst = 0;
    conn->ilen = 0;

    // receive the remainder
    mill_assert(remaining);
    while(1) {

        if(remaining > MILL_TCP_BUFLEN) {
            // the amount requested exceeds the tcp receive buffer: recv straight into the caller's buffer to save system calls
            ssize_t sz = recv(conn->fd, pos, remaining, 0);
            if(!sz) {
		        errno = ECONNRESET;
		        return len - remaining;
            }
            if(sz == -1) {
                if(errno != EAGAIN && errno != EWOULDBLOCK)
                    return len - remaining;
                sz = 0;
            }
            if((size_t)sz == remaining) {
                errno = 0;
                return len;
            }
            pos += sz;
            remaining -= sz;
        }
        else {
            // the remaining request is smaller than the receive buffer, but we still recv up to MILL_TCP_BUFLEN,
            // so that subsequent mill_tcprecv calls need fewer recv system calls
            ssize_t sz = recv(conn->fd, conn->ibuf, MILL_TCP_BUFLEN, 0);
            if(!sz) {
		        errno = ECONNRESET;
		        return len - remaining;
            }
            if(sz == -1) {
                if(errno != EAGAIN && errno != EWOULDBLOCK)
                    return len - remaining;
                sz = 0;
            }
            if((size_t)sz < remaining) {
                memcpy(pos, conn->ibuf, sz);
                pos += sz;
                remaining -= sz;
                conn->ifirst = 0;
                conn->ilen = 0;
            }
            else {
                memcpy(pos, conn->ibuf, remaining);
                conn->ifirst = remaining;
                conn->ilen = sz - remaining;
                errno = 0;
                return len;
            }
        }

        // wait for the fd to become readable, then read more
        int res = fdwait(conn->fd, FDW_IN, deadline);
        if(!res) {
            errno = ETIMEDOUT;
            return len - remaining;
        }
    }
}

// receive on a tcp connection socket, stopping at any of the given delimiter bytes (non-blocking)
size_t mill_tcprecvuntil_(struct mill_tcpsock_ *s, void *buf, size_t len,
      const char *delims, size_t delimcount, int64_t deadline) {
    if(s->type != MILL_TCPCONN)
        mill_panic("trying to receive from an unconnected socket");
    char *pos = (char*)buf;
    size_t i;
    for(i = 0; i != len; ++i, ++pos) {
        size_t res = tcprecv(s, pos, 1, deadline);
        if(res == 1) {
            size_t j;
            for(j = 0; j != delimcount; ++j)
                if(*pos == delims[j])
                    return i + 1;
        }
        if (errno != 0)
            return i + res;
    }
    errno = ENOBUFS;
    return len;
}
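For example, a hedged sketch of reading one newline-terminated line with the public tcprecvuntil wrapper (buffer size and deadline are illustrative):

#include <errno.h>
#include <libmill.h>

/* Illustrative helper: read one '\n'-terminated line into buf. */
static size_t read_line(tcpsock conn, char *buf, size_t cap) {
    size_t n = tcprecvuntil(conn, buf, cap - 1, "\n", 1, now() + 1000);
    if(errno != 0)
        return 0;    /* timeout, connection error, or line longer than cap */
    buf[n] = '\0';
    return n;
}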

// shut down a tcp connection socket (the read side, the write side, or both)
void mill_tcpshutdown_(struct mill_tcpsock_ *s, int how) {
    mill_assert(s->type == MILL_TCPCONN);
    struct mill_tcpconn *c = (struct mill_tcpconn*)s;
    int rc = shutdown(c->fd, how);
    mill_assert(rc == 0 || errno == ENOTCONN);
}

// close a tcp socket (handles listening and connection sockets uniformly)
void mill_tcpclose_(struct mill_tcpsock_ *s) {
    if(s->type == MILL_TCPLISTENER) {
        struct mill_tcplistener *l = (struct mill_tcplistener*)s;
        fdclean(l->fd);
        int rc = close(l->fd);
        mill_assert(rc == 0);
        free(l);
        return;
    }
    if(s->type == MILL_TCPCONN) {
        struct mill_tcpconn *c = (struct mill_tcpconn*)s;
        fdclean(c->fd);
        int rc = close(c->fd);
        mill_assert(rc == 0);
        free(c);
        return;
    }
    mill_assert(0);
}

// get the address of the tcp connection's peer
ipaddr mill_tcpaddr_(struct mill_tcpsock_ *s) {
    if(s->type != MILL_TCPCONN)
        mill_panic("trying to get address from a socket that isn't connected");
    struct mill_tcpconn *l = (struct mill_tcpconn *)s;
    return l->addr;
}

/* This function is to be used only internally by libmill. Take into account
   that once there are data in tcpsock's tx/rx buffers, the state of fd may
   not match the state of tcpsock object. Works only on connected sockets. */
int mill_tcpfd(struct mill_tcpsock_ *s) {
    return ((struct mill_tcpconn*)s)->fd;
}

4.2 udp

// udp socket
struct mill_udpsock_ {
    int fd;
    int port;
};

// tune a udp socket (make it non-blocking)
static void mill_udptune(int s) {
    /* Make the socket non-blocking. */
    int opt = fcntl(s, F_GETFL, 0);
    if (opt == -1)
        opt = 0;
    int rc = fcntl(s, F_SETFL, opt | O_NONBLOCK);
    mill_assert(rc != -1);
}

// create a udp listening socket (non-blocking)
struct mill_udpsock_ *mill_udplisten_(ipaddr addr) {
    /* Open the listening socket. */
    int s = socket(mill_ipfamily(addr), SOCK_DGRAM, 0);
    if(s == -1)
        return NULL;
    mill_udptune(s);

    /* Start listening. */
    int rc = bind(s, (struct sockaddr*)&addr, mill_iplen(addr));
    if(rc != 0)
        return NULL;

    // addr may carry no port (the user may want a given ip with an auto-assigned port);
    // in that case, fetch the port the OS assigned
    int port = mill_ipport(addr);
    if(!port) {
        ipaddr baddr;
        socklen_t len = sizeof(ipaddr);
        rc = getsockname(s, (struct sockaddr*)&baddr, &len);
        if(rc == -1) {
            int err = errno;
            fdclean(s);
            close(s);
            errno = err;
            return NULL;
        }
        port = mill_ipport(baddr);
    }

    /* Create the object. */
    struct mill_udpsock_ *us = malloc(sizeof(struct mill_udpsock_));
    if(!us) {
        fdclean(s);
        close(s);
        errno = ENOMEM;
        return NULL;
    }
    us->fd = s;
    us->port = port;
    errno = 0;
    return us;
}

// get the port the udp socket is bound to
int mill_udpport_(struct mill_udpsock_ *s) {
    return s->port;
}

// send on a udp socket (the socket must already be non-blocking)
void mill_udpsend_(struct mill_udpsock_ *s, ipaddr addr, const void *buf, size_t len) {
    struct sockaddr *saddr = (struct sockaddr*) &addr;
    ssize_t ss = sendto(s->fd, buf, len, 0, saddr, saddr->sa_family ==
        AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
    if(mill_fast(ss == (ssize_t)len)) {
        errno = 0;
        return;
    }
    mill_assert(ss < 0);
    if(errno == EAGAIN || errno == EWOULDBLOCK)
        errno = 0;
}

// receive on a udp socket (the socket must already be non-blocking)
size_t mill_udprecv_(struct mill_udpsock_ *s, ipaddr *addr, void *buf, size_t len, int64_t deadline) {
    ssize_t ss;
    while(1) {
        socklen_t slen = sizeof(ipaddr);
        ss = recvfrom(s->fd, buf, len, 0, (struct sockaddr*)addr, &slen);
        if(ss >= 0)
            break;
        if(errno != EAGAIN && errno != EWOULDBLOCK)
            return 0;
        int rc = fdwait(s->fd, FDW_IN, deadline);
        if(rc == 0) {
            errno = ETIMEDOUT;
            return 0;
        }
    }
    errno = 0;
    return (size_t)ss;
}

// Close a udp socket
void mill_udpclose_(struct mill_udpsock_ *s) {
    fdclean(s->fd);
    int rc = close(s->fd);
    mill_assert(rc == 0);
    free(s);
}

Note that the udp socket has to be explicitly tuned with mill_udptune(int s) before receiving or sending; having to remember that yourself feels like weak encapsulation.
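
For completeness, here is how the public udp* API is driven from user code (a usage sketch of mine, not library source; port 5556 is arbitrary). Note that udplisten() is also how you obtain a socket for sending:

#include <libmill.h>
#include <assert.h>
#include <errno.h>
#include <string.h>

coroutine void receiver(udpsock s) {
    ipaddr peer;
    char buf[64];
    size_t n = udprecv(s, &peer, buf, sizeof(buf), now() + 1000);
    if(errno == 0)
        assert(n == 4 && memcmp(buf, "ping", 4) == 0);
}

int main(void) {
    udpsock in = udplisten(iplocal(NULL, 5556, 0));
    assert(in);
    udpsock out = udplisten(iplocal(NULL, 0, 0));  // ephemeral port for sending
    assert(out);
    go(receiver(in));
    udpsend(out, ipremote("127.0.0.1", 5556, 0, -1), "ping", 4);
    msleep(now() + 100);   // yield so the receiver coroutine gets to run
    udpclose(in);
    udpclose(out);
    return 0;
}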

4.3 unix

// Buffer size for unix sockets (rx and tx buffers alike)
#ifndef MILL_UNIX_BUFLEN
#define MILL_UNIX_BUFLEN (4096)
#endif

// unix socket type
enum mill_unixtype {
   MILL_UNIXLISTENER,
   MILL_UNIXCONN
};

// unix socket
struct mill_unixsock_ {
    enum mill_unixtype type;
};

// unix listen socket
struct mill_unixlistener {
    struct mill_unixsock_ sock;
    int fd;
};

// unix conn socket
struct mill_unixconn {
    struct mill_unixsock_ sock;
    int fd;
    size_t ifirst;  // offset of the first pending byte in the rx buffer
    size_t ilen;    // number of pending bytes in the rx buffer
    size_t olen;    // number of pending bytes in the tx buffer
    char ibuf[MILL_UNIX_BUFLEN];    // rx buffer
    char obuf[MILL_UNIX_BUFLEN];    // tx buffer
};

// Tune a unix socket (make it non-blocking & suppress SIGPIPE)
static void mill_unixtune(int s) {
    /* Make the socket non-blocking. */
    int opt = fcntl(s, F_GETFL, 0);
    if (opt == -1)
        opt = 0;
    int rc = fcntl(s, F_SETFL, opt | O_NONBLOCK);
    mill_assert(rc != -1);
    /* If possible, prevent SIGPIPE signal when writing to the connection
        already closed by the peer. */
#ifdef SO_NOSIGPIPE
    opt = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
    mill_assert (rc == 0 || errno == EINVAL);
#endif
}

// Fill in a unix socket address (struct sockaddr_un)
static int mill_unixresolve(const char *addr, struct sockaddr_un *su) {
    mill_assert(su);
    if (strlen(addr) >= sizeof(su->sun_path)) {
        errno = EINVAL;
        return -1;
    }
    su->sun_family = AF_UNIX;
    strncpy(su->sun_path, addr, sizeof(su->sun_path));
    errno = 0;
    return 0;
}

// Initialise a unix connection socket
static void unixconn_init(struct mill_unixconn *conn, int fd) {
    conn->sock.type = MILL_UNIXCONN;
    conn->fd = fd;
    conn->ifirst = 0;
    conn->ilen = 0;
    conn->olen = 0;
}

// Create a unix listening socket
struct mill_unixsock_ *mill_unixlisten_(const char *addr, int backlog) {
    struct sockaddr_un su;
    int rc = mill_unixresolve(addr, &su);
    if (rc != 0) {
        return NULL;
    }
    /* Open the listening socket. */
    int s = socket(AF_UNIX, SOCK_STREAM, 0);
    if(s == -1)
        return NULL;
    mill_unixtune(s);

    /* Start listening. */
    rc = bind(s, (struct sockaddr*)&su, sizeof(struct sockaddr_un));
    if(rc != 0)
        return NULL;
    rc = listen(s, backlog);
    if(rc != 0)
        return NULL;

    /* Create the object. */
    struct mill_unixlistener *l = malloc(sizeof(struct mill_unixlistener));
    if(!l) {
        fdclean(s);
        close(s);
        errno = ENOMEM;
        return NULL;
    }
    l->sock.type = MILL_UNIXLISTENER;
    l->fd = s;
    errno = 0;
    return &l->sock;
}

// Accept a connection on a unix listening socket (non-blocking, with deadline)
struct mill_unixsock_ *mill_unixaccept_(struct mill_unixsock_ *s, int64_t deadline) {
    if(s->type != MILL_UNIXLISTENER)
        mill_panic("trying to accept on a socket that isn't listening");
    struct mill_unixlistener *l = (struct mill_unixlistener*)s;
    while(1) {
        /* Try to get new connection (non-blocking). */
        int as = accept(l->fd, NULL, NULL);
        if (as >= 0) {
            mill_unixtune(as);
            struct mill_unixconn *conn = malloc(sizeof(struct mill_unixconn));
            if(!conn) {
                fdclean(as);
                close(as);
                errno = ENOMEM;
                return NULL;
            }
            unixconn_init(conn, as);
            errno = 0;
            return (struct mill_unixsock_*)conn;
        }
        mill_assert(as == -1);
        if(errno != EAGAIN && errno != EWOULDBLOCK)
            return NULL;
        /* Wait till new connection is available. */
        int rc = fdwait(l->fd, FDW_IN, deadline);
        if(rc == 0) {
            errno = ETIMEDOUT;
            return NULL;
        }
        if(rc & FDW_ERR)
            return NULL;
        mill_assert(rc == FDW_IN);
    }
}

// Connect a unix socket to a server address (non-blocking)
struct mill_unixsock_ *mill_unixconnect_(const char *addr) {
    struct sockaddr_un su;
    int rc = mill_unixresolve(addr, &su);
    if (rc != 0) {
        return NULL;
    }

    /* Open a socket. */
    int s = socket(AF_UNIX,  SOCK_STREAM, 0);
    if(s == -1)
        return NULL;
    mill_unixtune(s);

    /* Connect to the remote endpoint. */
    rc = connect(s, (struct sockaddr*)&su, sizeof(struct sockaddr_un));
    if(rc != 0) {
        int err = errno;
        mill_assert(rc == -1);
        fdclean(s);
        close(s);
        errno = err;
        return NULL;
    }

    /* Create the object. */
    struct mill_unixconn *conn = malloc(sizeof(struct mill_unixconn));
    if(!conn) {
        fdclean(s);
        close(s);
        errno = ENOMEM;
        return NULL;
    }
    unixconn_init(conn, s);
    errno = 0;
    return (struct mill_unixsock_*)conn;
}

// Create a unix socket pair (non-blocking)
void mill_unixpair_(struct mill_unixsock_ **a, struct mill_unixsock_ **b) {
    if(!a || !b) {
        errno = EINVAL;
        return;
    }
    int fd[2];
    int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
    if (rc != 0)
        return;
    mill_unixtune(fd[0]);
    mill_unixtune(fd[1]);
    struct mill_unixconn *conn = malloc(sizeof(struct mill_unixconn));
    if(!conn) {
        fdclean(fd[0]);
        close(fd[0]);
        fdclean(fd[1]);
        close(fd[1]);
        errno = ENOMEM;
        return;
    }
    unixconn_init(conn, fd[0]);
    *a = (struct mill_unixsock_*)conn;
    conn = malloc(sizeof(struct mill_unixconn));
    if(!conn) {
        free(*a);
        fdclean(fd[0]);
        close(fd[0]);
        fdclean(fd[1]);
        close(fd[1]);
        errno = ENOMEM;
        return;
    }
    unixconn_init(conn, fd[1]);
    *b = (struct mill_unixsock_*)conn;
    errno = 0;
}

// Send on a unix socket (non-blocking, with deadline)
size_t mill_unixsend_(struct mill_unixsock_ *s, const void *buf, size_t len, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to send to an unconnected socket");
    struct mill_unixconn *conn = (struct mill_unixconn*)s;

    // If the output buffer has room for the data, just copy it in and be done.
    if(conn->olen + len <= MILL_UNIX_BUFLEN) {
        memcpy(&conn->obuf[conn->olen], buf, len);
        conn->olen += len;
        errno = 0;
        return len;
    }

    // Otherwise, flush the output buffer first to make room.
    unixflush(s, deadline);
    if(errno != 0)
        return 0;

    // unixflush may return early (on timeout) without draining the buffer,
    // so check again whether the data now fits; if so, copy it in directly.
    if(conn->olen + len <= MILL_UNIX_BUFLEN) {
        memcpy(&conn->obuf[conn->olen], buf, len);
        conn->olen += len;
        errno = 0;
        return len;
    }

    // Even after the flush the output buffer cannot hold the data, so send it
    // in place, bypassing the buffer.
    char *pos = (char*)buf;
    size_t remaining = len;
    while(remaining) {
        ssize_t sz = send(conn->fd, pos, remaining, 0);
        if(sz == -1) {
            /* Operating systems are inconsistent w.r.t. returning EPIPE and
               ECONNRESET. Let's paper over it like this. */
            if(errno == EPIPE) {
                errno = ECONNRESET;
                return 0;
            }
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return 0;
            int rc = fdwait(conn->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return len - remaining;
            }
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    errno = 0;
    return len;
}

// Flush the unix socket's output buffer (non-blocking; stops and returns on timeout)
void mill_unixflush_(struct mill_unixsock_ *s, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to send to an unconnected socket");
    struct mill_unixconn *conn = (struct mill_unixconn*)s;
    if(!conn->olen) {
        errno = 0;
        return;
    }
    char *pos = conn->obuf;
    size_t remaining = conn->olen;
    while(remaining) {
        ssize_t sz = send(conn->fd, pos, remaining, 0);
        if(sz == -1) {
            /* Operating systems are inconsistent w.r.t. returning EPIPE and
               ECONNRESET. Let's paper over it like this. */
            if(errno == EPIPE) {
                errno = ECONNRESET;
                return;
            }
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return;
            int rc = fdwait(conn->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return;
            }
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    conn->olen = 0;
    errno = 0;
}

// Receive from a unix socket (non-blocking, with deadline)
size_t mill_unixrecv_(struct mill_unixsock_ *s, void *buf, size_t len, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to receive from an unconnected socket");
    struct mill_unixconn *conn = (struct mill_unixconn*)s;

    // If the receive buffer already holds enough data, just copy it out.
    if(conn->ilen >= len) {
        memcpy(buf, &conn->ibuf[conn->ifirst], len);
        conn->ifirst += len;
        conn->ilen -= len;
        errno = 0;
        return len;
    }

    // Not enough buffered data: copy out what we have, then read the rest.
    char *pos = (char*)buf;
    size_t remaining = len;
    memcpy(pos, &conn->ibuf[conn->ifirst], conn->ilen);
    pos += conn->ilen;
    remaining -= conn->ilen;
    conn->ifirst = 0;
    conn->ilen = 0;

    // Read the remaining data.
    mill_assert(remaining);
    while(1) {
        if(remaining > MILL_UNIX_BUFLEN) {
            // Still a lot to read: read straight into the caller's buffer to
            // minimise the number of system calls.
            ssize_t sz = recv(conn->fd, pos, remaining, 0);
            if(!sz) {
                errno = ECONNRESET;
                return len - remaining;
            }
            if(sz == -1) {
                if(errno != EAGAIN && errno != EWOULDBLOCK)
                    return len - remaining;
                sz = 0;
            }
            if((size_t)sz == remaining) {
                errno = 0;
                return len;
            }
            pos += sz;
            remaining -= sz;
        }
        else {
            // Only a little left: read a full MILL_UNIX_BUFLEN into the rx
            // buffer and copy out of it, so that later recv calls can be
            // served from the buffer without extra recv syscalls.
            ssize_t sz = recv(conn->fd, conn->ibuf, MILL_UNIX_BUFLEN, 0);
            if(!sz) {
                errno = ECONNRESET;
                return len - remaining;
            }
            if(sz == -1) {
                if(errno != EAGAIN && errno != EWOULDBLOCK)
                    return len - remaining;
                sz = 0;
            }
            if((size_t)sz < remaining) {
                memcpy(pos, conn->ibuf, sz);
                pos += sz;
                remaining -= sz;
                conn->ifirst = 0;
                conn->ilen = 0;
            }
            else {
                memcpy(pos, conn->ibuf, remaining);
                conn->ifirst = remaining;
                conn->ilen = sz - remaining;
                errno = 0;
                return len;
            }
        }

        // Wait for more data to become readable (on timeout, return what we have).
        int res = fdwait(conn->fd, FDW_IN, deadline);
        if(!res) {
            errno = ETIMEDOUT;
            return len - remaining;
        }
    }
}

// Read up to len bytes into buf, stopping early when one of the given delimiters is seen (non-blocking)
size_t mill_unixrecvuntil_(struct mill_unixsock_ *s, void *buf, size_t len,
      const char *delims, size_t delimcount, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to receive from an unconnected socket");
    unsigned char *pos = (unsigned char*)buf;
    size_t i;
    for(i = 0; i != len; ++i, ++pos) {
        size_t res = unixrecv(s, pos, 1, deadline);
        if(res == 1) {
            size_t j;
            for(j = 0; j != delimcount; ++j)
                if(*pos == delims[j])
                    return i + 1;
        }
        if (errno != 0)
            return i + res;
    }
    errno = ENOBUFS;
    return len;
}

// Shut down the socket for reading, writing, or both
void mill_unixshutdown_(struct mill_unixsock_ *s, int how) {
    mill_assert(s->type == MILL_UNIXCONN);
    struct mill_unixconn *c = (struct mill_unixconn*)s;
    int rc = shutdown(c->fd, how);
    mill_assert(rc == 0 || errno == ENOTCONN);
}

// Close a unix socket (handles both listening sockets and connection sockets)
void mill_unixclose_(struct mill_unixsock_ *s) {
    if(s->type == MILL_UNIXLISTENER) {
        struct mill_unixlistener *l = (struct mill_unixlistener*)s;
        fdclean(l->fd);
        int rc = close(l->fd);
        mill_assert(rc == 0);
        free(l);
        return;
    }
    if(s->type == MILL_UNIXCONN) {
        struct mill_unixconn *c = (struct mill_unixconn*)s;
        fdclean(c->fd);
        int rc = close(c->fd);
        mill_assert(rc == 0);
        free(c);
        return;
    }
    mill_assert(0);
}
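
A quick usage sketch of the pair/send/flush/recv cycle (illustrative only, not library source). It also shows why a forgotten unixflush() makes data apparently vanish: it is still sitting in obuf:

#include <libmill.h>
#include <assert.h>
#include <errno.h>
#include <string.h>

int main(void) {
    unixsock a, b;
    unixpair(&a, &b);
    assert(errno == 0);
    unixsend(a, "hello", 5, -1);   // lands in a's output buffer only
    unixflush(a, -1);              // actually written to the socket here
    char buf[5];
    size_t n = unixrecv(b, buf, sizeof(buf), -1);
    assert(n == 5 && memcmp(buf, "hello", 5) == 0);
    unixclose(a);
    unixclose(b);
    return 0;
}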

4.4 ip

4.4.1 ip.h

int mill_ipfamily(ipaddr addr);
int mill_iplen(ipaddr addr);
int mill_ipport(ipaddr addr);

4.4.2 ip.c

MILL_CT_ASSERT(sizeof(ipaddr) >= sizeof(struct sockaddr_in));
MILL_CT_ASSERT(sizeof(ipaddr) >= sizeof(struct sockaddr_in6));

static struct dns_resolv_conf *mill_dns_conf = NULL;
static struct dns_hosts *mill_dns_hosts = NULL;
static struct dns_hints *mill_dns_hints = NULL;

// Build an "any" address (INADDR_ANY/in6addr_any) with the given port and ipv4/ipv6 mode
static ipaddr mill_ipany(int port, int mode)
{
    ipaddr addr;
    if(mill_slow(port < 0 || port > 0xffff)) {
        ((struct sockaddr*)&addr)->sa_family = AF_UNSPEC;
        errno = EINVAL;
        return addr;
    }
    if (mode == 0 || mode == IPADDR_IPV4 || mode == IPADDR_PREF_IPV4) {
        struct sockaddr_in *ipv4 = (struct sockaddr_in*)&addr;
        ipv4->sin_family = AF_INET;
        ipv4->sin_addr.s_addr = htonl(INADDR_ANY);
        ipv4->sin_port = htons((uint16_t)port);
    }
    else {
        struct sockaddr_in6 *ipv6 = (struct sockaddr_in6*)&addr;
        ipv6->sin6_family = AF_INET6;
        memcpy(&ipv6->sin6_addr, &in6addr_any, sizeof(in6addr_any));
        ipv6->sin6_port = htons((uint16_t)port);
    }
    errno = 0;
    return addr;
}

// Convert a dotted-decimal ipv4 literal plus port into binary form (network byte order)
static ipaddr mill_ipv4_literal(const char *addr, int port) {
    ipaddr raddr;
    struct sockaddr_in *ipv4 = (struct sockaddr_in*)&raddr;
    int rc = inet_pton(AF_INET, addr, &ipv4->sin_addr);
    mill_assert(rc >= 0);
    if(rc == 1) {
        ipv4->sin_family = AF_INET;
        ipv4->sin_port = htons((uint16_t)port);
        errno = 0;
        return raddr;
    }
    ipv4->sin_family = AF_UNSPEC;
    errno = EINVAL;
    return raddr;
}

// Convert an ipv6 literal plus port into binary form (network byte order)
static ipaddr mill_ipv6_literal(const char *addr, int port) {
    ipaddr raddr;
    struct sockaddr_in6 *ipv6 = (struct sockaddr_in6*)&raddr;
    int rc = inet_pton(AF_INET6, addr, &ipv6->sin6_addr);
    mill_assert(rc >= 0);
    if(rc == 1) {
        ipv6->sin6_family = AF_INET6;
        ipv6->sin6_port = htons((uint16_t)port);
        errno = 0;
        return raddr;
    }
    ipv6->sin6_family = AF_UNSPEC;
    errno = EINVAL;
    return raddr;
}

// Convert an ipv4 or ipv6 literal into binary form (dispatches to mill_ipv4/6_literal above according to mode)
static ipaddr mill_ipliteral(const char *addr, int port, int mode) {
    ipaddr raddr;
    struct sockaddr *sa = (struct sockaddr*)&raddr;
    if(mill_slow(!addr || port < 0 || port > 0xffff)) {
        sa->sa_family = AF_UNSPEC;
        errno = EINVAL;
        return raddr;
    }
    switch(mode) {
        case IPADDR_IPV4:
            return mill_ipv4_literal(addr, port);
        case IPADDR_IPV6:
            return mill_ipv6_literal(addr, port);
        case 0:
        case IPADDR_PREF_IPV4:
            raddr = mill_ipv4_literal(addr, port);
            if(errno == 0)
                return raddr;
            return mill_ipv6_literal(addr, port);
        case IPADDR_PREF_IPV6:
            raddr = mill_ipv6_literal(addr, port);
            if(errno == 0)
                return raddr;
            return mill_ipv4_literal(addr, port);
        default:
            mill_assert(0);
    }
}

// Return the address family of an ip address
int mill_ipfamily(ipaddr addr) {
    return ((struct sockaddr*)&addr)->sa_family;
}

// Return the length of the underlying sockaddr structure
int mill_iplen(ipaddr addr) {
    return mill_ipfamily(addr) == AF_INET ? 
                        sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
}

// Return the port of an ip address (host byte order)
int mill_ipport(ipaddr addr) {
    return ntohs(mill_ipfamily(addr) == AF_INET ? 
                        ((struct sockaddr_in*)&addr)->sin_port : ((struct sockaddr_in6*)&addr)->sin6_port);
}

// Convert a binary ip address into ipv4/ipv6 presentation format
const char *mill_ipaddrstr_(ipaddr addr, char *ipstr) {
    if (mill_ipfamily(addr) == AF_INET) {
        return inet_ntop(AF_INET, &(((struct sockaddr_in*)&addr)->sin_addr),
            ipstr, INET_ADDRSTRLEN);
    }
    else {
        return inet_ntop(AF_INET6, &(((struct sockaddr_in6*)&addr)->sin6_addr),
            ipstr, INET6_ADDRSTRLEN);
    }
}

// Resolve a local ip address into binary form; name may be NULL,
// an ipv4/ipv6 literal, or a network interface name.
ipaddr mill_iplocal_(const char *name, int port, int mode) {
    if(!name)
        return mill_ipany(port, mode);
    ipaddr addr = mill_ipliteral(name, port, mode);
#if defined __sun
    return addr;
#else
    if(errno == 0)
       return addr;
    /* Address is not a literal. It must be an interface name then. */
    struct ifaddrs *ifaces = NULL;
    int rc = getifaddrs (&ifaces);
    mill_assert (rc == 0);
    mill_assert (ifaces);
    /*  Find first IPv4 and first IPv6 address. */
    struct ifaddrs *ipv4 = NULL;
    struct ifaddrs *ipv6 = NULL;
    struct ifaddrs *it;
    for(it = ifaces; it != NULL; it = it->ifa_next) {
        if(!it->ifa_addr)
            continue;
        if(strcmp(it->ifa_name, name) != 0)
            continue;
        switch(it->ifa_addr->sa_family) {
        case AF_INET:
            mill_assert(!ipv4);
            ipv4 = it;
            break;
        case AF_INET6:
            mill_assert(!ipv6);
            ipv6 = it;
            break;
        }
        if(ipv4 && ipv6)
            break;
    }
    /* Choose the correct address family based on mode. */
    switch(mode) {
    case IPADDR_IPV4:
        ipv6 = NULL;
        break;
    case IPADDR_IPV6:
        ipv4 = NULL;
        break;
    case 0:
    case IPADDR_PREF_IPV4:
        if(ipv4)
           ipv6 = NULL;
        break;
    case IPADDR_PREF_IPV6:
        if(ipv6)
           ipv4 = NULL;
        break;
    default:
        mill_assert(0);
    }
    if(ipv4) {
        struct sockaddr_in *inaddr = (struct sockaddr_in*)&addr;
        memcpy(inaddr, ipv4->ifa_addr, sizeof (struct sockaddr_in));
        inaddr->sin_port = htons(port);
        freeifaddrs(ifaces);
        errno = 0;
        return addr;
    }
    if(ipv6) {
        struct sockaddr_in6 *inaddr = (struct sockaddr_in6*)&addr;
        memcpy(inaddr, ipv6->ifa_addr, sizeof (struct sockaddr_in6));
        inaddr->sin6_port = htons(port);
        freeifaddrs(ifaces);
        errno = 0;
        return addr;
    }
    freeifaddrs(ifaces);
    ((struct sockaddr*)&addr)->sa_family = AF_UNSPEC;
    errno = ENODEV;
    return addr;
#endif
}

// Resolve a remote address into binary form; name may be an ipv4/ipv6
// literal or a domain name (the latter triggers an asynchronous DNS query).
ipaddr mill_ipremote_(const char *name, int port, int mode, int64_t deadline) {
    int rc;
    ipaddr addr = mill_ipliteral(name, port, mode);
    if(errno == 0)
       return addr;
    /* Load DNS config files, unless they are already cached. */
    if(mill_slow(!mill_dns_conf)) {
        /* TODO: Maybe re-read the configuration once in a while? */
        mill_dns_conf = dns_resconf_local(&rc);
        mill_assert(mill_dns_conf);
        mill_dns_hosts = dns_hosts_local(&rc);
        mill_assert(mill_dns_hosts);
        mill_dns_hints = dns_hints_local(mill_dns_conf, &rc);
        mill_assert(mill_dns_hints);
    }
    /* Let's do asynchronous DNS query here. */
    struct dns_resolver *resolver = dns_res_open(mill_dns_conf, mill_dns_hosts,
        mill_dns_hints, NULL, dns_opts(), &rc);
    mill_assert(resolver);
    mill_assert(port >= 0 && port <= 0xffff);
    char portstr[8];
    snprintf(portstr, sizeof(portstr), "%d", port);
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = PF_UNSPEC;
    struct dns_addrinfo *ai = dns_ai_open(name, portstr, DNS_T_A, &hints,
        resolver, &rc);
    mill_assert(ai);
    dns_res_close(resolver);
    struct addrinfo *ipv4 = NULL;
    struct addrinfo *ipv6 = NULL;
    struct addrinfo *it = NULL;
    while(1) {
        rc = dns_ai_nextent(&it, ai);
        if(rc == EAGAIN) {
            int fd = dns_ai_pollfd(ai);
            mill_assert(fd >= 0);
            int events = fdwait(fd, FDW_IN, deadline);
            /* There's no guarantee that the file descriptor will be reused
               in next iteration. We have to clean the fdwait cache here
               to be on the safe side. */
            fdclean(fd);
            if(mill_slow(!events)) {
                errno = ETIMEDOUT;
                return addr;
            }
            mill_assert(events == FDW_IN);
            continue;
        }
        if(rc == ENOENT)
            break;

        if(!ipv4 && it && it->ai_family == AF_INET) {
            ipv4 = it;
        }
        else if(!ipv6 && it && it->ai_family == AF_INET6) {
            ipv6 = it;
        }
        else {
            free(it);
        }
        
        if(ipv4 && ipv6)
            break;
    }
    switch(mode) {
    case IPADDR_IPV4:
        if(ipv6) {
            free(ipv6);
            ipv6 = NULL;
        }
        break;
    case IPADDR_IPV6:
        if(ipv4) {
            free(ipv4);
            ipv4 = NULL;
        }
        break;
    case 0:
    case IPADDR_PREF_IPV4:
        if(ipv4 && ipv6) {
            free(ipv6);
            ipv6 = NULL;
        }
        break;
    case IPADDR_PREF_IPV6:
        if(ipv6 && ipv4) {
            free(ipv4);
            ipv4 = NULL;
        }
        break;
    default:
        mill_assert(0);
    }
    if(ipv4) {
        struct sockaddr_in *inaddr = (struct sockaddr_in*)&addr;
        memcpy(inaddr, ipv4->ai_addr, sizeof (struct sockaddr_in));
        inaddr->sin_port = htons(port);
        dns_ai_close(ai);
        free(ipv4);
        errno = 0;
        return addr;
    }
    if(ipv6) {
        struct sockaddr_in6 *inaddr = (struct sockaddr_in6*)&addr;
        memcpy(inaddr, ipv6->ai_addr, sizeof (struct sockaddr_in6));
        inaddr->sin6_port = htons(port);
        dns_ai_close(ai);
        free(ipv6);
        errno = 0;
        return addr;
    }
    dns_ai_close(ai);
    ((struct sockaddr*)&addr)->sa_family = AF_UNSPEC;
    errno = EADDRNOTAVAIL;
    return addr;
}
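
From the user's perspective the whole section boils down to two resolvers plus a formatter. A sketch (illustrative only; example.org and the 2-second deadline are arbitrary choices):

#include <libmill.h>
#include <errno.h>
#include <stdio.h>

int main(void) {
    char str[IPADDR_MAXSTRLEN];

    ipaddr any = iplocal(NULL, 5555, 0);   // INADDR_ANY:5555, no DNS involved
    printf("local: %s\n", ipaddrstr(any, str));

    // DNS resolution with a deadline; while the resolver's fd sits in
    // fdwait(), other coroutines keep running.
    ipaddr remote = ipremote("example.org", 80, 0, now() + 2000);
    if(errno == 0)
        printf("remote: %s\n", ipaddrstr(remote, str));
    return 0;
}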

5 IO Multiplex

5.1 poller

Network io in libmill is built on io multiplexing: on linux it uses epoll, on other platforms the corresponding multiplexing facility (kqueue, poll). Here we only look at the linux implementation.

5.1.1 poller.h

void mill_poller_init(void);

// The poller also implements mill_wait() and mill_fdwait() declared in libmill.h

// Wait until at least one coroutine can be resumed.
// With block=0, polling returns immediately if no event is ready;
// with block=1, it blocks until at least one event is ready.
void mill_wait(int block);

// Called in the child process after fork to create a brand-new pollset,
// detached from the parent's pollset.
void mill_poller_postfork(void);

5.1.2 poller.c

/* Forward declarations for the functions implemented by specific poller
   mechanisms (poll, epoll, kqueue). */
void mill_poller_init(void);
static void mill_poller_add(int fd, int events);
static void mill_poller_rm(struct mill_cr *cr);
static void mill_poller_clean(int fd);
static int mill_poller_wait(int timeout);

// Whether the poller has been initialised: 1 yes, 0 no
static int mill_poller_initialised = 0;

// Initialise the poller on first use; a no-op if it is already initialised
#define check_poller_initialised() \
do {\
    if(mill_slow(!mill_poller_initialised)) {\
        mill_poller_init();\
        mill_assert(errno == 0);\
        mill_main.fd = -1;\
        mill_main.timer.expiry = -1;\
        mill_poller_initialised = 1;\
    }\
} while(0)

// Suspend the running coroutine until the given deadline
void mill_msleep_(int64_t deadline, const char *current) {
    mill_fdwait_(-1, 0, deadline, current);
}

// Timer callback: resume the coroutine owning the timer with result -1 and
// drop any fd registration it still holds.
static void mill_poller_callback(struct mill_timer *timer) {
    struct mill_cr *cr = mill_cont(timer, struct mill_cr, timer);
    mill_resume(cr, -1);
    if (cr->fd != -1)
        mill_poller_rm(cr);
}

// Wait until the requested events are ready on fd, or until the deadline expires
int mill_fdwait_(int fd, int events, int64_t deadline, const char *current) {
    check_poller_initialised();
    // If a deadline is given, add a timer; on expiry, mill_poller_callback resumes the coroutine that owns it.
    if(deadline >= 0)
        mill_timer_add(&mill_running->timer, deadline, mill_poller_callback);
    // If an fd is given, register this coroutine's interest in its io events.
    if(fd >= 0)
        mill_poller_add(fd, events);
    // The actual wait: mill_suspend parks this coroutine and switches to a ready one.
    mill_running->state = fd < 0 ? MILL_MSLEEP : MILL_FDWAIT;
    mill_running->fd = fd;
    mill_running->events = events;
    mill_set_current(&mill_running->debug, current);
    // Execution resumes here only after some other code calls mill_resume on
    // this coroutine; mill_suspend returns the result argument passed to
    // mill_resume(struct mill_cr *cr, int result). Who resumes us? The epoll
    // loop does, when an event becomes ready on the fd we are waiting for.
    int rc = mill_suspend();
    /* Handle file descriptor events. */
    if(rc >= 0) {
        mill_assert(!mill_timer_enabled(&mill_running->timer));
        return rc;
    }
    /* Handle the timeout. */
    mill_assert(mill_running->fd == -1);
    return 0;
}

// Deregister fd from the epoll fd
void mill_fdclean_(int fd) {
    check_poller_initialised();
    mill_poller_clean(fd);
}

// Event polling; block=1 means block until an event arrives (either a timer expiry or io readiness)
void mill_wait(int block) {
    check_poller_initialised();
    while(1) {
        // Compute the timeout for this round of polling.
        int timeout = block ? mill_timer_next() : 0;
        // Check for io events on fds.
        int fd_fired = mill_poller_wait(timeout);
        // Fire any timers that have expired.
        int timer_fired = mill_timer_fire();
        // In the non-blocking case, never retry.
        if(!block || fd_fired || timer_fired)
            break;
        /* If timeout was hit but there were no expired timers do the poll
           again. This should not happen in theory but let's be ready for the
           case when the system timers are not precise. */
    }
}

// On linux, libmill implements event polling on top of epoll
#include "epoll.inc"
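
fdwait() and fdclean() are also the public hooks for integrating arbitrary fds with the scheduler. A sketch (my own example; assumes stdin is pollable, e.g. a terminal):

#include <libmill.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    int ev = fdwait(STDIN_FILENO, FDW_IN, now() + 5000);  // 5-second deadline
    if(ev & FDW_IN)
        printf("stdin is readable\n");
    else
        printf("timed out\n");          // fdwait returns 0 on deadline expiry
    fdclean(STDIN_FILENO);              // drop the cached registration
    return 0;
}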

5.1.3 epoll.inc

#define MILL_ENDLIST 0xffffffff
#define MILL_EPOLLSETSIZE 128

// The global pollset; on linux it is just an epoll fd
static int mill_efd = -1;

// epoll lets us attach only one pointer per fd, but we need two coroutine
// pointers per fd: one for the coroutine waiting to read from it and one for
// the coroutine waiting to write to it. So we keep our own array of
// coroutine-pointer pairs, indexed by fd, to track io on open sockets.
struct mill_crpair {
    struct mill_cr *in;
    struct mill_cr *out;
    uint32_t currevs;
    // 1-based index: 0 means "not on the list", MILL_ENDLIST terminates it
    // (a linked list threaded through array indices)
    uint32_t next;
};

static struct mill_crpair *mill_crpairs = NULL;
static int mill_ncrpairs = 0;
static uint32_t mill_changelist = MILL_ENDLIST;
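
mill_changelist threads every fd with pending changes through the mill_crpairs array itself: next holds a 1-based index instead of a pointer, 0 means "not on the list", and MILL_ENDLIST terminates the chain. A standalone, simplified illustration of the trick (my own code, not the library's):

#include <stdio.h>
#include <stdint.h>

#define ENDLIST 0xffffffff

struct pair { uint32_t next; };

static struct pair pairs[8];               // zero-initialised: nothing listed
static uint32_t changelist = ENDLIST;

static void mark_changed(int fd) {
    if(!pairs[fd].next) {                  // push only if not already listed
        pairs[fd].next = changelist;
        changelist = fd + 1;               // store 1-based so fd 0 != "unlisted"
    }
}

int main(void) {
    mark_changed(3);
    mark_changed(0);
    mark_changed(3);                       // no-op: already on the list
    while(changelist != ENDLIST) {         // drain: prints 0, then 3
        int fd = changelist - 1;
        printf("changed fd: %d\n", fd);
        changelist = pairs[fd].next;
        pairs[fd].next = 0;
    }
    return 0;
}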

// Poller initialisation
void mill_poller_init(void) {
    // Query the per-process limit on open file descriptors.
    struct rlimit rlim;
    int rc = getrlimit(RLIMIT_NOFILE, &rlim);
    if(mill_slow(rc < 0)) 
        return;
    mill_ncrpairs = rlim.rlim_max;
    // At most mill_ncrpairs fds can be open, so allocate one struct mill_crpair per possible fd.
    mill_crpairs = (struct mill_crpair*)calloc(mill_ncrpairs, sizeof(struct mill_crpair));
    if(mill_slow(!mill_crpairs)) {
        errno = ENOMEM; 
        return;
    }
    // Create the epoll fd that will monitor socket io events.
    mill_efd = epoll_create(1);
    if(mill_slow(mill_efd < 0)) {
        free(mill_crpairs);
        mill_crpairs = NULL;
        return;
    }
    errno = 0;
}

// Recreate the pollset in the child process, detaching it from the parent's pollset
void mill_poller_postfork(void) {
    if(mill_efd != -1) {
        int rc = close(mill_efd);
        mill_assert(rc == 0);
    }
    mill_efd = -1;
    mill_crpairs = NULL;
    mill_ncrpairs = 0;
    mill_changelist = MILL_ENDLIST;
    mill_poller_init();
}

// Register the running coroutine's interest in events on fd
static void mill_poller_add(int fd, int events) {
    // fds are always allocated as the lowest unused integer, so fd is
    // guaranteed to lie within [0, rlim.rlim_max) and mill_crpairs[fd] cannot
    // index out of bounds.
    struct mill_crpair *crp = &mill_crpairs[fd];
    if(events & FDW_IN) {
        // Two coroutines must not wait on the same fd direction, or the io would get scrambled.
        if(crp->in)
            mill_panic("multiple coroutines waiting for a single file descriptor");
        crp->in = mill_running;
    }
    if(events & FDW_OUT) {
        // Same rule for the write direction.
        if(crp->out)
            mill_panic("multiple coroutines waiting for a single file descriptor");
        crp->out = mill_running;
    }
    if(!crp->next) {
        crp->next = mill_changelist;
        mill_changelist = fd + 1;           // 1-based index
    }
}

// Cancel coroutine cr's interest in events on its fd.
// Note the fd is not removed from the epoll set here: another coroutine may
// still be watching it.
static void mill_poller_rm(struct mill_cr *cr) {
    int fd = cr->fd;
    mill_assert(fd != -1);
    struct mill_crpair *crp = &mill_crpairs[fd];
    if(crp->in == cr) {
        crp->in = NULL;
        cr->fd = -1;
    }
    if(crp->out == cr) {
        crp->out = NULL;
        cr->fd = -1;
    }
    if(!crp->next) {
        crp->next = mill_changelist;
        mill_changelist = fd + 1;           // 1-based index
    }
}

// Remove fd from the epoll set entirely.
// No coroutine may still be watching the fd when this is called.
static void mill_poller_clean(int fd) {
    struct mill_crpair *crp = &mill_crpairs[fd];
    // Assert that no coroutine is still watching this fd.
    mill_assert(!crp->in);
    mill_assert(!crp->out);
    /* Remove the file descriptor from the pollset, if it is still present. */
    if(crp->currevs) {   
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = 0;
        int rc = epoll_ctl(mill_efd, EPOLL_CTL_DEL, fd, &ev);
        mill_assert(rc == 0 || errno == ENOENT);
    }
    /* Clean the cache. */
    crp->currevs = 0;
    if(!crp->next) {
        crp->next = mill_changelist;
        mill_changelist = fd + 1;
    }
}

// Poll for ready events via epoll
static int mill_poller_wait(int timeout) {
    /* Apply any changes to the pollset.
       TODO: Use epoll_ctl_batch once available. */
    while(mill_changelist != MILL_ENDLIST) {
        int fd = mill_changelist - 1;
        struct mill_crpair *crp = &mill_crpairs[fd];
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = 0;
        // Recompute the event mask from which coroutines are waiting on this fd, then sync it into the epoll set.
        if(crp->in)
            ev.events |= EPOLLIN;
        if(crp->out)
            ev.events |= EPOLLOUT;
        if(crp->currevs != ev.events) {
            int op;
            if(!ev.events)
                 op = EPOLL_CTL_DEL;
            else if(!crp->currevs)
                 op = EPOLL_CTL_ADD;
            else
                 op = EPOLL_CTL_MOD;
            crp->currevs = ev.events;
            int rc = epoll_ctl(mill_efd, op, fd, &ev);
            mill_assert(rc == 0);
        }
        mill_changelist = crp->next;
        crp->next = 0;
    }
    // epoll_wait: poll for ready events.
    struct epoll_event evs[MILL_EPOLLSETSIZE];
    int numevs;
    while(1) {
        numevs = epoll_wait(mill_efd, evs, MILL_EPOLLSETSIZE, timeout);
        if(numevs < 0 && errno == EINTR)
            continue;
        mill_assert(numevs >= 0);
        break;
    }
    // Walk the ready events and resume the coroutines waiting on each fd.
    int i;
    for(i = 0; i != numevs; ++i) {
        struct mill_crpair *crp = &mill_crpairs[evs[i].data.fd];
        int inevents = 0;
        int outevents = 0;
        /* Set the result values. */
        if(evs[i].events & EPOLLIN)
            inevents |= FDW_IN;
        if(evs[i].events & EPOLLOUT)
            outevents |= FDW_OUT;
        if(evs[i].events & (EPOLLERR | EPOLLHUP)) {
            inevents |= FDW_ERR;
            outevents |= FDW_ERR;
        }
        // The same coroutine is waiting for both directions: resume it once
        // with the combined events.
        if(crp->in == crp->out) {
            struct mill_cr *cr = crp->in;
            mill_resume(cr, inevents | outevents);
            mill_poller_rm(cr);
            if(mill_timer_enabled(&cr->timer))
                mill_timer_rm(&cr->timer);
        }
        else {
            // Resume the coroutine waiting for readability.
            if(crp->in && inevents) {
                struct mill_cr *cr = crp->in;
                mill_resume(cr, inevents);
                mill_poller_rm(cr);
                if(mill_timer_enabled(&cr->timer))
                    mill_timer_rm(&cr->timer);
            }
            // Resume the coroutine waiting for writability.
            if(crp->out && outevents) {
                struct mill_cr *cr = crp->out;
                mill_resume(cr, outevents);
                mill_poller_rm(cr);
                if(mill_timer_enabled(&cr->timer))
                    mill_timer_rm(&cr->timer);
            }
        }
    }
    
    // Return 1 if at least one event fired (and thus some coroutine was resumed), 0 otherwise.
    return numevs > 0 ? 1 : 0;
}

5.2 file

epoll does not support every fd type. It is normally applied to sockets, pipes, ttys and a limited set of other device types; it does not support regular files. You can pass a regular file's fd to select or poll, but only because their interfaces happen to allow it; it achieves nothing (readiness is always reported). Since it achieves nothing, the epoll interface was designed to refuse regular-file fds outright: epoll set out to improve on select/poll, not to additionally support regular files. What libmill does here is apply epoll to /dev/pts devices, which is what stdin, stdout and stderr are; it is not aimed at regular files.

Does that mean the OS simply cannot multiplex io on regular files? Of course not. It is only epoll that lacks support; kqueue on BSD can do it.

This article compares kqueue and epoll: Scalable Event Multiplexing: epoll vs. kqueue

The last issue is that epoll does not even support all kinds of file descriptors; select()/poll()/epoll do not work with regular (disk) files. This is because epoll has a strong assumption of the readiness model; you monitor the readiness of sockets, so that subsequent IO calls on the sockets do not block. However, disk files do not fit this model, since simply they are always ready.

Disk I/O blocks when the data is not cached in memory, not because the client did not send a message. For disk files, the completion notification model fits. In this model, you simply issue I/O operations on the disk files, and get notified when they are done. kqueue supports this approach with the EVFILT_AIO filter type, in conjunction with POSIX AIO functions, such as aio_read(). In Linux, you should simply pray that disk access would not block with high cache hit rate (surprisingly common in many network servers), or have separate threads so that disk I/O blocking does not affect network socket processing (e.g., the FLASH architecture).

We won't dig any deeper here; let's look at the code.

#ifndef MILL_FILE_BUFLEN
#define MILL_FILE_BUFLEN (4096)
#endif

// Buffered file
struct mill_file {
    int fd;
    size_t ifirst;  // offset of the first pending byte in the input buffer
    size_t ilen;    // number of pending bytes in the input buffer
    size_t olen;    // number of pending bytes in the output buffer
    char ibuf[MILL_FILE_BUFLEN];    // input buffer
    char obuf[MILL_FILE_BUFLEN];    // output buffer
};

// Tune a file fd (make it non-blocking)
static void mill_filetune(int fd) {
    int opt = fcntl(fd, F_GETFL, 0);
    if (opt == -1)
        opt = 0;
    int rc = fcntl(fd, F_SETFL, opt | O_NONBLOCK);
    mill_assert(rc != -1);
}

// Open a file (the fd is tuned to non-blocking mode)
struct mill_file *mill_mfopen_(const char *pathname, int flags, mode_t mode) {
    /* Open the file. */
    int fd = open(pathname, flags, mode);
    if (fd == -1)
        return NULL;
    mill_filetune(fd);

    /* Create the object. */
    struct mill_file *f = malloc(sizeof(struct mill_file));
    if(!f) {
        fdclean(fd);
        close(fd);
        errno = ENOMEM;
        return NULL;
    }
    f->fd = fd;
    f->ifirst = 0;
    f->ilen = 0;
    f->olen = 0;
    errno = 0;
    return f;
}

// File write (the buffering logic mirrors the tcp/unix socket send path, so it is not repeated here)
size_t mill_mfwrite_(struct mill_file *f, const void *buf, size_t len, int64_t deadline) {
    /* If it fits into the output buffer copy it there and be done. */
    if(f->olen + len <= MILL_FILE_BUFLEN) {
        memcpy(&f->obuf[f->olen], buf, len);
        f->olen += len;
        errno = 0;
        return len;
    }

    /* If it doesn't fit, flush the output buffer first. */
    mfflush(f, deadline);
    if(errno != 0)
        return 0;

    /* Try to fit it into the buffer once again. */
    if(f->olen + len <= MILL_FILE_BUFLEN) {
        memcpy(&f->obuf[f->olen], buf, len);
        f->olen += len;
        errno = 0;
        return len;
    }

    /* The data chunk to send is longer than the output buffer. Let's do the writing in-place. */
    char *pos = (char*)buf;
    size_t remaining = len;
    while(remaining) {
        ssize_t sz = write(f->fd, pos, remaining);
        if(sz == -1) {
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return 0;
            int rc = fdwait(f->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return len - remaining;
            }
            mill_assert(rc == FDW_OUT);
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    return len;
}

// Flush the file's output buffer (again mirroring the tcp/unix socket flush)
void mill_mfflush_(struct mill_file *f, int64_t deadline) {
    if(!f->olen) {
        errno = 0;
        return;
    }
    char *pos = f->obuf;
    size_t remaining = f->olen;
    while(remaining) {
        ssize_t sz = write(f->fd, pos, remaining);
        if(sz == -1) {
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return;
            int rc = fdwait(f->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return;
            }
            mill_assert(rc == FDW_OUT);
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    f->olen = 0;
    errno = 0;
}

// File read (the buffering logic mirrors the tcp/unix socket receive path)
size_t mill_mfread_(struct mill_file *f, void *buf, size_t len, int64_t deadline) {
    /* If there's enough data in the buffer it's easy. */
    if(f->ilen >= len) {
        memcpy(buf, &f->ibuf[f->ifirst], len);
        f->ifirst += len;
        f->ilen -= len;
        errno = 0;
        return len;
    }

    /* Let's move all the data from the buffer first. */
    char *pos = (char*)buf;
    size_t remaining = len;
    memcpy(pos, &f->ibuf[f->ifirst], f->ilen);
    pos += f->ilen;
    remaining -= f->ilen;
    f->ifirst = 0;
    f->ilen = 0;

    mill_assert(remaining);
    while(1) {
        if(remaining > MILL_FILE_BUFLEN) {
            /* If we still have a lot to read try to read it in one go directly
             into the destination buffer. */
            ssize_t sz = read(f->fd, pos, remaining);
            if(!sz) {
                return len - remaining;
            }
            if(sz == -1) {
                if(errno != EAGAIN && errno != EWOULDBLOCK)
                    return len - remaining;
                sz = 0;
            }
            if((size_t)sz == remaining) {
                errno = 0;
                return len;
            }
            pos += sz;
            remaining -= sz;
            if (sz != 0 && mfeof(f)) {
                return len - remaining;
            }
        }
        else {
            /* If we have just a little to read try to read the full connection
             buffer to minimise the number of system calls. */
            ssize_t sz = read(f->fd, f->ibuf, MILL_FILE_BUFLEN);
            if(!sz) {
                return len - remaining;
            }
            if(sz == -1) {
                if(errno != EAGAIN && errno != EWOULDBLOCK)
                    return len - remaining;
                sz = 0;
            }
            if((size_t)sz < remaining) {
                memcpy(pos, f->ibuf, sz);
                pos += sz;
                remaining -= sz;
                f->ifirst = 0;
                f->ilen = 0;
            }
            else {
                memcpy(pos, f->ibuf, remaining);
                f->ifirst = remaining;
                f->ilen = sz - remaining;
                errno = 0;
                return len;
            }
            if (sz != 0 && mfeof(f)) {
                return len - remaining;
            }
        }

        /* Wait till there's more data to read. */
        int res = fdwait(f->fd, FDW_IN, deadline);
        if (!res) {
            errno = ETIMEDOUT;
            return len - remaining;
        }
    }
}

// Close the file
void mill_mfclose_(struct mill_file *f) {
    fdclean(f->fd);
    int rc = close(f->fd);
    mill_assert(rc == 0);
    free(f);
    return;
}

// Report the current read/write position (adjusted for buffered input)
off_t mill_mftell_(struct mill_file *f) {
    return lseek(f->fd, 0, SEEK_CUR) - f->ilen;
}

// Seek to a new read/write position (discarding buffered data)
off_t mill_mfseek_(struct mill_file *f, off_t offset) {
    f->ifirst = 0;
    f->ilen = 0;
    f->olen = 0;
    return lseek(f->fd, offset, SEEK_SET);
}

// Check whether we have reached end of file
int mill_mfeof_(struct mill_file *f) {
    // Save the current position.
    off_t current = lseek(f->fd, 0, SEEK_CUR);
    if (current == -1)
        return -1;
    // Find the end-of-file position.
    off_t eof = lseek(f->fd, 0, SEEK_END);
    if (eof == -1)
        return -1;
    // Restore the original position.
    off_t res = lseek(f->fd, current, SEEK_SET);
    if (res == -1)
        return -1;
    // We are at EOF iff the two positions coincide.
    return (current == eof);
}

// stdin
struct mill_file *mill_mfin_(void) {
    static struct mill_file f = {-1, 0, 0, 0};
    if(mill_slow(f.fd < 0)) {
        mill_filetune(STDIN_FILENO);
        f.fd = STDIN_FILENO;
    }
    return &f;
}

// stdout
struct mill_file *mill_mfout_(void) {
    static struct mill_file f = {-1, 0, 0, 0};
    if(mill_slow(f.fd < 0)) {
        mill_filetune(STDOUT_FILENO);
        f.fd = STDOUT_FILENO;
    }
    return &f;
}

// stderr
struct mill_file *mill_mferr_(void) {
    static struct mill_file f = {-1, 0, 0, 0};
    if(mill_slow(f.fd < 0)) {
        mill_filetune(STDERR_FILENO);
        f.fd = STDERR_FILENO;
    }
    return &f;
}
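
Putting the mf* API together, a round-trip sketch (my own example, illustrative only; the /tmp path is arbitrary):

#include <libmill.h>
#include <assert.h>
#include <fcntl.h>
#include <string.h>

int main(void) {
    mfile f = mfopen("/tmp/mill_demo.txt", O_CREAT | O_TRUNC | O_RDWR, 0644);
    assert(f);
    mfwrite(f, "hello\n", 6, -1);
    mfflush(f, -1);                // obuf -> kernel
    mfseek(f, 0);                  // rewind; also discards the stale buffers
    char buf[6];
    size_t n = mfread(f, buf, sizeof(buf), -1);
    assert(n == 6 && memcmp(buf, "hello\n", 6) == 0);
    assert(mfeof(f));              // we have consumed the whole file
    mfclose(f);
    return 0;
}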

6 Data Structure

6.1 slist

mill_slist implements a singly linked list; it also provides pop, push and push_back operations, which means it can equally serve as a stack or a queue.

As a stack: push and pop. As a queue: push_back and pop.

Used as a plain singly linked list it behaves much like mill_list (analysed in the next section); the implementation of mill_slist follows the same idea.

6.1.1 slist.h

struct mill_slist_item {
    struct mill_slist_item *next;
};

struct mill_slist {
    struct mill_slist_item *first;
    struct mill_slist_item *last;
};

/* Initialise the list. To statically initialise the list use = {0}. */
void mill_slist_init(struct mill_slist *self);

/* True if the list has no items. */
#define mill_slist_empty(self) (!((self)->first))

/* Returns iterator to the first item in the list or NULL if the list is empty. */
#define mill_slist_begin(self) ((self)->first)

/* Returns iterator to one past the item pointed to by 'it'. If there are no more items returns NULL. */
#define mill_slist_next(it) ((it)->next)

/* Push the item to the beginning of the list. */
void mill_slist_push(struct mill_slist *self, struct mill_slist_item *item);

/* Push the item to the end of the list. */
void mill_slist_push_back(struct mill_slist *self, struct mill_slist_item *item);

/* Pop an item from the beginning of the list. */
struct mill_slist_item *mill_slist_pop(struct mill_slist *self);

6.1.2 slist.c

// slist initialisation
void mill_slist_init(struct mill_slist *self) {
    self->first = NULL;
    self->last = NULL;
}

// slist push (insert at the head)
void mill_slist_push(struct mill_slist *self, struct mill_slist_item *item) {
    item->next = self->first;
    self->first = item;
    if(!self->last)
        self->last = item;
}

// slist push_back (insert at the tail)
void mill_slist_push_back(struct mill_slist *self,
      struct mill_slist_item *item) {
    item->next = NULL;
    if(!self->last)
        self->first = item;
    else
        self->last->next = item;
    self->last = item;
}

// slist pop (remove from the head)
struct mill_slist_item *mill_slist_pop(struct mill_slist *self) {
    if(!self->first)
        return NULL;
    struct mill_slist_item *it = self->first;
    self->first = self->first->next;
    if(!self->first)
        self->last = NULL;
    return it;
}
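
A short sketch of slist-as-stack plus the mill_cont recovery step (my own example, written against the internal API in the style of the Student example below):

// Element type with an embedded (intrusive) link: no extra allocation needed.
struct task {
    int id;
    struct mill_slist_item item;
};

struct mill_slist stack = {0};
struct task t1 = {1}, t2 = {2};

mill_slist_push(&stack, &t1.item);      // stack: push + pop  -> LIFO
mill_slist_push(&stack, &t2.item);      // (push_back + pop would give FIFO)

struct mill_slist_item *it = mill_slist_pop(&stack);
struct task *top = mill_cont(it, struct task, item);
// top->id == 2: the most recently pushed element pops first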

6.2 list

6.2.1 list.h

mill_list is a doubly linked list. The links are maintained through mill_list_item, and mill_list_item combined with mill_cont effectively gives us the list's iterator.

struct mill_list_item {
    struct mill_list_item *next;
    struct mill_list_item *prev;
};

struct mill_list {
    struct mill_list_item *first;
    struct mill_list_item *last;
};

/* Initialise the list. To statically initialise the list use = {0}. */
void mill_list_init(struct mill_list *self);

/* True if the list has no items. */
#define mill_list_empty(self) (!((self)->first))

/* Returns iterator to the first item in the list or NULL if
   the list is empty. */
#define mill_list_begin(self) ((self)->first)

/* Returns iterator to one past the item pointed to by 'it'. */
#define mill_list_next(it) ((it)->next)

/* Adds the item to the list before the item pointed to by 'it'.
   If 'it' is NULL the item is inserted to the end of the list. */
void mill_list_insert(struct mill_list *self, struct mill_list_item *item, struct mill_list_item *it);

/* Removes the item from the list and returns pointer to the next item in the
   list. Item must be part of the list. */
struct mill_list_item *mill_list_erase(struct mill_list *self, struct mill_list_item *item);

When we create a list we declare a struct mill_list variable; when we insert an element we actually insert a mill_list_item; and when we traverse the list we follow mill_list.first/last and mill_list_item.next/prev.

The obvious question: the elements we want to store surely hold more than a mill_list_item, don't they? Yes. This is where mill_cont comes in: it recovers the address of the struct containing a given member. Embed an extra mill_list_item member in your own struct, use that member for the links, and whenever you need the full element, pass the member pointer to mill_cont to get the struct's address back and dereference it. Problem solved.

Taking struct Student as the stored type, here is the pattern in action:

// User-defined struct with an embedded link member
struct Student {
    char *name;
    int age;
    int sex;
    struct mill_list_item item;
};

// Create the list (statically initialised with = {0})
struct mill_list students = {0};

// Add elements to the list; passing it=NULL appends at the tail
struct Student stu_x = {.name="x", .age=10, .sex=0};
struct Student stu_y = {.name="y", .age=11, .sex=1};

mill_list_insert(&students, &stu_x.item, NULL);
mill_list_insert(&students, &stu_y.item, NULL);

// Traverse the list
struct mill_list_item *iter = students.first;
while(iter) {
    struct Student *stu = (struct Student *)mill_cont(iter, struct Student, item);
    printf("student name:%s, age:%d, sex:%d\n", stu->name, stu->age, stu->sex);
    
    iter = iter->next;
}

6.2.2 list.c

// Initialise an empty list
void mill_list_init(struct mill_list *self)
{
    self->first = NULL;
    self->last = NULL;
}

// Insert item into list self before element it (it==NULL appends at the tail)
void mill_list_insert(struct mill_list *self, struct mill_list_item *item, struct mill_list_item *it)
{
    item->prev = it ? it->prev : self->last;
    item->next = it;
    if(item->prev)
        item->prev->next = item;
    if(item->next)
        item->next->prev = item;
    if(!self->first || self->first == it)
        self->first = item;
    if(!it)
        self->last = item;
}

// Remove item from list self and return the next element
struct mill_list_item *mill_list_erase(struct mill_list *self, struct mill_list_item *item)
{
    struct mill_list_item *next;

    if(item->prev)
        item->prev->next = item->next;
    else
        self->first = item->next;
    if(item->next)
        item->next->prev = item->prev;
    else
        self->last = item->prev;

    next = item->next;

    item->prev = NULL;
    item->next = NULL;

    return next;
}

7 Common Utils

7.1 utils

// Given ptr pointing at member 'member' of a struct of type 'type', compute
// the address of the enclosing struct.
// - In list & friends, mill_cont turns an "iterator" back into its element struct.
#define mill_cont(ptr, type, member) \
    (ptr ? ((type*) (((char*) ptr) - offsetof(type, member))) : NULL)
// Compile-time assertion
#define MILL_CT_ASSERT_HELPER2(prefix, line)    prefix##line
#define MILL_CT_ASSERT_HELPER1(prefix, line)    MILL_CT_ASSERT_HELPER2(prefix, line)
#define MILL_CT_ASSERT(x) \
    typedef int MILL_CT_ASSERT_HELPER1(ct_assert_,__COUNTER__) [(x) ? 1 : -1]
// Branch hints to help the compiler's branch prediction
#if defined __GNUC__ || defined __llvm__
#define mill_fast(x) __builtin_expect(!!(x), 1)
#define mill_slow(x) __builtin_expect(!!(x), 0)
#else
#define mill_fast(x) (x)
#define mill_slow(x) (x)
#endif
// Custom assert
#define mill_assert(x) \
    do {\
        if (mill_slow(!(x))) {\
            fprintf(stderr, "Assert failed: " #x " (%s:%d)\n",\
                __FILE__, __LINE__);\
            fflush(stderr);\
            abort();\
        }\
    } while (0)
#endif
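
Two of these helpers in action (a usage sketch of mine). MILL_CT_ASSERT typedefs an array whose size becomes -1 when the condition is false, so a violated assumption breaks the build rather than the run; mill_fast/mill_slow only influence generated code layout, never semantics:

MILL_CT_ASSERT(sizeof(int) >= 4);       // compiles fine on typical platforms
/* MILL_CT_ASSERT(sizeof(int) == 1); */ // would fail: "array of size -1"

int parse(const char *s) {
    if(mill_slow(!s))                   // error path, predicted not-taken
        return -1;
    return 0;                           // hot path laid out as fall-through
}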

7.2 timer

7.2.1 timer.h

struct mill_timer {
    // mill_list_item combined with mill_cont acts as the list iterator
    /* Item in the global list of all timers. */
    struct mill_list_item item;
    /* The deadline when the timer expires. -1 if the timer is not active. */
    int64_t expiry;
    /* Callback invoked when timer expires. Pfui Teufel! */
    mill_timer_callback callback;
};
/* Test whether the timer is active. */
#define mill_timer_enabled(tm)  ((tm)->expiry >= 0)

/* Add a timer for the running coroutine. */
void mill_timer_add(struct mill_timer *timer, int64_t deadline, mill_timer_callback callback);

/* Remove the timer associated with the running coroutine. */
void mill_timer_rm(struct mill_timer *timer);

/* Number of milliseconds till the next timer expires. If there are no timers returns -1. */
int mill_timer_next(void);

/* Resumes all coroutines whose timers have already expired. Returns zero if no coroutine was resumed, 1 otherwise. */
int mill_timer_fire(void);

/* Called after fork in the child process to deactivate all the timers inherited from the parent. */
void mill_timer_postfork(void);

7.2.2 timer.c

// Clock precision control: when the tick delta between two rdtsc reads exceeds this, refresh last_now via gettimeofday
#define MILL_CLOCK_PRECISION 1000000
// Return the system time from gettimeofday, in milliseconds
static int64_t mill_os_time(void) {
#if defined __APPLE__
    ...
#else
    struct timeval tv;
    int rc = gettimeofday(&tv, NULL);
    assert(rc == 0);
    return ((int64_t)tv.tv_sec) * 1000 + (((int64_t)tv.tv_usec) / 1000);
#endif
}
// Get the current time (note: the value is cached)
int64_t mill_now_(void) {
#if (defined __GNUC__ || defined __clang__) && (defined __i386__ || defined __x86_64__)
    // rdtsc reads the number of cpu cycles elapsed since boot.
    uint32_t low;
    uint32_t high;
    __asm__ volatile("rdtsc" : "=a" (low), "=d" (high));
    
    int64_t tsc = (int64_t)((uint64_t)high << 32 | low);

    static int64_t last_tsc = -1;
    static int64_t last_now = -1;
    if(mill_slow(last_tsc < 0)) {
        last_tsc = tsc;
        last_now = mill_os_time();
    }
    // Within the precision window, return the cached time; otherwise refresh it.
    if(mill_fast(tsc - last_tsc <= (MILL_CLOCK_PRECISION / 2) && tsc >= last_tsc))
        return last_now;
    
    last_tsc = tsc;
    last_now = mill_os_time();
    return last_now;
#else
    return mill_os_time();
#endif
}
// The list of timers, kept sorted by expiry time, earliest first
static struct mill_list mill_timers = {0};

// Add a timer to the list, keeping the list sorted in ascending order of expiry
void mill_timer_add(struct mill_timer *timer, int64_t deadline, mill_timer_callback callback) {
    mill_assert(deadline >= 0);
    timer->expiry = deadline;
    timer->callback = callback;
    /* Move the timer into the right place in the ordered list
       of existing timers. TODO: This is an O(n) operation! */
    struct mill_list_item *it = mill_list_begin(&mill_timers);
    while(it) {
        struct mill_timer *tm = mill_cont(it, struct mill_timer, item);
        /* If multiple timers expire at the same momemt they will be fired
           in the order they were created in (> rather than >=). */
        if(tm->expiry > timer->expiry)
            break;
        it = mill_list_next(it);
    }
    mill_list_insert(&mill_timers, &timer->item, it);
}

// Remove a timer from the list
void mill_timer_rm(struct mill_timer *timer) {
    mill_assert(timer->expiry >= 0);
    mill_list_erase(&mill_timers, &timer->item);
    timer->expiry = -1;
}

// Milliseconds until the earliest timer expires (-1 if there are no timers)
int mill_timer_next(void) {
    if(mill_list_empty(&mill_timers))
        return -1;
    int64_t nw = now();
    int64_t expiry = mill_cont(mill_list_begin(&mill_timers), struct mill_timer, item) -> expiry;
    return (int) (nw >= expiry ? 0 : expiry - nw);
}

// Fire the callbacks of all expired timers; returns whether anything fired
int mill_timer_fire(void) {
    /* Avoid getting current time if there are no timers anyway. */
    if(mill_list_empty(&mill_timers))
        return 0;
    int64_t nw = now();
    int fired = 0;
    while(!mill_list_empty(&mill_timers)) {
        struct mill_timer *tm = mill_cont(
            mill_list_begin(&mill_timers), struct mill_timer, item);
        if(tm->expiry > nw)
            break;
        mill_list_erase(&mill_timers, mill_list_begin(&mill_timers));
        tm->expiry = -1;
        if(tm->callback)
            tm->callback(tm);
        fired = 1;
    }
    return fired;
}

// Reset the timer list after fork, discarding timers inherited from the parent
void mill_timer_postfork(void) {
    mill_list_init(&mill_timers);
}
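
This list is what makes msleep() cheap: every sleeping coroutine is one entry in the sorted list, mill_timer_next() feeds the epoll timeout, and mill_timer_fire() resumes the expired sleepers. A sketch of the observable behaviour (my own example):

#include <libmill.h>
#include <stdio.h>

coroutine void ticker(int id, int64_t delay_ms) {
    msleep(now() + delay_ms);     // one mill_timer on the ordered list
    printf("timer %d fired\n", id);
}

int main(void) {
    go(ticker(1, 30));
    go(ticker(2, 10));            // sorts before ticker 1 in the list
    msleep(now() + 50);           // main's own timer; fires last
    return 0;                     // output: timer 2, then timer 1
}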

8 Debug

Want a better debugging experience with libmill?

On osx, brew install libmill installs the shared library libmill.so with much of the debug information optimised away, which makes debugging inconvenient. For a better experience, build and install libmill from source:

git clone https://github.com/sustrik/libmill
cd libmill

./autogen.sh
./configure --disable-shared --enable-debug --enable-valgrind

make -j8
sudo make install