Analysis of the Redis Network Communication Module Source Code

In today's industry, alongside traditional relational databases, NoSQL in-memory databases built around the key-value idea are very popular, and when in-memory databases come up, many people's first thought is redis. Indeed, with its excellent performance and elegant implementation, redis stands out among in-memory databases. Earlier chapters introduced the basic structure of a single server; in this chapter we run a practical exercise, using redis as an example of what the server structure of a real project looks like. Note that this article takes a different angle from the preceding chapters: there we stated conclusions first and then justified them, whereas here we pretend we know nothing in advance about the structure of redis's network communication layer and, with the help of gdb, work it out step by step in an exploratory way.

Downloading and compiling the redis source code

The latest redis source can be obtained from the official redis website (redis.io/). The author uses CentOS 7.0, so we download the redis source tarball with the wget command:

[root@localhost gdbtest]# wget http://download.redis.io/releases/redis-4.0.11.tar.gz
--2018-09-08 13:08:41--  http://download.redis.io/releases/redis-4.0.11.tar.gz
Resolving download.redis.io (download.redis.io)... 109.74.203.151
Connecting to download.redis.io (download.redis.io)|109.74.203.151|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1739656 (1.7M) [application/x-gzip]
Saving to: ‘redis-4.0.11.tar.gz’

54% [==================================================================>                                                         ] 940,876     65.6KB/s  eta 9s

Unpack it:

[root@localhost gdbtest]# tar zxvf redis-4.0.11.tar.gz 

Enter the resulting redis-4.0.11 directory and build with the makefile:

[root@localhost gdbtest]# cd redis-4.0.11
[root@localhost redis-4.0.11]# make -j 4

After a successful build, several executables are produced in the src directory; among them, redis-server and redis-cli are the two programs we are about to debug.

We can enter the src directory and start redis-server under gdb (note: the gdb sessions below were captured against a redis 4.0.9 build, which is why the paths in the logs read redis-4.0.9; the procedure is identical for 4.0.11):

[root@localhost src]# gdb redis-server 
Reading symbols from /root/redis-4.0.9/src/redis-server...done.
(gdb) r
Starting program: /root/redis-4.0.9/src/redis-server 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
31212:C 17 Sep 11:59:50.781 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
31212:C 17 Sep 11:59:50.781 # Redis version=4.0.9, bits=64, commit=00000000, modified=0, pid=31212, just started
31212:C 17 Sep 11:59:50.781 # Warning: no config file specified, using the default config. In order to specify a config file use /root/redis-4.0.9/src/redis-server /path/to/redis.conf
31212:M 17 Sep 11:59:50.781 * Increased maximum number of open files to 10032 (it was originally set to 1024).
[New Thread 0x7ffff07ff700 (LWP 31216)]
[New Thread 0x7fffefffe700 (LWP 31217)]
[New Thread 0x7fffef7fd700 (LWP 31218)]
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 4.0.9 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 31212
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               

31212:M 17 Sep 11:59:50.793 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
31212:M 17 Sep 11:59:50.793 # Server initialized
31212:M 17 Sep 11:59:50.793 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
31212:M 17 Sep 11:59:50.794 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
31212:M 17 Sep 11:59:50.794 * DB loaded from disk: 0.000 seconds
31212:M 17 Sep 11:59:50.794 * Ready to accept connections

The above is what redis-server prints once it has started successfully.

We open another session, enter the src directory of the redis source again, and start the redis client, redis-cli, under gdb:

[root@localhost src]# gdb redis-cli
Reading symbols from /root/redis-4.0.9/src/redis-cli...done.
(gdb) r
Starting program: /root/redis-4.0.9/src/redis-cli 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
127.0.0.1:6379> 

The above is what redis-cli shows once it has started successfully.

A communication example

Our goal in this chapter is to study redis's network communication module; we are not concerned with the rest of redis. So, to keep the discussion simple, we use a single communication example: through redis-cli we create a key-value pair with key "hello" and value "world", and then receive the response from redis-server. We use this example to study redis's network communication module.

127.0.0.1:6379> set hello world
OK
127.0.0.1:6379> 
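
For reference, what redis-cli actually puts on the wire here is framed with the RESP protocol: a command is sent as an array of bulk strings, each prefixed by its byte length. The set hello world command above therefore travels as the following byte sequence (we will meet this framing again when we watch redis-server parse its receive buffer):

*3\r\n$3\r\nset\r\n$5\r\nhello\r\n$5\r\nworld\r\n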

Exploring the network communication module on the redis-server side

We study the communication module on the redis-server side first.

Initializing the listening socket

From earlier chapters we know that, at the application layer, network communication essentially follows this flow (a minimal C sketch of these five steps appears after the list):

  1. The server creates a listening socket.
  2. The listening socket is bound to the desired IP address and port (via the socket API bind function).
  3. Listening is started (via the socket API listen function).
  4. The server waits indefinitely for client connections, calling the socket API accept function to accept them; each accepted connection produces a client socket for that client.
  5. Network data is sent and received on the client socket, which is closed when necessary.
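
The following is a minimal, self-contained C sketch of these five steps; this is illustrative code, not redis code (error handling is abbreviated, and port 6379 merely mirrors the redis default):

#include <arpa/inet.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    /* step 1: create the listening socket */
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) return 1;

    /* step 2: bind it to the desired address and port */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6379);
    if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) return 1;

    /* step 3: start listening */
    if (listen(listenfd, 511) == -1) return 1;

    /* step 4: accept a client connection, producing a client socket */
    int clientfd = accept(listenfd, NULL, NULL);
    if (clientfd == -1) return 1;

    /* step 5: receive and send on the client socket, then close it */
    char buf[256];
    ssize_t n = read(clientfd, buf, sizeof(buf));
    if (n > 0) write(clientfd, buf, n);
    close(clientfd);
    close(listenfd);
    return 0;
}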

Following this flow, we first explore steps 1, 2 and 3. Since redis-server serves clients on port 6379 by default, we can use that port number as a clue.

Searching the redis code base for calls to the bind function and filtering the results, we settle on the anetListen function in anet.c:

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    if (listen(s, backlog) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

Set a breakpoint on this function with gdb's b command, then rerun redis-server:

(gdb) b anetListen
Breakpoint 1 at 0x426cd0: file anet.c, line 440.
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-4.0.9/src/redis-server 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
31546:C 17 Sep 14:20:43.861 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
31546:C 17 Sep 14:20:43.861 # Redis version=4.0.9, bits=64, commit=00000000, modified=0, pid=31546, just started
31546:C 17 Sep 14:20:43.861 # Warning: no config file specified, using the default config. In order to specify a config file use /root/redis-4.0.9/src/redis-server /path/to/redis.conf
31546:M 17 Sep 14:20:43.862 * Increased maximum number of open files to 10032 (it was originally set to 1024).

Breakpoint 1, anetListen (err=0x745bb0 <server+560> "", s=10, sa=0x75dfe0, len=28, backlog=511) at anet.c:440
440     static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {

When gdb stops in this function, we use the bt command to look at the current call stack:

(gdb) bt
#0  anetListen (err=0x745bb0 <server+560> "", s=10, sa=0x75dfe0, len=28, backlog=511) at anet.c:440
#1  0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0 <server+560> "", port=port@entry=6379, bindaddr=bindaddr@entry=0x0, af=af@entry=10, backlog=511)
    at anet.c:487
#2  0x000000000042792d in anetTcp6Server (err=err@entry=0x745bb0 <server+560> "", port=port@entry=6379, bindaddr=bindaddr@entry=0x0, backlog=<optimized out>)
    at anet.c:510
#3  0x000000000042b01f in listenToPort (port=6379, fds=fds@entry=0x745ae4 <server+356>, count=count@entry=0x745b24 <server+420>) at server.c:1728
#4  0x000000000042f917 in initServer () at server.c:1852
#5  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

From this call stack, together with the port number 6379 visible in frame #2, we confirm that this is the logic we are looking for, and that it runs on the main thread (the outermost frame on the stack is the main function).

Let's look at the code at frame #1:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket, errno: %d", errno);
        goto error;
    }

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}

Switch to frame #1 and type info args to inspect the arguments passed to this function:

(gdb) f 1
#1  0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0 <server+560> "", port=port@entry=6379, bindaddr=bindaddr@entry=0x0, af=af@entry=10, backlog=511)
    at anet.c:487
487             if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
(gdb) info args
err = 0x745bb0 <server+560> ""
port = 6379
bindaddr = 0x0
af = 10
backlog = 511

Here the system API getaddrinfo is used to resolve the host's IP address and port information. The gethostbyname API was not chosen because gethostbyname can only resolve IPv4 host information, whereas getaddrinfo handles both IPv4 and IPv6. Its signature is:

int getaddrinfo(const char *node, const char *service,
                       const struct addrinfo *hints,
                       struct addrinfo **res);

You can consult the Linux man page for the details of this function. Before calling getaddrinfo, a server typically sets AI_PASSIVE in the hints argument's ai_flags, for use with bind, and usually passes NULL as the host name node, which yields a wildcard address ([::] for IPv6). A client, on the other hand, generally does not set AI_PASSIVE in hints.ai_flags, but the host name node and the service name service (in effect, the port) should both be non-NULL.
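
As a hedged sketch of the server-side pattern just described (illustrative code, not the redis implementation; bind_any is a hypothetical helper):

#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Resolve a wildcard address suitable for bind, then create and bind a
 * socket to it. Returns the bound fd, or -1 on failure. */
int bind_any(const char *port) {
    struct addrinfo hints, *res, *p;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;     /* either IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;     /* node == NULL -> wildcard address */

    if (getaddrinfo(NULL, port, &hints, &res) != 0) return -1;
    int fd = -1;
    for (p = res; p != NULL; p = p->ai_next) {
        fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
        if (fd == -1) continue;
        if (bind(fd, p->ai_addr, p->ai_addrlen) == 0) break; /* success */
        close(fd);
        fd = -1;
    }
    freeaddrinfo(res);
    return fd;                       /* caller then calls listen() on it */
}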

Once the address information has been resolved, it is used to create the listening socket, and the socket's SO_REUSEADDR option is enabled. Then anetListen is called, which performs bind first and then listen. At this point redis-server can accept client connections on port 6379.

Accepting client connections

By the same reasoning, to study how redis-server accepts client connections we only need to search for the socket API accept function.

After locating the calls, we end up at the anetGenericAccept function in the anet.c file:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

We set a breakpoint on this function with the b command and rerun redis-server. The breakpoint is not hit even after the program is fully up, so we open a new redis-cli to simulate a fresh client connecting to redis-server. Now the breakpoint fires, and we inspect the call stack:

Breakpoint 2, anetGenericAccept (err=0x745bb0 <server+560> "", s=s@entry=11, sa=sa@entry=0x7fffffffe2b0, len=len@entry=0x7fffffffe2ac) at anet.c:531
531     static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
(gdb) bt
#0  anetGenericAccept (err=0x745bb0 <server+560> "", s=s@entry=11, sa=sa@entry=0x7fffffffe2b0, len=len@entry=0x7fffffffe2ac) at anet.c:531
#1  0x0000000000427a1d in anetTcpAccept (err=<optimized out>, s=s@entry=11, ip=ip@entry=0x7fffffffe370 "\317P\237[", ip_len=ip_len@entry=46, 
    port=port@entry=0x7fffffffe36c) at anet.c:552
#2  0x0000000000437fb1 in acceptTcpHandler (el=<optimized out>, fd=11, privdata=<optimized out>, mask=<optimized out>) at networking.c:689
#3  0x00000000004267f0 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff083a0a0, flags=flags@entry=11) at ae.c:440
#4  0x0000000000426adb in aeMain (eventLoop=0x7ffff083a0a0) at ae.c:498
#5  0x00000000004238ef in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3894

Analyzing this call stack, let's trace the flow. In initServer, called from main, the listening socket is created, bound, and put into listening mode; then aeMain is called to start a loop that continuously processes "events":

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

The loop exits when eventLoop->stop becomes 1. The event-processing code is as follows:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

This code first checks the flags argument to see whether there is anything that needs processing. If timer events are requested (the AE_TIME_EVENTS flag), it searches for the timer that is due to expire soonest:

/* Search the first timer to fire.
 * This operation is useful to know how many time the select can be
 * put in sleep without to delay any event.
 * If there are no timers NULL is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

This code is well commented and easy to follow. The author's comment tells us that, because the set of timers here is unsorted, the whole list has to be traversed, which is O(N). The comment also hints at future optimization directions for redis in this area: sort the list by expiry time in ascending order, so that the head of the list is the timer with the nearest expiry, making the lookup O(1) (insertion and deletion of timers remain O(N)); or use a skiplist, as redis does elsewhere, making this lookup O(1) and insertion O(log(N)).
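
As a hedged sketch of the first optimization suggested in the comment (ordered insertion, so that the nearest timer is simply the list head; timerNode and insertSorted are hypothetical, simplified from aeTimeEvent):

/* Simplified timer node: fires at (when_sec, when_ms). */
typedef struct timerNode {
    long when_sec;
    long when_ms;
    struct timerNode *next;
} timerNode;

/* O(N) insertion that keeps the list sorted by expiry time; finding the
 * nearest timer then becomes an O(1) read of *head. */
void insertSorted(timerNode **head, timerNode *te) {
    while (*head && ((*head)->when_sec < te->when_sec ||
           ((*head)->when_sec == te->when_sec &&
            (*head)->when_ms <= te->when_ms)))
        head = &(*head)->next;
    te->next = *head;
    *head = te;
}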

Next, the current system time is fetched (aeGetTime(&now_sec, &now_ms);), and subtracting it from the expiry time of the soonest timer yields an interval. This interval becomes the argument to the numevents = aeApiPoll(eventLoop, tvp); call. On Linux, aeApiPoll is implemented with epoll; for this I/O multiplexing layer redis picks a different system facility per platform: epoll on Linux, kqueue on macOS and the BSDs, evport on Solaris, and select as a generic fallback. Here we focus on the Linux implementation:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

The signature of epoll_wait is:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

The last parameter, timeout, deserves attention. If the tvp passed in is NULL, then by the analysis above there are no timer events, and the wait time is set to -1, which makes epoll_wait block indefinitely until some event wakes it up. Blocking like this has the advantage of not wasting CPU time slices. Otherwise, timeout is set to the interval until the nearest timer event, so that epoll_wait wakes up just in time and the program can promptly handle that expiring timer (described below).
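
A minimal sketch of this timeout strategy in isolation (assuming an epfd already populated via epoll_ctl; next_timer_ms is a hypothetical helper returning the milliseconds until the nearest timer, or -1 if there is none):

#include <sys/epoll.h>

#define MAX_EVENTS 64

int next_timer_ms(void);                  /* hypothetical: ms until nearest timer, -1 if none */
void handle_event(struct epoll_event *e); /* hypothetical: dispatch one fired event */

void event_loop_once(int epfd) {
    struct epoll_event events[MAX_EVENTS];
    /* -1 blocks indefinitely when there is no timer to wake up for;
     * otherwise wake up just in time for the nearest timer. */
    int timeout = next_timer_ms();
    int n = epoll_wait(epfd, events, MAX_EVENTS, timeout);
    for (int i = 0; i < n; i++)
        handle_event(&events[i]);
    /* expired timers would be processed here, after the poll returns */
}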

For a system call like epoll_wait, information about all fds (for network communication, these are sockets), including the listening fd and the ordinary client fds, is recorded in the apidata field of the event loop object aeEventLoop. When an event fires on some fd, that fd is found via apidata and recorded, together with the event type (the mask field), into the aeEventLoop's fired field. We finish describing this flow first, and later explain when and where the epfd used by epoll_wait is created, and how the listening fd and client fds get attached to it.

Having obtained the fds with pending events, the next step is to handle them. In the main loop aeProcessEvents, the fds recorded in the previous step are taken from the aeEventLoop object's fired array and dispatched according to the event type (readable or writable):

for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

	    /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }

The read-event field rfileProc and the write-event field wfileProc are both function pointers; they are set up early in the program, so here they can simply be invoked:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

Creating the epfd

Searching for the keyword epoll_create, we find the function that creates the epfd, aeApiCreate, in the ae_epoll.c file:

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

Set a breakpoint on this function with gdb's b command, rerun redis-server with the run command, and when the breakpoint fires use bt to inspect the call stack. Aha: the epfd, too, is created in the initServer function introduced above.

(gdb) bt
#0  aeCreateEventLoop (setsize=10128) at ae.c:79
#1  0x000000000042f542 in initServer () at server.c:1841
#2  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

aeCreateEventLoop not only creates the epfd; it also creates the aeEventLoop object needed by the whole event loop and records that object in the el field of a redis global variable. This global variable is called server, and it is a struct. Its definition is:

//located in server.c
struct redisServer server; /* Server global state */
//located in server.h
struct redisServer {
    /* General */
    //some fields omitted...
    aeEventLoop *el;
    unsigned int lruclock;      /* Clock for LRU eviction */
    //many more fields omitted...
}

How the listening fd and client fds are attached to the epfd

In the same way: attaching an fd to the epfd requires the system API epoll_ctl, so we search for that function name. In the file ae_epoll.c we find the aeApiAddEvent function:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

When attaching an fd to the epfd, the code first looks in eventLoop (of type aeEventLoop) to see whether some event type is already being watched on this fd. If so, the epoll_ctl call modifies the events of an already attached fd (EPOLL_CTL_MOD); otherwise the fd is newly added to the epfd (EPOLL_CTL_ADD).
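
For context, aeApiAddEvent is not called directly by the rest of the server; it is invoked from aeCreateFileEvent (in ae.c), which also records the callback and the event mask on the aeEventLoop. Abridged from the 4.0.x source, it looks roughly like this:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}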

We set a breakpoint in aeApiAddEvent and restart redis-server. When the breakpoint fires, the call stack looks like this:

#0  aeCreateFileEvent (eventLoop=0x7ffff083a0a0, fd=15, mask=mask@entry=1, proc=0x437f50 <acceptTcpHandler>, clientData=clientData@entry=0x0) at ae.c:145
#1  0x000000000042f83b in initServer () at server.c:1927
#2  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

Again we are inside initServer. Combining this with the creation of the listening fd analyzed above and stripping out irrelevant code, the backbone of this function reduces to the following pseudocode:

void initServer(void) {

    //record the process ID
    server.pid = getpid();

    //create the program's aeEventLoop object and the epfd
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    //create the listening fd
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR

    //set the listening fd to non-blocking mode
    anetNonBlock(NULL,server.sofd);

    //create redis's timer, used to run the periodic cron tasks
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR

    //attach the listening fd to the epfd
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR

    //create a pipe, used to wake the EventLoop out of epoll_wait when needed
    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR
}

Note: the "backbone" here means the backbone of the network communication we care about; it does not mean the rest of the code in this function is unimportant.

How can we verify that the fd attached to the epfd at this breakpoint is the listening fd? That is easy: when the listening fd is created, we record its value in gdb. For example, on one run on the author's machine the listening fd was 15, as the figure below shows (the debugger used is cgdb):

[figure: cgdb session showing the listening fd value (15) when it is created]

Then, running on to the place where the fd is attached, we confirm the value of the fd being attached to the epfd:

[figure: cgdb session showing that the fd attached to the epfd is also 15]

The fd value here is also 15, so the attached fd is indeed the listening fd. Note that when the listening fd is attached, only readable events are watched, and the event callback is set to acceptTcpHandler. For a listening fd we generally only need to watch readable events; a readable event normally means a new connection has arrived.

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR

acceptTcpHandler is defined as follows (in the file networking.c):

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

What anetTcpAccept calls is exactly the anetGenericAccept function we discussed above:

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}

At this point the whole flow finally links up. We set a breakpoint on acceptTcpHandler, rerun redis-server, and open a redis-cli to connect to it. If the breakpoint is hit, our analysis is correct.

Sure enough, the breakpoint is hit.

[figure: gdb session showing the breakpoint in acceptTcpHandler being hit]

After acceptTcpHandler successfully accepts a new connection and obtains the client fd, it calls acceptCommonHandler, which in turn calls createClient. In createClient the client fd is first set to non-blocking mode, then attached to the epfd and recorded on the program's aeEventLoop object. Note that a client fd, too, is attached with only readable events watched. Stripping out the irrelevant code and keeping the parts we care about, the tidied-up version looks like this (in the networking.c file):

client *createClient(int fd) {
    //set the client fd to non-blocking mode
    anetNonBlock(NULL,fd);
    //enable the TCP_NODELAY option
    anetEnableTcpNoDelay(NULL,fd);
    //depending on the configuration, enable tcp keepalive
    if (server.tcpkeepalive)
        anetKeepAlive(NULL,fd,server.tcpkeepalive);
    //attach the client fd to the epfd and record it on the aeEventLoop;
    //the watched event is AE_READABLE, the callback is readQueryFromClient
    aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR

    return c;
}

Handling readable events on an fd

When a readable event fires on a client fd, the callback is readQueryFromClient. It is implemented as follows (in the networking.c file):

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

We set a breakpoint on this function, rerun redis-server, start a client, and try to send the server the command "set hello world". But during actual debugging we find that the breakpoint fires as soon as redis-cli connects, before we have sent our intended command at all. Single-stepping through readQueryFromClient and printing the received data, we get the following string:

(gdb) p c->querybuf 
$8 = (sds) 0x7ffff09b8685 "*1\r\n$7\r\nCOMMAND\r\n"

What is c->querybuf here? The variable c is of type client, the structure created when, as described above, the connection is accepted, the new client fd is produced, and the callback is bound; it is passed to readQueryFromClient as an argument. Its definition can be found in server.h:

/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    //some fields omitted
} client;

A client is in fact the object that stores the state of one client connection: its fd field is the connection's fd, and its querybuf field is the connection's receive buffer. In other words, every new client connection gets such an object, and data read from the fd is stored into this querybuf field.

For completeness, here is the full createClient function:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}

The first command redis-server receives from a client

The first piece of data redis-cli sends to redis-server is *1\r\n$7\r\nCOMMAND\r\n. Let's see how this data is handled. That is easy: single-step past the read call in readQueryFromClient that pulls in the data, then keep following the code that processes c->querybuf. Tracing it in practice shows that the function called is processInputBuffer, located in the networking.c file:

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    server.current_client = c;
    /* Keep processing while there is something in the input buffer */
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}

processInputBuffer first checks whether the received string begins with an asterisk (*). Here it does, so the client object's reqtype field is set to PROTO_REQ_MULTIBULK and processMultibulkBuffer is called to process the rest of the string. The processed string is parsed into a redis command and recorded in the client object's argc and argv fields: the former records the number of arguments of the current command, the latter stores the addresses of the corresponding argument objects. Since how commands are parsed, and which command structures exist, is not the focus of this chapter, we do not analyze it in detail here.
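
To make the framing concrete, here is a hedged, greatly simplified parser for one complete multibulk command (illustrative only; redis's processMultibulkBuffer parses incrementally out of an sds buffer and handles partial reads, which this sketch deliberately ignores):

#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Parse a complete multibulk command such as "*3\r\n$3\r\nset\r\n...".
 * Fills argv with strndup'ed arguments and returns argc, or -1 on
 * malformed input. Assumes the whole command is present in buf. */
int parse_multibulk(const char *buf, char **argv, int maxargs) {
    if (*buf != '*') return -1;
    int argc = atoi(buf + 1);              /* number of bulk strings */
    const char *p = strstr(buf, "\r\n");
    if (!p || argc < 0 || argc > maxargs) return -1;
    p += 2;
    for (int i = 0; i < argc; i++) {
        if (*p != '$') return -1;
        int len = atoi(p + 1);             /* length of this bulk string */
        p = strstr(p, "\r\n");
        if (!p || len < 0) return -1;
        p += 2;
        argv[i] = strndup(p, len);         /* copy the argument */
        p += len + 2;                      /* skip payload and trailing \r\n */
    }
    return argc;
}

int main(void) {
    char *argv[8];
    int argc = parse_multibulk("*3\r\n$3\r\nset\r\n$5\r\nhello\r\n$5\r\nworld\r\n", argv, 8);
    for (int i = 0; i < argc; i++) printf("argv[%d] = %s\n", i, argv[i]);
    return 0;
}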

Once parsing is complete and processMultibulkBuffer returns, the command recorded in the client object's argv field is handled in processCommand:

//indentation kept as in the original code
if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == C_OK) {
                //some code omitted
            }

        }

In processCommand, the command is handled roughly as follows:

  1. First check whether it is the quit command; if so, append a reply to the send buffer (answering the redis client) and set the CLIENT_CLOSE_AFTER_REPLY flag on the current client object. As the name suggests, this flag means: close the connection once the reply has been sent.

  2. If it is not quit, look the command up in the global command table with lookupCommand. On error, append an error reply to the send buffer; an error here does not mean a bug in the program logic, it may simply be an invalid command sent by the client. If the command is found, execute it and then append the reply.

    int processCommand(client *c) {
        /* The QUIT command is handled separately. Normal command procs will
         * go through checking for replication and QUIT will cause trouble
         * when FORCE_REPLICATION is enabled and would be implemented in
         * a regular command proc. */
        if (!strcasecmp(c->argv[0]->ptr,"quit")) {
            addReply(c,shared.ok);
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
            return C_ERR;
        }
    
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        if (!c->cmd) {
            flagTransaction(c);
            addReplyErrorFormat(c,"unknown command '%s'",
                (char*)c->argv[0]->ptr);
            return C_OK;
        } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                   (c->argc < -c->cmd->arity)) {
            flagTransaction(c);
            addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
                c->cmd->name);
            return C_OK;
        }
        
        //...some code omitted
        
    }

The global command table is the commands field of the server global variable introduced earlier (of type redisServer):

struct redisServer {
    /* General */
    pid_t pid;                  /* Main process pid. */
    //irrelevant fields omitted
    dict *commands;             /* Command table */
    
    //irrelevant fields omitted
}

Where this global table is initialized, and the data structures involved, are unrelated to the topic of this section, so we skip them here.

Next we focus on how a reply (including an error reply) is appended to the send buffer, taking the appending of an "ok" reply as the example:

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* Optimization: if there is room in the static buffer for 32 bytes
         * (more than the max chars a 64 bit integer can take as string) we
         * avoid decoding the object and go for the lower level approach. */
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;

            len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) == C_OK)
                return;
            /* else... continue with the normal code path, but should never
             * happen actually since we verified there is room. */
        }
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

There are two key points in addReply: the call to prepareClientToWrite and the call to _addReplyToBuffer. Let's look at prepareClientToWrite first. It contains this piece of code:

if (!clientHasPendingReplies(c) &&
        !(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* Here instead of installing the write handler, we just flag the
         * client and put it into a list of clients that have something
         * to write to the socket. This way before re-entering the event
         * loop, we can try to directly write to the client sockets avoiding
         * a system call. We'll only really install the write handler if
         * we'll not be able to write the whole reply at once. */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }

This code first checks whether the send buffer still holds unsent replies, by testing whether the client object's bufpos field (an int) and the length of its reply field (a linked list) are greater than 0:

/* Return true if the specified client has pending reply buffers to write to
 * the socket. */
int clientHasPendingReplies(client *c) {
    return c->bufpos || listLength(c->reply);
}

If the current client object is not already in the CLIENT_PENDING_WRITE state and there is no leftover data in its send buffer, the CLIENT_PENDING_WRITE flag is set on it and the client object is added to the clients_pending_write list on the global server object. This list holds all client objects that have data to send; take care not to confuse it with the reply list mentioned above.

As for the CLIENT_PENDING_WRITE flag, redis explains it as:

Client has output to send but a write handler is yet not installed

That is: a client object that has data to send but for which no write handler has been installed yet.

Now for the _addReplyToBuffer function, implemented in the networking.c file:

int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

This function again makes sure that the client object's reply list is empty (the if check; when the condition fails, the function returns). The reply list stores replies waiting to be sent. The reply data itself is stored in the client object's buf field, with its length recorded in the bufpos field. buf is a fixed-size byte array:

typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    sds pending_querybuf;   /* If this is a master, this buffer represents the
                               yet not applied replication stream that we
                               are receiving from the master. */
   //some fields omitted...

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

PROTO_REPLY_CHUNK_BYTES is defined as 16*1024 in redis; that is, the static reply buffer holds at most 16K of reply data (as the _addReplyToBuffer code above shows, replies that do not fit fall back to the reply list).

Back to the data mentioned above, *1\r\n$7\r\nCOMMAND\r\n: after parsing, lookupCommand resolves it to the "command" command, shown in gdb as follows:

2345        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
(gdb) n
2346        if (!c->cmd) {
(gdb) p c->cmd
$23 = (struct redisCommand *) 0x742db0 <redisCommandTable+13040>
(gdb) p *c->cmd
$24 = {name = 0x4fda67 "command", proc = 0x42d920 <commandCommand>, arity = 0, sflags = 0x50dc3e "lt", flags = 1536, getkeys_proc = 0x0, firstkey = 0, lastkey = 0, 
  keystep = 0, microseconds = 1088, calls = 1}

Handling writable events

Above we saw how redis-server handles readable events: register the read callback; in the callback, call the OS API read to pull in the data; parse the data into a redis command; execute the command; and place the reply into the client object's buf field. So when is the data placed in buf actually sent to the client?

Remember the while event loop described earlier? Let's review its code:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

First it checks whether the eventLoop object's beforesleep member is set; this is a callback, and it is set during redis-server initialization:

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}

We set a breakpoint on aeSetBeforeSleepProc and restart redis-server to verify where this callback is installed:

Breakpoint 2, aeSetBeforeSleepProc (eventLoop=0x7ffff083a0a0, beforesleep=beforesleep@entry=0x4294f0 <beforeSleep>) at ae.c:507
507         eventLoop->beforesleep = beforesleep;
(gdb) bt
#0  aeSetBeforeSleepProc (eventLoop=0x7ffff083a0a0, beforesleep=beforesleep@entry=0x4294f0 <beforeSleep>) at ae.c:507
#1  0x00000000004238d2 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3892

Use the f 1 command to switch to frame #1, and type l to show the code around the breakpoint:

(gdb) l
3887        /* Warning the user about suspicious maxmemory setting. */
3888        if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
3889            serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
3890        }
3891
3892        aeSetBeforeSleepProc(server.el,beforeSleep);
3893        aeSetAfterSleepProc(server.el,afterSleep);
3894        aeMain(server.el);
3895        aeDeleteEventLoop(server.el);
3896        return 0;

Line 3892 sets this callback to the beforeSleep function, so every iteration of the loop calls beforeSleep. (server.el, as introduced earlier, is the aeEventLoop object.) Inside beforeSleep there is a call to handleClientsWithPendingWrites (in the file server.c):

void beforeSleep(struct aeEventLoop *eventLoop) {
    //irrelevant code omitted...

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    //irrelevant code omitted...
}

The handleClientsWithPendingWrites call is what sends out the data recorded in each client. Let's look at the sending logic in detail (in the networking.c file):

/* This function is called just before entering the event loop, in the hope
 * we can just write the replies to the client output buffer without any
 * need to use a syscall in order to install the writable event handler,
 * get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If there is nothing left, do nothing. Otherwise install
         * the write handler. */
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    return processed;
}

The code above takes, one by one, the client objects with data to send from the clients_pending_write field (a list of client objects) of the global server object introduced earlier, and calls writeToClient to try to send out the reply data stored in each client:

//located in networking.c
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    sds o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objlen;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver. */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}

writeToClient first sends out the data in the buf field of the client object it is handling, freeing the client on error. If all the data can be sent, then after sending, the writable event on the corresponding fd is removed (if one was installed); and if the client has the CLIENT_CLOSE_AFTER_REPLY flag set, the client object is freed immediately once the data has been sent.

Of course, it can happen that, due to the network or the client, redis-server cannot send a client's data at all, or can send only part of it (for instance: the server keeps sending to a client whose application never drains its TCP kernel receive buffer; after a while the client's kernel buffer fills up and the server can no longer send. Since the fd is non-blocking, send or write then returns -1 immediately with errno set to EAGAIN, as the code above shows). Either way, the data cannot be fully sent this time, and that is when a writable event must be watched, because handleClientsWithPendingWrites contains the following code:

/* If there is nothing left, do nothing. Otherwise install
 * the write handler. */
if (clientHasPendingReplies(c) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
												sendReplyToClient, c) == AE_ERR)
{
	freeClientAsync(c);
}

The callback registered here for the writable event AE_WRITABLE is sendReplyToClient. That is, the next time a writable event fires on this fd, sendReplyToClient is called. One can guess that sendReplyToClient sends data with exactly the same logic as writeToClient above, and indeed it does (in the networking.c file):

/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1);
}

With that, the sending logic of redis-server is also clear. A quick summary:

When there is data to send to a client, there is no need to register a writable event up front and wait for it to fire. The usual approach is to send directly at the point where the reply is produced; if the send is incomplete because the peer's TCP window is too small, store the remainder in a buffer and register a writable event, then retry when the writable event fires, removing the writable event once all the data has been sent.

redis-server's sending logic differs slightly from this only in that the send is deferred to a fixed point in the EventLoop (here, just before ProcessEvents); everything else is exactly as above.

The reason for not registering a writable event up front and sending only when it fires is this: under normal conditions both ends of a connection send and receive normally, and it is rare for one end to stall the other through a too-small TCP window. If a writable event were registered permanently, it would fire frequently, often with no data to send at that moment, wasting system resources as well as the server's precious CPU time slices.
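
Independent of redis, the general pattern can be sketched like this (hedged, illustrative code; watch_writable and unwatch_writable are hypothetical stand-ins for registering and removing an EPOLLOUT interest):

#include <sys/socket.h>
#include <sys/types.h>

void watch_writable(int fd);   /* hypothetical: start watching EPOLLOUT */
void unwatch_writable(int fd); /* hypothetical: stop watching EPOLLOUT */

/* Try to flush len bytes on a non-blocking fd; returns how many bytes
 * remain unsent (0 means everything went out). */
size_t send_all(int fd, const char *buf, size_t len) {
    size_t sent = 0;
    while (sent < len) {
        ssize_t n = send(fd, buf + sent, len - sent, 0);
        if (n > 0) { sent += (size_t)n; continue; }
        break;  /* EAGAIN (kernel buffer full) or a real error: stop here */
    }
    return len - sent;
}

void reply(int fd, const char *buf, size_t len) {
    if (send_all(fd, buf, len) > 0)
        watch_writable(fd);    /* flush the rest when the fd becomes writable */
    else
        unwatch_writable(fd);  /* all sent: no writable event needed */
}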

Timer logic

No network communication module can do without timers. Earlier we saw how the event-processing function extracts the timer that expires soonest; let's continue from there. At the end of the aeProcessEvents function (in the file ae.c) there is this code:

/* Check time events */
if (flags & AE_TIME_EVENTS)
	processed += processTimeEvents(eventLoop);

If timer events exist, processTimeEvents (in the file ae.c) is called to handle them:

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        prev = te;
        te = te->next;
    }
    return processed;
}

The core logic of this code walks the timer list recorded in eventLoop->timeEventHead, compares each timer's expiry time with the current time, and, for timers that have expired, calls the timer object's configured callback timeProc. There is nothing particularly tricky here, but the author did consider one special scenario: suppose someone sets the machine's clock forward to some future time and then back again. Then now (the current time) can be smaller than eventLoop->lastTime (the previous time recorded in the aeEventLoop). What then? The redis author walks the timer list and sets the expiry time of every timer in it to 0, so that these timers all get processed immediately. That is what the author's comment means by:

force all the time events to be processed ASAP

ASAP being short, of course, for As Soon As Possible.

So where exactly does redis-server use timers? Searching the redis source for the timer creation function aeCreateTimeEvent, we find this line in initServer (in the server.c file):

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
 }

We mentioned this code in earlier sections: the timer's purpose is to drive redis's cron task. What that task does in detail is beyond this chapter; interested readers can read the serverCron source (in server.c).

The aftersleep hook

Typically an EventLoop contains, besides the timers, the I/O multiplexing call, and the I/O event handling logic, some custom functions defined as needed; we call these "hook functions". A hook can sit anywhere in the loop. The beforesleep function introduced above is a custom hook invoked before event handling (it runs before the timer expiry check logic).

In redis-server there is another custom hook, called aftersleep, between the I/O multiplexing call and the I/O event handling logic:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    //irrelevant code omitted...
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    	eventLoop->aftersleep(eventLoop);

    for (j = 0; j < numevents; j++) {
        //irrelevant code omitted...
    }	
}

This hook is set in the main function:

int main(int argc, char **argv) {
    //irrelevant code omitted...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);

    return 0;
}

Since what afterSleep actually does is unrelated to our network communication topic, we do not describe it further.

Summary of the redis-server network communication module

Based on the preceding discussion, one picture summarizes the network communication model of the redis-server side:

[figure: diagram of the redis-server event loop, a single-threaded reactor handling the listening fd and all client fds]

As the figure shows, this is the classic reactor network model built on the one loop one thread idea, and it is also the most mainstream network architecture today. And because all client fds and the listening fd in redis-server's networking live in one EventLoop, redis's network model is commonly described as single-threaded.

Exploring the network communication model of redis-cli

Next we explore the network communication module of redis-cli, the client shipped with the redis source.

After starting redis-cli under gdb, we originally planned to press Ctrl + C to break into gdb and see how many threads redis-cli runs, but it turns out this does not interrupt into gdb at all; instead it makes the redis-cli process exit.

As soon as Ctrl + C is pressed, the process exits, as the figure below shows:

[figure: pressing Ctrl + C makes the redis-cli process exit]

So we change tack: run redis-cli directly, then inspect its threads with the Linux pstack command plus the process id:

[root@localhost ~]# ps -ef | grep redis-cli
root     35454 12877  0 14:51 pts/1    00:00:00 ./redis-cli
root     35468 33548  0 14:51 pts/5    00:00:00 grep --color=auto redis-cli
[root@localhost ~]# pstack 35454
#0  0x00007f011c2186f0 in __read_nocancel () from /lib64/libpthread.so.0
#1  0x000000000041bc5c in linenoiseEdit (stdin_fd=0, stdout_fd=1, buflen=4096, prompt=<optimized out>, buf=0x7ffea3c20410 "") at linenoise.c:800
#2  linenoiseRaw (buflen=4096, prompt=<optimized out>, buf=0x7ffea3c20410 "") at linenoise.c:991
#3  linenoise (prompt=<optimized out>) at linenoise.c:1059
#4  0x00000000004116ac in repl () at redis-cli.c:1398
#5  0x000000000040aa4e in main (argc=0, argv=0x7ffea3c216b0) at redis-cli.c:2950

The output shows that redis-cli has only its main thread. Since there is just one thread, we can conclude that the commands redis-cli sends to redis-server must be synchronous, meaning that after sending a command it waits for the server's reply or for a timeout.

The main function of redis-cli (in redis-cli.c) contains this code:

/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
    /* Ignore SIGPIPE in interactive mode to force a reconnect */
    signal(SIGPIPE, SIG_IGN);

    /* Note that in repl mode we don't abort on connection error.
    * A new attempt will be performed for every command send. */
    cliConnect(0);
    repl();
}

The cliConnect(0) call (also in redis-cli.c) is implemented as follows:

static int cliConnect(int force) {
    if (context == NULL || force) {
        if (context != NULL) {
            redisFree(context);
        }

        if (config.hostsocket == NULL) {
            context = redisConnect(config.hostip,config.hostport);
        } else {
            context = redisConnectUnix(config.hostsocket);
        }

        if (context->err) {
            fprintf(stderr,"Could not connect to Redis at ");
            if (config.hostsocket == NULL)
                fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,context->errstr);
            else
                fprintf(stderr,"%s: %s\n",config.hostsocket,context->errstr);
            redisFree(context);
            context = NULL;
            return REDIS_ERR;
        }

        /* Set aggressive KEEP_ALIVE socket option in the Redis context socket
         * in order to prevent timeouts caused by the execution of long
         * commands. At the same time this improves the detection of real
         * errors. */
        anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);

        /* Do AUTH and select the right DB. */
        if (cliAuth() != REDIS_OK)
            return REDIS_ERR;
        if (cliSelect() != REDIS_OK)
            return REDIS_ERR;
    }
    return REDIS_OK;
}

The work this function does breaks down into three steps:

  1. context = redisConnect(config.hostip,config.hostport);
  2. cliAuth()
  3. cliSelect()

Let's look at the first step, the redisConnect function. It actually calls redisContextConnectTcp, which in turn calls _redisContextConnectTcp. _redisContextConnectTcp is where the connection to redis-server really happens: it first calls the getaddrinfo API to resolve the ip address and port passed in (127.0.0.1 and 6379 in my case), then creates a socket and puts it in non-blocking mode, then calls the connect API; because the socket is non-blocking, connect returns -1 immediately. It then calls redisContextWaitReady, which uses the poll API to check whether the connecting socket has become writable (POLLOUT); if so, the connection to redis-server has succeeded. Since _redisContextConnectTcp is fairly long, here is pseudocode of its key logic with the irrelevant parts stripped out (the function is in net.c):

static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
                                   const struct timeval *timeout,
                                   const char *source_addr) {
    /* some irrelevant code omitted... */

    if ((rv = getaddrinfo(c->tcp.host, _port, &hints, &servinfo)) != 0) { /* error handling */ }

    if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { /* error handling */ }

    if (redisSetBlocking(c, 0) != REDIS_OK) { /* error handling */ }

    connect(s, p->ai_addr, p->ai_addrlen);   /* non-blocking: returns -1 with errno == EINPROGRESS */

    if (redisContextWaitReady(c, timeout_msec) != REDIS_OK) { /* error handling */ }

    return REDIS_OK;   /* connection established */
}

The code of redisContextWaitReady (in net.c) is as follows:

static int redisContextWaitReady(redisContext *c, long msec) {
    struct pollfd   wfd[1];

    wfd[0].fd     = c->fd;
    wfd[0].events = POLLOUT;

    if (errno == EINPROGRESS) {
        int res;

        if ((res = poll(wfd, 1, msec)) == -1) {
            __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
            redisContextCloseFd(c);
            return REDIS_ERR;
        } else if (res == 0) {
            errno = ETIMEDOUT;
            __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
            redisContextCloseFd(c);
            return REDIS_ERR;
        }

        if (redisCheckSocketError(c) != REDIS_OK)
            return REDIS_ERR;

        return REDIS_OK;
    }

    __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
    redisContextCloseFd(c);
    return REDIS_ERR;
}

Here is the call stack at this point:

(gdb) bt
#0  redisContextWaitReady (c=c@entry=0x66f050, msec=msec@entry=-1) at net.c:213
#1  0x000000000041a4dd in _redisContextConnectTcp (c=c@entry=0x66f050, addr=addr@entry=0x66f011 "127.0.0.1", port=port@entry=6379, timeout=timeout@entry=0x0, 
    source_addr=source_addr@entry=0x0) at net.c:391
#2  0x000000000041a948 in redisContextConnectTcp (c=c@entry=0x66f050, addr=addr@entry=0x66f011 "127.0.0.1", port=port@entry=6379, timeout=timeout@entry=0x0)
    at net.c:420
#3  0x0000000000414ec9 in redisConnect (ip=0x66f011 "127.0.0.1", port=6379) at hiredis.c:682
#4  0x000000000040f6b2 in cliConnect (force=<optimized out>) at redis-cli.c:606
#5  0x000000000040aa49 in main (argc=0, argv=0x7fffffffe680) at redis-cli.c:2949

Once the connection to redis-server succeeds, the cliAuth and cliSelect functions mentioned above are called. Depending on whether config.auth and config.dbnum are configured, they send the corresponding commands to redis-server; since neither is configured here, both functions effectively do nothing:

583     static int cliSelect(void) {
(gdb) n
585         if (config.dbnum == 0) return REDIS_OK;
(gdb) p config.dbnum
$11 = 0

Next comes the repl function, which runs a while loop that keeps reading user input from the command line:

/* in redis-cli.c */
static void repl(void) {
    /* ...irrelevant code omitted... */
    while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
        if (line[0] != '\0') {
            argv = cliSplitArgs(line,&argc);
            if (history) linenoiseHistoryAdd(line);
            if (historyfile) linenoiseHistorySave(historyfile);

            if (argv == NULL) {
                printf("Invalid argument(s)\n");
                linenoiseFree(line);
                continue;
            } else if (argc > 0) {
                if (strcasecmp(argv[0],"quit") == 0 ||
                    strcasecmp(argv[0],"exit") == 0)
                {
                    exit(0);
                } else if (argv[0][0] == ':') {
                    cliSetPreferences(argv,argc,1);
                    continue;
                } else if (strcasecmp(argv[0],"restart") == 0) {
                    if (config.eval) {
                        config.eval_ldb = 1;
                        config.output = OUTPUT_RAW;
                        return; /* Return to evalMode to restart the session. */
                    } else {
                        printf("Use 'restart' only in Lua debugging mode.");
                    }
                } else if (argc == 3 && !strcasecmp(argv[0],"connect")) {
                    sdsfree(config.hostip);
                    config.hostip = sdsnew(argv[1]);
                    config.hostport = atoi(argv[2]);
                    cliRefreshPrompt();
                    cliConnect(1);
                } else if (argc == 1 && !strcasecmp(argv[0],"clear")) {
                    linenoiseClearScreen();
                } else {
                    long long start_time = mstime(), elapsed;
                    int repeat, skipargs = 0;
                    char *endptr;

                    repeat = strtol(argv[0], &endptr, 10);
                    if (argc > 1 && *endptr == '\0' && repeat) {
                        skipargs = 1;
                    } else {
                        repeat = 1;
                    }

                    issueCommandRepeat(argc-skipargs, argv+skipargs, repeat);

                    /* If our debugging session ended, show the EVAL final
                     * reply. */
                    if (config.eval_ldb_end) {
                        config.eval_ldb_end = 0;
                        cliReadReply(0);
                        printf("\n(Lua debugging session ended%s)\n\n",
                            config.eval_ldb_sync ? "" :
                            " -- dataset changes rolled back");
                    }

                    elapsed = mstime()-start_time;
                    if (elapsed >= 500 &&
                        config.output == OUTPUT_STANDARD)
                    {
                        printf("(%.2fs)\n",(double)elapsed/1000);
                    }
                }
            }
            /* Free the argument vector */
            sdsfreesplitres(argv,argc);
        }
        /* linenoise() returns malloc-ed lines like readline() */
        linenoiseFree(line);
    }
    exit(0);
}

After reading a line of input, repl first saves it to the history (so it can be recalled later with the up and down arrow keys) and then validates it. A local command (one that need not go to the server, such as quit or exit) is executed directly; a remote command is handed to issueCommandRepeat to be sent to the server:

/* in redis-cli.c */
static int issueCommandRepeat(int argc, char **argv, long repeat) {
    while (1) {
        config.cluster_reissue_command = 0;
        if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
            cliConnect(1);

            /* If we still cannot send the command print error.
             * We'll try to reconnect the next time. */
            if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
                cliPrintContextError();
                return REDIS_ERR;
            }
         }
         /* Issue the command again if we got redirected in cluster mode */
         if (config.cluster_mode && config.cluster_reissue_command) {
            cliConnect(1);
         } else {
             break;
        }
    }
    return REDIS_OK;
}

The function that actually sends a command is cliSendCommand. It calls cliReadReply, which calls redisGetReply, which in turn calls redisBufferWrite; redisBufferWrite finally invokes the write system call to send out the command we typed:

/* in hiredis.c */
int redisBufferWrite(redisContext *c, int *done) {
    int nwritten;

    /* Return early when the context has seen an error. */
    if (c->err)
        return REDIS_ERR;

    if (sdslen(c->obuf) > 0) {
        nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
        if (nwritten == -1) {
            if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
                /* Try again later */
            } else {
                __redisSetError(c,REDIS_ERR_IO,NULL);
                return REDIS_ERR;
            }
        } else if (nwritten > 0) {
            if (nwritten == (signed)sdslen(c->obuf)) {
                sdsfree(c->obuf);
                c->obuf = sdsempty();
            } else {
                sdsrange(c->obuf,nwritten,-1);
            }
        }
    }
    if (done != NULL) *done = (sdslen(c->obuf) == 0);
    return REDIS_OK;
}

redis-cli 中输入 set hello world 这一个简单的指令后,发送数据的调用堆栈如下:

(gdb) c
Continuing.
127.0.0.1:6379> set hello world

Breakpoint 7, redisBufferWrite (c=c@entry=0x66f050, done=done@entry=0x7fffffffe310) at hiredis.c:831
831     int redisBufferWrite(redisContext *c, int *done) {
(gdb) bt
#0  redisBufferWrite (c=c@entry=0x66f050, done=done@entry=0x7fffffffe310) at hiredis.c:831
#1  0x0000000000415942 in redisGetReply (c=0x66f050, reply=reply@entry=0x7fffffffe368) at hiredis.c:882
#2  0x00000000004102a0 in cliReadReply (output_raw_strings=output_raw_strings@entry=0) at redis-cli.c:846
#3  0x0000000000410e58 in cliSendCommand (argc=argc@entry=3, argv=argv@entry=0x693ed0, repeat=0, repeat@entry=1) at redis-cli.c:1006
#4  0x0000000000411445 in issueCommandRepeat (argc=3, argv=0x693ed0, repeat=<optimized out>) at redis-cli.c:1282
#5  0x00000000004117fa in repl () at redis-cli.c:1444
#6  0x000000000040aa4e in main (argc=0, argv=0x7fffffffe680) at redis-cli.c:2950

Naturally, the data waiting to be sent has to be stored somewhere: it lives in the global static variable context, a struct defined in hiredis.h.

/* Context for a connection to Redis */
typedef struct redisContext {
    int err; /* Error flags, 0 when there is no error */
    char errstr[128]; /* String representation of error when applicable */
    int fd;
    int flags;
    char *obuf; /* Write buffer */
    redisReader *reader; /* Protocol reader */

    enum redisConnectionType connection_type;
    struct timeval *timeout;

    struct {
        char *host;
        char *source_addr;
        int port;
    } tcp;

    struct {
        char *path;
    } unix_sock;

} redisContext;

The obuf field points to an sds object that holds the command currently being sent. This also takes care of buffering whatever part of a command cannot be sent in a single write.

Inside redisGetReply, as soon as the data has been flushed out, redisBufferRead is called to collect the server's reply:

int redisGetReply(redisContext *c, void **reply) {
    int wdone = 0;
    void *aux = NULL;

    /* Try to read pending replies */
    if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
        return REDIS_ERR;

    /* For the blocking context, flush output buffer and read reply */
    if (aux == NULL && c->flags & REDIS_BLOCK) {
        /* Write until done */
        do {
            if (redisBufferWrite(c,&wdone) == REDIS_ERR)
                return REDIS_ERR;
        } while (!wdone);

        /* Read until there is a reply */
        do {
            if (redisBufferRead(c) == REDIS_ERR)
                return REDIS_ERR;
            if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
                return REDIS_ERR;
        } while (aux == NULL);
    }

    /* Set reply object */
    if (reply != NULL) *reply = aux;
    return REDIS_OK;
}

Once the reply is in hand, it can be parsed and displayed on the terminal.

To sum up, redis-cli genuinely communicates synchronously over the network, but the socket it uses is still set to non-blocking mode, which brings three benefits (a generic sketch of the pattern follows this list):

  1. When connecting to the server, connect does not block and returns immediately; poll is then used to detect whether the socket has become writable, which tells us whether the connection succeeded.
  2. When sending data, if the peer's tcp window is too small to take it, write returns immediately instead of blocking, and the unsent bytes can be stashed and sent again later.
  3. When receiving data, if nothing is currently readable, read does not block either, so the program can return immediately and keep responding to user input.
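
A minimal sketch of this generic non-blocking pattern (nonblocking_roundtrip is a hypothetical illustration, not taken from the redis source; error paths and partial-write bookkeeping are trimmed):

#include <sys/socket.h>
#include <fcntl.h>
#include <poll.h>
#include <errno.h>
#include <unistd.h>

/* Connect, send, and receive on a non-blocking socket without ever blocking. */
static int nonblocking_roundtrip(int fd, const struct sockaddr *addr, socklen_t addrlen,
                                 const char *out, size_t outlen, char *in, size_t inlen) {
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    if (connect(fd, addr, addrlen) == -1 && errno == EINPROGRESS) {
        struct pollfd pfd = { .fd = fd, .events = POLLOUT };
        if (poll(&pfd, 1, 5000) <= 0) return -1;   /* writable => connected (also check SO_ERROR) */
    }
    if (write(fd, out, outlen) == -1 && errno == EAGAIN) {
        /* peer's tcp window is full: stash the unsent bytes and retry later */
    }
    if (read(fd, in, inlen) == -1 && errno == EAGAIN) {
        /* no reply yet: return to the user and try again later */
    }
    return 0;
}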

The redis communication protocol format

The redis client and server communicate over a plain-text protocol that uses \r\n as the separator between protocol elements, commands, and parameters.

Let's again send the command set hello world from redis-cli to redis-server:

127.0.0.1:6379> set hello world

The data the server receives is formatted as follows:

*3\r\n$3\r\nset\r\n$5\r\nhello\r\n$5\r\nworld\r\n

The leading *3 is the marker of a redis command: the marker starts with an asterisk (*), and the number 3 is the number of parts the command consists of (different commands may carry a different number). After the \r\n separator, each part follows a uniform format:

$<length of part A>\r\n<part A, the command>\r\n$<length of part B>\r\n<part B, the command word or key>\r\n$<length of part C>\r\n<part C, the content>\r\n

Commands differ in length and carry different keys and values; the server parses further according to the specific command.
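
As an illustration, here is a minimal sketch of producing this format (respEncode and its main driver are hypothetical illustrations, not part of the redis or hiredis source; truncation handling is simplified):

#include <stdio.h>
#include <string.h>

/* Encode argc/argv into the request format described above. */
static int respEncode(char *out, size_t outlen, int argc, const char **argv) {
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    for (int i = 0; i < argc; i++)
        n += snprintf(out + n, outlen - n, "$%zu\r\n%s\r\n",
                      strlen(argv[i]), argv[i]);
    return n;   /* bytes written, assuming the buffer was large enough */
}

int main(void) {
    const char *argv[] = { "set", "hello", "world" };
    char buf[128];
    respEncode(buf, sizeof(buf), 3, argv);
    printf("%s", buf);   /* *3 $3 set $5 hello $5 world, each token ending in \r\n */
    return 0;
}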

Summary

With that, we have finished analyzing the networking modules of both the redis server and the client. redis's communication model is a very common one and well worth studying and imitating; readers who want to raise their network programming game are encouraged to read the code closely. The redis source also contains many small tricks in its use of networking APIs that are equally worth learning.

Likewise, the data structures redis uses (strings, linked lists, sets, and so on) all have their own efficient implementations, so the redis source is also excellent material for studying data structures.

Finally, redis is currently the most widely used in-memory database in the industry; it is open source and the code base is not large. If you want to become a competent server-side developer, you should study it and learn to use it well.

For more technical articles, you can follow my WeChat public account: easyserverdev.

