redis源码分析之网络通信和事件调度篇

一切都要从main开始说起。

main里面调用了两个函数,initServer和aeMain,在initServer里面,redis创建并初始化了各种socket文件,该listen的listen,该add的add,而aeMain则根据server.el,事件调度器,进入while(1)循环,处理事务。

initServer中,抛开对信号的处理,变量的初始化,单纯说网络通信和事件调度,要从initServer的第31行说起

server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

首先创建事件调度器,redis的事件,包括文本事件和时间事件,文本事件其实是各种socket套字节的响应监听,包括本地的socket通信,主从复制的通信,客户端的连接通信等等,时间事件的添加是在两个地方,一个是redis.c里面,每1ms执行serverCron函数,另一个是redis-benchmark.c里面,每1ms执行showThroughput函数。

下面就是时间调度器server.el的定义,时间事件是以单链表的形式,无序的保存在timeEventHead中的,events中保存的,则是全部的文本事件。fired是在全部响应的文本事件列表,一个全部的,一个是已经响应的,需要accept的。

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    // 目前已追踪的最大描述符
    int setsize; /* max number of file descriptors tracked */
    // 用于生成时间事件 id
    long long timeEventNextId;
    // 最后一次执行时间事件的时间
    time_t lastTime;     /* Used to detect system clock skew */
    // 已注册的文件事件,unix哲学: 一切都是文件
    // acceptUnixHandler 处理文件类型的时间
    aeFileEvent *events; /* Registered events */
    // 已就绪的文件事件
    aeFiredEvent *fired; /* Fired events */
    // 时间事件,链表
    aeTimeEvent *timeEventHead;
    // 事件处理器的开关
    int stop;
    // 多路复用库的私有数据
        // aeApiState 的 实例化数据
    void *apidata; /* This is used for polling API specific data */

    // 在处理事件前要执行的函数
    aeBeforeSleepProc *beforesleep;

} aeEventLoop;

创建了全局唯一的server.el之后,开始创建网络接口,server.port 默认是6379,哨兵模式下是26379,如果启动时候有指定,就是启动时指定的数。server.ipfd是用来保存生成的套字节的,ipfd_count 是对应的数量,具体绑定的本地地址以及地址类型(ipv4,ipv6)是保存在server.bindaddr中,对应的数量是server.bindaddr_count,这些本地地址是启动的时候需要在参数中指定的,如果没有指定,这里就是没有,因为不同的套字节监听相同的端口,server.port,需要端口复用,所以地址必须不能相同,全部的都是端口都是非阻塞的哦,响应在aeMain中呢。

 if (server.port != 0 &&
          listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
          exit(1);

listenToPort 函数直接就实现了从socket 到listen的全部流程,剩下的就是坐等响应,然后accept了。有兴趣可以自己追踪到listenToPort中看看如何创建一个socket。

接着是创建一个本地的socket文件,redis进程和其他的进程通信的时候需要,当redis和其客户端在同一个服务器上的时候,是通过本地socket通信实现的。server.unixsocket是socket文件的路径,server.unixsocketperm是socket文件的权限,同样设置成非阻塞模式。

  if (server.unixsocket != NULL) {
          unlink(server.unixsocket); /* don't care if this fails */
          server.sofd = anetUnixServer(server.neterr,server.unixsocket,
              server.unixsocketperm, server.tcp_backlog);
         if (server.sofd == ANET_ERR) {
              redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
              exit(1);
         }
         anetNonBlock(NULL,server.sofd);
  }

跳过一个长长的对db的初始化过程,来到这句代码。之前有提到,redis的时间事件几乎可以认为只是用来调用serverCron的,那这里就是创建一个事件事件,并加到timeEventHead 链表里面的。
我能感觉到,作者当初这么设计,是认为自己会有很多类型的时间事件需要处理,所以用来一种通用的方式来,后来发现,只是需要serverCron定时处理罢了。其实实现定时,是有很多简单设计的。

 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
         redisPanic("Can't create the serverCron time event.");
         exit(1);
  }

接着是整个事件调度的最最核心的地方,也是反应器模式最灵魂的地方,将之前所有的套字节绑定到epoll/poll的套字节上面,并传入对应的响应处理函数。acceptTcpHandler ,就是server.ipfd这些套字节响应的时候,需要调用的处理函数,AE_READABLE表示该套字节是用来读的,将来触发的,是读操作的注册函数。有兴趣建议看一下aeCreateFileEvent,我看了epoll的部分,

    for (j = 0; j < server.ipfd_count; j++) {
                //为ipfd 添加事件处理,放在epoll的监听列表里面
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                redisPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }

下面是为本地的socket套字节注册到epoll/poll上的写法,功能类似,回调函数不同,这个事acceptUnixHandler。

 if (server.sofd > 0 &&
                        aeCreateFileEvent(
                                server.el,
                                server.sofd,
                                AE_READABLE,
                                acceptUnixHandler,
                                NULL
                        ) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

至此将所有的套字节初始化,绑定完毕,之后,如何调度处理呢?请看 redis的事件调度器 epoll
恩,该文章还有待再次润色

Leave a comment

Your email address will not be published.

*