内容简介:很多公司面试的时候都喜欢问为什么 Redis 那么快?这就得益于 Redis的一般来说,客户端连接(socket)需要添加到多路复用IO句柄中进行监听,多路复用IO可以监听到客户端连接的状态变化,当客户端连接状态发生变化时(变为可读或可写),多路复用IO就会把状态发生变化的客户端连接列表返回给调用方。如下图:
很多公司面试的时候都喜欢问为什么 Redis 那么快?这就得益于 Redis的 事件驱动模块
,什么是 事件驱动
呢?通俗来说, 事件驱动
指的是当某一事件发生触发某一处理过程。举个例子,当发生火灾时,就会触发消防队救火,在这个例子中,事件是发生火灾,而处理过程是消防队救火。而在 Redis 中的事件指的是客户端连接就绪(可接收或者可发送数据),所以当客户端连接就绪时,就会触发 Redis 的处理过程(调用某一个处理函数)去处理客户端连接。
一般来说,客户端连接(socket)需要添加到多路复用IO句柄中进行监听,多路复用IO可以监听到客户端连接的状态变化,当客户端连接状态发生变化时(变为可读或可写),多路复用IO就会把状态发生变化的客户端连接列表返回给调用方。如下图:
不同的操作系统有不同的多路复用IO接口,比如 Linux 系统中使用的是 epoll,而 FreeBSD 系统中使用的 kqueue。由于Redis支持多操作系统平台,所以 Redis 为了跨平台对多路复用IO进行封装。
下面主要讨论 Redis 在 Linux 操作系统下对事件驱动库的封装。
Redis 事件驱动库的使用
1. 创建事件驱动对象
要使用Redis的事件驱动库,首先需要调用 aeCreateEventLoop()
函数创建一个事件驱动对象,其原型如下:
aeEventLoop *aeCreateEventLoop(int setsize);
参数 setsize
指定了事件驱动库能够监听多少个客户端连接, aeCreateEventLoop()
函数返回一个类型为 aeEventLoop
的对象(结构体)。
2. 添加监听的客户端连接
要监听一个客户端连接的状态变化,需要调用 aeCreateFileEvent()
函数把客户端连接添加到事件驱动对象中进行监听, aeCreateFileEvent()
函数原型如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData);
下面介绍一下 aeCreateFileEvent()
函数各个参数的作用:
-
eventLoop
:由aeCreateEventLoop()
函数创建的事件驱动对象。 -
fd
:客户端连接socket句柄。 -
mask
:监听客户端连接的事件,有AE_READABLE(读)
和AE_WRITABLE(写)
两种事件。 -
proc
:事件发生时的处理函数。 -
clientData
:proc
函数的参数。
3. 删除监听的客户端连接
当我们不希望某个客户端连接被事件驱动库监听时,可以通过调用 aeDeleteFileEvent()
把客户端连接从事件驱动库中删除,其原型如下:
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
下面介绍一下 aeDeleteFileEvent()
函数各个参数的作用:
-
eventLoop
:由aeCreateEventLoop()
函数创建的事件驱动对象。 -
fd
:客户端连接socket句柄。 -
mask
:监听客户端连接的事件,有AE_READABLE(读)
和AE_WRITABLE(写)
两种事件。
4. 等待客户端连接状态变化
当我们通过调用 aeCreateEventLoop()
函数把客户端连接添加到事件驱动库进行监听后,需要调用 aeMain()
函数等待客户端连接状态发生变化,其原型如下:
void aeMain(aeEventLoop *eventLoop);
aeMain()
函数只有一个参数,就是由 aeCreateEventLoop()
函数创建的事件驱动对象。
5. 事件驱动库使用示例
下面我们通过一个 demo 来说明事件驱动库的使用:
#include "ae.h" // 把socket设置为非阻塞 void set_nonblock(int sockfd) { flags = fcntl(sockfd, F_GETFL, 0); // 获取socket的flags值。 fcntl(sockfd, F_SETFL, flags | O_NONBLOCK); // 设置成非阻塞模式; } // 创建一个socket并监听端口 int listen_socket(short port) { int sockfd; sockaddr_in sin; sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); // 创建监听端口的socket sin.sin_family = AF_INET; sin.sin_port = htons(port); sin.sin_addr.S_un.S_addr = INADDR_ANY; bind(sockfd, (struct sockaddr*)&sin, sizeof(sin)); // 绑定IP地址和端口 listen(sockfd, 10); // 开始监听 set_nonblock(sockfd); // 把socket设置为非阻塞(比较重要, 否则可能会阻塞进程) return sockfd; } int main() { int serverfd; aeEventLoop *eventLoop; serverfd = listen_socket(8080); // 创建监听端口的socket eventLoop = aeCreateEventLoop(1024); // 创建事件驱动对象 aeCreateFileEvent(eventLoop, serverfd, AE_READABLE, accept_client, NULL); // 把socket添加到事件驱动对象中进行监听 aeMain(eventLoop); // 开始等待socket的状态发生变化 } // 处理接收到的连接 void accept_client(struct aeEventLoop *eventLoop, int serverfd, void *data, int mask) { while (1) { int clientfd; clientfd = accept(serverfd, (struct sockaddr*)NULL, NULL); if (clientfd == -1) { break; } set_nonblock(clientfd); // 把客户端socket设置为非阻塞(比较重要, 否则可能会阻塞进程) // 把客户端连接添加到事件驱动库中进行监听 aeCreateFileEvent(eventLoop, clientfd, AE_READABLE, process_client, NULL); } } // 处理客户端连接的请求 void process_client(struct aeEventLoop *eventLoop, int clientfd, void *data, int mask) { // ... }
上面的示例主要展示了怎样使用 Redis
的事件驱动库,程序主要完成了以下几个部分:
-
创建监听
8080
端口的socket句柄,然后设置为非阻塞。 -
创建事件驱动对象,并把监听
8080
端口的socket句柄添加到事件驱动对象中进行监听,监听事件为读事件(AE_READABLE)
,当其状态发生变化(可读)时回调函数为accept_client()
。 -
accept_client()
函数会调用accpet()
系统调用来接收客户端连接socket,并且把其添加到事件驱动对象中进行监听,监听事件为读事件(AE_READABLE)
,当其状态发生变化(可读)时回调函数为process_client()
,process_client()
函数可以处理客户端连接的请求。
注意:使用 Redis
事件驱动库时,必须把socket设置为非阻塞状态,如果socket是阻塞状态,那么可能会导致接收或发生数据时阻塞进程。
Redis 事件驱动库源码分析
前面说过,不同的操作系统平台有不同的 多路复用I/O
接口,Redis 为了跨平台,使用了面向接口的编程模式。如果使用 Java
或者 Golang
这些编程语言的同学可能接触过接口,以 Golang 为例,如果某一个对象(结构)实现接口的所有方法,那么就可以把这个对象(结构)当成这个接口。
但 Redis 是使用 C语言
编写的,C语言是没有接口这个概念的,所以必须使用某种方式来模拟接口。Redis 为不同的操作系统平台定义了不同的实现文件,而这些文件都实现相同的方法,然后根据不同的平台引入实现文件即可。例如,在 Linux 系统的实现文件是 ae_epoll.c
,在 FreeBSD 系统的实现文件是 ae_kqueue.c
,在 Soliris 系统的实现文件是 ae_evport.c
,其他系统是 ae_select.c
。打开这些文件可以发现,它们都实现了以下几个函数(方法):
aeApiCreate() // 用于创建平台对应的事件驱动上下文, 比如epoll就是创建epoll句柄 aeApiResize() // 用于扩展事件驱动库能够监听的的客户端连接 aeApiFree() // 用于释放由aeApiCreate()创建的上下文 aeApiAddEvent() // 用于把客户端连接添加到事件驱动上下文中 aeApiDelEvent() // 用于把客户端连接从事件驱动上下文中删除 aeApiPoll() // 用于等待监听的客户端连接状态发生变化 aeApiName() // 用于获取正在使用的事件驱动的类型(如epoll、kqueue、select等)
然后在 ae.c
文件中可以发现以下代码:
#ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
上面的代码意思是:如果是 Soliris 系统就引入 ae_evport.c
文件,如果是 Linux 系统就引入 ae_epoll.c
文件,如果是 FreeBSD 系统就引入 ae_kqueue.c
文件,而其他系统就引入 ae_select.c
文件。
Linux 系统下的实现
下面主要分析 Linux 平台的实现,也就是 ae_epoll.c
文件的实现,我们主要分析几个比较重要的方法: aeApiCreate()
、 aeApiAddEvent()
和 aeApiPoll()
。
aeApiCreate() 函数
aeApiCreate()
函数用于创建平台对应的事件驱动上下文,其代码如下:
static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); ... state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); ... state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ ... eventLoop->apidata = state; return 0; }
Redis 定义了一个 aeApiState
结构体用于保存事件驱动上下文,在 ae_epoll.c
文件下的定义如下:
typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState;
aeApiState
结构体中的 epfd
字段用于保存使用 epoll_create()
系统调用创建的 epoll 文件句柄,而 events
字段是个 epoll_event
结构的数组,用于保存所有就绪(可读或可写)的客户端连接。
aeApiCreate()
函数的实现比较简单,主要完成以下几件事情:
-
首先申请一个
aeApiState
结构。 -
然后初始化其
events
字段(申请events
数组需要的内存)。 -
接着调用
epoll_create()
函数创建一个 epoll 句柄并保存到epfd
字段中。 -
最后把
aeApiState
结构保存到事件驱动对象的apidata
字段中。
aeApiAddEvent() 函数
aeApiAddEvent()
函数用于把客户端连接添加到事件驱动上下文中进行监听,其代码如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; // 监听读事件 if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 监听写事件 ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,ⅇ) == -1) return -1; // 把客户端连接添加到epoll中进行监听 return 0; }
aeApiAddEvent()
函数的参数作用如下:
-
eventLoop
:事件驱动对象。 -
fd
:添加要进行监听的客户端连接socket句柄。 -
mask
:要监听的事件(读或写)。
aeApiAddEvent()
函数主要通过调用 epoll_ctl()
系统调用把客户端连接添加到事件驱动上下文(epoll句柄)中进行监听,当然添加前要指定监听的事件,在epoll 中 EPOLLIN
表示读事件,而 EPOLLOUT
表示写事件。
aeApiPoll()函数
aeApiPoll()
函数用于等待监听的客户端连接状态发生变化,也就是等待客户端连接变为可读或者可写状态,其代码如下:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }
tvp
参数表示要等待多长时间,如果在等待的时间内没有客户端连接的状态发生变化,那么就会超时。 aeApiPoll()
函数主要通过调用 epoll_wait()
系统调用来等待被监听的客户端连接的状态发生变化, epoll_wait()
系统调用会将就绪的客户端连接保存到 events
参数中,并且通过返回值告知其数量。最后, aeApiPoll()
函数会把就绪的客户端连接(socket句柄和发生的事件)记录到事件驱动对象的 fired
字段中。
事件驱动库封装
前面介绍了在 Linux 系统下的事件驱动实现,但为了跨平台的需要,Redis 还需要把这些函数进行一层封装,封装成统一的对外接口,也就是前面介绍过的事件驱动库接口。
这里,我们主要介绍以下几个接口的实现: aeCreateEventLoop()
、 aeCreateFileEvent()
和 aeMain()
。
aeCreateEventLoop() 函数
aeCreateEventLoop()
函数的主要作用是创建一个类型为 aeEventLoop
的事件驱动对象, aeEventLoop
的定义如下:
typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ ... aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ ... int stop; void *apidata; /* This is used for polling API specific data */ ... } aeEventLoop;
我们去掉了定时器相关的字段,下面介绍一下 aeEventLoop
结构各个字段的作用:
-
maxfd
:所有被监听的客户端连接中最大的句柄号,select
这种多路复用I/O需要用到。 -
setmax
:事件驱动对象最大能够监听的客户端连接数。 -
events
:所有注册到事件驱动对象中的客户端连接事件。 -
fired
:所有就绪的客户端连接事件。 -
stop
:是否要停止监听。 -
apidata
:用于保存前面介绍过的事件驱动上下文。
aeCreateEventLoop()
函数的实现如下:
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; ... eventLoop->stop = 0; eventLoop->maxfd = -1; ... if (aeApiCreate(eventLoop) == -1) goto err; for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; ... }
aeCreateEventLoop()
函数的实现比较简单,主要是创建和初始化 aeEventLoop
对象。值得注意的是, aeCreateEventLoop()
函数调用了 aeApiCreate()
函数来创建事件驱动上下文。所以, aeCreateEventLoop()
函数主要是对 aeApiCreate()
函数的封装。
aeCreateFileEvent() 函数
aeCreateFileEvent()
函数用于添加被监听的客户端连接,其实现如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { ... aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) // 把客户端连接添加到事件驱动上下文中 return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; // 设置读事件的回调函数 if (mask & AE_WRITABLE) fe->wfileProc = proc; // 设置写事件的回调函数 fe->clientData = clientData; // 设置回调函数的参数 if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
aeCreateFileEvent()
函数的参数前面已经介绍过,这里就不再重复了。
aeCreateFileEvent()
函数首先会调用 aeApiAddEvent()
函数把客户端连接添加到事件驱动上下文(也就是epoll句柄)中进行监听,然后设置事件的回调函数和回调函数的参数。所以, aeCreateFileEvent()
函数主要是对 aeApiAddEvent()
函数的封装。
aeMain() 函数
aeMain()
函数用于等待客户端连接的状态发生变化,并且调用客户端连接的事件回调函数进行处理。代码如下:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } }
从上面的代码可以看出, aeMain()
函数里面是一个无限循环,循环的停止条件是事件驱动对象的 stop
字段被设置为1。在循环里,每次都会调用 aeProcessEvents()
函数来监听客户端连接的状态变化,并且调用事件相关的回调函数对客户端连接进行处理。 aeProcessEvents()
函数的实现如下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; ... if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { ... numevents = aeApiPoll(eventLoop, tvp); // 等待客户端连接就绪 ... for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { // 处理读事件 fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { // 处理写事件 if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } ... return processed; /* return the number of processed file/time events */ }
aeProcessEvents()
函数首先会调用 aeApiPoll()
函数等待客户端连接的状态变化。然后遍历就绪的客户端连接,判断其发生的事件类型(读事件还是写事件)。如果发生的是读事件,那么就调用读事件回调函数对客户端连接进行处理。如果发生的是写事件,那么就调用写事件回调函数对客户端连接进行处理。
总结
这篇文章主要介绍了 Redis 的事件驱动库的使用与原理实现,Redis的事件驱动库主要使用了 多路复用I/O 来对客户端连接进行监听,如果客户端连接从不可用变为就绪,那么事件驱动库就会调用事件相关的回调函数对连接进行处理。
另外本文未对 Redis 事件驱动库的定时器进行分析,有兴趣的同学可以自行阅读代码分析。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 事件驱动架构引领产业技术升级: 事件驱动联盟(中国)成立
- 聊聊事件驱动模型
- 领域驱动设计 (DDD) 实践之路(二):事件驱动与 CQRS
- 领域驱动设计 (DDD) 实践之路(二):事件驱动与 CQRS
- Redis 源码学习之事件驱动
- 软件架构模式之事件驱动架构
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
利用Python进行数据分析
Wes McKinney / 唐学韬 / 机械工业出版社 / 2013-11-18 / 89.00
【名人推荐】 “科学计算和数据分析社区已经等待这本书很多年了:大量具体的实践建议,以及大量综合应用方法。本书在未来几年里肯定会成为Python领域中技术计算的权威指南。” ——Fernando Pérez 加州大学伯克利分校 研究科学家, IPython的创始人之一 【内容简介】 还在苦苦寻觅用Python控制、处理、整理、分析结构化数据的完整课程?本书含有大量的实践案例,......一起来看看 《利用Python进行数据分析》 这本书的介绍吧!