redis 源码分析之事件调度器 epoll

这个可以视为 redis源码分析之网络通信和事件调度篇  的下一篇,主要涉及 ae.c + ae_epoll.c 两个文件不到一千行代码,从redis.c的main调用aeMain开始。
下面是aeMain 函数的全部代码。

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);                                          

        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

eventLoop 是redis的事件调度器,可以看到这里就是一个死循环,只要 eventLoop->stop 不置位,就会不停循环,不断地处理事务,实际上,除了在benchmark之外,没有任何地方将stop置为1,也就是永远都不会触发stop = 1 的情况。aeProcessEvents 具体处理epoll的响应事件。 在全部代码里面grep了一下,尚未发现 eventLoop->beforesleep 可以被用到的时候,应该一直都是NULL的。

下面是 aeProcessEvents的定义,flags表示要处理的事件类型,根据调用的时候的参数,可以看到是要处理全部的事件类型,也就是AE_ALL_EVENTS,

int aeProcessEvents(aeEventLoop *eventLoop, int flags) 

这是AE_ALL_EVENTS的定义,可见对于redis来说,全部事件包括时间和文本的两种事件。

#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)

unix哲学里面,一切都是文件。

我们步入到aeProcessEvents中,看看在redis的主循环中,到底都做了那些事情。

aeProcessEvents 前两个 if/else ,42行代码,主要是为了得到下一次时间事件到来还有多久,设置tvp变量,由此决定文本事件的阻塞时间,正常来说,serverCron每1ms执行一次。作者的意图,就是在不耽误后台进程处理的情况下,阻塞尽可能长的事件,获取尽可能多的响应,增大处理的效率。

之后来到这一行,将得到的tvp变量和事件调度器传递给epoll处理函数,获取tvp事件内响应的全部套字节列表。epoll观察着全部的I/O变化。

numevents = aeApiPoll(eventLoop, tvp);

这个就是整个事件处理器的最最核心的一句,epoll_wait ,获取tvp的时间内,注册在state->epfd上的全部响应套字节,保存在state->events中,并返回数量,保存在变量retval中。

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

很经典的设计模式,观察者模式。“一个对象发生改变时将自动通知其他对象,其他对象将相应作出反应”

state->events中是socket描述字的列表,后面需要后续处理,调用当初注册在对应对象列表上的函数,“做出相应反应”。


 for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
 }

响应之后对应的处理函数,都注册在对应的函数指针里面aeFileEvent的rfileProc和wfileProc指针里面,这个要比nodejs的回调,感觉方便得多,因为模型简单易懂高效,也不会因为回调增加无谓的复杂度。

读响应的后续处理函数,函数指针 rfileProc 主要是 acceptTcpHandler和 acceptUnixHandler ,如果想看具体如何处理socket的响应,可以追踪进去。

而写的后续响应函数 ,保存在函数指针wfileProc中的,sendReplyToClient ,将回复用户的查询内容的,当时并不真正写,而是首先注册到epoll里面,集中处理。

当然,要回复客户端内容,并不是真的一定需要进入事件循环的,可以 使用 flushSlavesOutputBuffers,立刻做出响应。

if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);

因为已经阻塞了tvp的时间,所以processTimeEvents中肯定是有内容需要处理的。也就是开始执行serverCron,执行redis的后台进程。

关于serverCron的解析,欢迎查看 http://unasm.com/2015/07/394/

Leave a comment

Your email address will not be published.

*