内容简介:Swoole 源码分析——Reactor 模块之 ReactorBase
前言
作为一个网络框架,最为核心的就是消息的接受与发送。高效的 reactor
模式一直是众多网络框架的首要选择,本节主要讲解 swoole
中的 reactor
模块。
Reactor
的数据结构
Reactor
的数据结构比较复杂,首先object
是具体Reactor
对象的首地址,ptr
是拥有Reactor
对象的类的指针,event_num
存放现有监控的fd
个数,max_event_num
存放允许持有的最大事件数目,flag
为标记位,id
用于存放对应reactor
的id
,running
用于标记该reactor
是否正在运行,一般是创建时会被置为 1,start
标记着reactor
是否已经被启动,一般是进行wait
监控时被置为 1,once
标志着reactor
是否是仅需要一次性监控,check_timer
标志着是否要检查定时任务singal_no
:每次reactor
由于fd
的就绪返回时,reactor
都会检查这个singal_no
,如果这个值不为空,那么就会调用相应的信号回调函数disable_accept
标志着是否接受新的连接,这个只有主reactor
中才会设置为 0,其他reactor
线程不需要接受新的连接,只需要接受数据即可check_signalfd
标志着是否需要检查signalfd
thread
用于标记当前是使用reactor
多线程模式还是多进程模式,一般都会使用多线程模式timeout_msec
用于记录每次reactor->wait
的超时max_socket
记录着reactor
中最大的连接数,与max_connection
的值一致;socket_list
是reactor
多线程模式的监听的socket
,与connection_list
保持一致;socket_array
是reactor
多进程模式中的监听的fd
handle
是默认就绪的回调函数,write_handle
是写就绪的回调函数,error_handle
包含错误就绪的回调函数timewheel
、heartbeat_interval
、last_heartbeat_time
是心跳检测,专门剔除空闲连接last_malloc_trim_time
记录了上次返还给系统的时间,swoole
会定期的通过malloc_trim
函数返回空闲的内存空间
struct _swReactor
{
void *object;
void *ptr; //reserve
/**
* last signal number
*/
int singal_no;
uint32_t event_num;
uint32_t max_event_num;
uint32_t check_timer :1;
uint32_t running :1;
uint32_t start :1;
uint32_t once :1;
/**
* disable accept new connection
*/
uint32_t disable_accept :1;
uint32_t check_signalfd :1;
/**
* multi-thread reactor, cannot realloc sockets.
*/
uint32_t thread :1;
/**
* reactor->wait timeout (millisecond) or -1
*/
int32_t timeout_msec;
uint16_t id; //Reactor ID
uint16_t flag; //flag
uint32_t max_socket;
#ifdef SW_USE_MALLOC_TRIM
time_t last_malloc_trim_time;
#endif
#ifdef SW_USE_TIMEWHEEL
swTimeWheel *timewheel;
uint16_t heartbeat_interval;
time_t last_heartbeat_time;
#endif
/**
* for thread
*/
swConnection *socket_list;
/**
* for process
*/
swArray *socket_array;
swReactor_handle handle[SW_MAX_FDTYPE]; //默认事件
swReactor_handle write_handle[SW_MAX_FDTYPE]; //扩展事件1(一般为写事件)
swReactor_handle error_handle[SW_MAX_FDTYPE]; //扩展事件2(一般为错误事件,如socket关闭)
int (*add)(swReactor *, int fd, int fdtype);
int (*set)(swReactor *, int fd, int fdtype);
int (*del)(swReactor *, int fd);
int (*wait)(swReactor *, struct timeval *);
void (*free)(swReactor *);
int (*setHandle)(swReactor *, int fdtype, swReactor_handle);
swDefer_callback *defer_callback_list;
swDefer_callback idle_task;
swDefer_callback future_task;
void (*onTimeout)(swReactor *);
void (*onFinish)(swReactor *);
void (*onBegin)(swReactor *);
void (*enable_accept)(swReactor *);
int (*can_exit)(swReactor *);
int (*write)(swReactor *, int, void *, int);
int (*close)(swReactor *, int);
int (*defer)(swReactor *, swCallback, void *);
};
reactor
的创建
reactor
的创建主要是调用swReactorEpoll_create
函数setHandle
函数是为监听的fd
设置回调函数,包括读就绪、写就绪、错误onFinish
是每次调用epoll
函数返回后,处理具体逻辑后,最后调用的回调函数onTimeout
是每次调用epoll
函数超时后的回调函数write
函数是利用reactor
向socket
发送数据的接口defer
函数用于添加defer_callback_list
成员变量,这个成员变量是回调函数列表,epoll
函数超时和onFinish
都会循环defer_callback_list
里面的回调函数socket_array
是监听的fd
列表
int swReactor_create(swReactor *reactor, int max_event)
{
int ret;
bzero(reactor, sizeof(swReactor));
#ifdef HAVE_EPOLL
ret = swReactorEpoll_create(reactor, max_event);
reactor->running = 1;
reactor->setHandle = swReactor_setHandle;
reactor->onFinish = swReactor_onFinish;
reactor->onTimeout = swReactor_onTimeout;
reactor->write = swReactor_write;
reactor->defer = swReactor_defer;
reactor->close = swReactor_close;
reactor->socket_array = swArray_new(1024, sizeof(swConnection));
if (!reactor->socket_array)
{
swWarn("create socket array failed.");
return SW_ERR;
}
return ret;
}
reactor
的函数
reactor
设置文件就绪回调函数 swReactor_setHandle
reactor
中设置的fd
由两部分构成,一种是swFd_type
,标识着文件描述符的类型,一种是swEvent_type
标识着文件描述符感兴趣的读写事件
enum swFd_type
{
SW_FD_TCP = 0, //tcp socket
SW_FD_LISTEN = 1, //server socket
SW_FD_CLOSE = 2, //socket closed
SW_FD_ERROR = 3, //socket error
SW_FD_UDP = 4, //udp socket
SW_FD_PIPE = 5, //pipe
SW_FD_STREAM = 6, //stream socket
SW_FD_WRITE = 7, //fd can write
SW_FD_TIMER = 8, //timer fd
SW_FD_AIO = 9, //linux native aio
SW_FD_SIGNAL = 11, //signalfd
SW_FD_DNS_RESOLVER = 12, //dns resolver
SW_FD_INOTIFY = 13, //server socket
SW_FD_USER = 15, //SW_FD_USER or SW_FD_USER+n: for custom event
SW_FD_STREAM_CLIENT = 16, //swClient stream
SW_FD_DGRAM_CLIENT = 17, //swClient dgram
};
enum swEvent_type
{
SW_EVENT_DEAULT = 256,
SW_EVENT_READ = 1u << 9,
SW_EVENT_WRITE = 1u << 10,
SW_EVENT_ERROR = 1u << 11,
SW_EVENT_ONCE = 1u << 12,
};
swReactor_fdtype
用于从文件描述符中提取swFd_type
,也就是文件描述符的类型:
static sw_inline int swReactor_fdtype(int fdtype)
{
return fdtype & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR);
}
swReactor_event_read
、swReactor_event_write
、swReactor_event_error
这三个函数与swFd_type
正相反,是从文件描述符中提取读写事件
static sw_inline int swReactor_event_read(int fdtype)
{
return (fdtype < SW_EVENT_DEAULT) || (fdtype & SW_EVENT_READ);
}
static sw_inline int swReactor_event_write(int fdtype)
{
return fdtype & SW_EVENT_WRITE;
}
static sw_inline int swReactor_event_error(int fdtype)
{
return fdtype & SW_EVENT_ERROR;
}
swReactor_setHandle
用于为文件描述符_fdtype
设定读就绪、写就绪的回调函数
int swReactor_setHandle(swReactor *reactor, int _fdtype, swReactor_handle handle)
{
int fdtype = swReactor_fdtype(_fdtype);
if (fdtype >= SW_MAX_FDTYPE)
{
swWarn("fdtype > SW_MAX_FDTYPE[%d]", SW_MAX_FDTYPE);
return SW_ERR;
}
if (swReactor_event_read(_fdtype))
{
reactor->handle[fdtype] = handle;
}
else if (swReactor_event_write(_fdtype))
{
reactor->write_handle[fdtype] = handle;
}
else if (swReactor_event_error(_fdtype))
{
reactor->error_handle[fdtype] = handle;
}
else
{
swWarn("unknow fdtype");
return SW_ERR;
}
return SW_OK;
}
reactor
添加 defer
函数
defer
函数会在每次事件循环结束或超时的时候调用swReactor_defer
函数会为defer_callback_list
添加新的回调函数
static int swReactor_defer(swReactor *reactor, swCallback callback, void *data)
{
swDefer_callback *cb = sw_malloc(sizeof(swDefer_callback));
if (!cb)
{
swWarn("malloc(%ld) failed.", sizeof(swDefer_callback));
return SW_ERR;
}
cb->callback = callback;
cb->data = data;
LL_APPEND(reactor->defer_callback_list, cb);
return SW_OK;
}
reactor
超时回调函数
epoll
在设置的时间内没有返回的话,也会自动返回,这个时候就会调用超时回调函数:
static void swReactor_onTimeout(swReactor *reactor)
{
swReactor_onTimeout_and_Finish(reactor);
if (reactor->disable_accept)
{
reactor->enable_accept(reactor);
reactor->disable_accept = 0;
}
}
swReactor_onTimeout_and_Finish
函数用于在超时、finish
等情况下调用- 这个函数首先会检查是否存在定时任务,如果有定时任务就会调用
swTimer_select
执行回调函数 - 接下来就要执行存储在
defer_callback_list
的多个回调函数, 该list
是事先定义好的需要defer
执行的函数 idle_task
是EventLoop
中使用的每一轮事件循环结束时调用的函数。- 如果当前
reactor
当前在work
进程,那么就要调用swWorker_try_to_exit
函数来判断event_num
是不是为 0,如果为 0 ,那么就置running
为0,停止等待事件就绪 - 如果当前
SwooleG.serv
为空,swReactor_empty
函数用于判断当前reactor
是否还有事件在监听,如果没有,那么就会设置running
为 0 - 判断当前时间是否可以调用
malloc_trim
释放空闲的内存,如果距离上次释放内存的时间超过了SW_MALLOC_TRIM_INTERVAL
,就更新last_malloc_trim_time
并调用malloc_trim
static void swReactor_onTimeout_and_Finish(swReactor *reactor)
{
//check timer
if (reactor->check_timer)
{
swTimer_select(&SwooleG.timer);
}
//defer callback
swDefer_callback *cb, *tmp;
swDefer_callback *defer_callback_list = reactor->defer_callback_list;
reactor->defer_callback_list = NULL;
LL_FOREACH(defer_callback_list, cb)
{
cb->callback(cb->data);
}
LL_FOREACH_SAFE(defer_callback_list, cb, tmp)
{
sw_free(cb);
}
//callback at the end
if (reactor->idle_task.callback)
{
reactor->idle_task.callback(reactor->idle_task.data);
}
#ifdef SW_COROUTINE
//coro timeout
if (!swIsMaster())
{
coro_handle_timeout();
}
#endif
//server worker
swWorker *worker = SwooleWG.worker;
if (worker != NULL)
{
if (SwooleWG.wait_exit == 1)
{
swWorker_try_to_exit();
}
}
//not server, the event loop is empty
if (SwooleG.serv == NULL && swReactor_empty(reactor))
{
reactor->running = 0;
}
#ifdef SW_USE_MALLOC_TRIM
if (SwooleG.serv && reactor->last_malloc_trim_time < SwooleG.serv->gs->now - SW_MALLOC_TRIM_INTERVAL)
{
malloc_trim(SW_MALLOC_TRIM_PAD);
reactor->last_malloc_trim_time = SwooleG.serv->gs->now;
}
#endif
}
swReactor_empty
用来判断当前的reactor
是否还有事件需要监听- 可以从函数中可以看出来,如果定时任务
timer
里面还有等待的任务,那么就可以返回 false event_num
如果为 0,可以返回 true,结束事件循环- 对于协程来说,还要调用
can_exit
来判断是否可以退出事件循环
int swReactor_empty(swReactor *reactor)
{
//timer
if (SwooleG.timer.num > 0)
{
return SW_FALSE;
}
int empty = SW_FALSE;
//thread pool
if (SwooleAIO.init && reactor->event_num == 1 && SwooleAIO.task_num == 0)
{
empty = SW_TRUE;
}
//no event
else if (reactor->event_num == 0)
{
empty = SW_TRUE;
}
//coroutine
if (empty && reactor->can_exit && reactor->can_exit(reactor))
{
empty = SW_TRUE;
}
return empty;
}
reactor
事件循环结束函数
- 每次事件循环结束之后,都会调用
onFinish
函数 - 该函数主要函数调用
swReactor_onTimeout_and_Finish
,在此之前还会检查在事件循环过程中是否有信号触发
static void swReactor_onFinish(swReactor *reactor)
{
//check signal
if (reactor->singal_no)
{
swSignal_callback(reactor->singal_no);
reactor->singal_no = 0;
}
swReactor_onTimeout_and_Finish(reactor);
}
reactor
事件循环关闭函数
- 当一个
socket
关闭的时候,会调用close
函数,对应的回调函数就是swReactor_close
- 该函数用于释放
swConnection
内部申请的内存,并调用close
函数关闭连接
int swReactor_close(swReactor *reactor, int fd)
{
swConnection *socket = swReactor_get(reactor, fd);
if (socket->out_buffer)
{
swBuffer_free(socket->out_buffer);
}
if (socket->in_buffer)
{
swBuffer_free(socket->in_buffer);
}
if (socket->websocket_buffer)
{
swString_free(socket->websocket_buffer);
}
bzero(socket, sizeof(swConnection));
socket->removed = 1;
swTraceLog(SW_TRACE_CLOSE, "fd=%d.", fd);
return close(fd);
}
swReactor_get
用于从reactor
中根据文件描述符获取对应swConnection
对象的场景,由于swoole
一般都会采用reactor
多线程模式,因此基本只会执行return &reactor->socket_list[fd];
这一句。socket_list
这个列表与connection_list
保持一致,是事先申请的大小为max_connection
的类型是swConnection
的数组socket_list
中的数据有一部分是已经建立连接的swConnection
的对象,有一部分仅仅是空的swConnection
,这个时候swConnection->fd
为 0
static sw_inline swConnection* swReactor_get(swReactor *reactor, int fd)
{
if (reactor->thread)
{
return &reactor->socket_list[fd];
}
swConnection *socket = (swConnection*) swArray_alloc(reactor->socket_array, fd);
if (socket == NULL)
{
return NULL;
}
if (!socket->active)
{
socket->fd = fd;
}
return socket;
}
reactor
的数据写入
- 如果想对一个
socket
写入数据,并不能简单的直接调用send
函数,因为这个函数可能被信号打断(EINTR)、可能暂时不可用(EAGAIN)、可能只写入了部分数据,也有可能写入成功。因此,reactor
定义了一个函数专门处理写数据这一逻辑 - 首先要利用
swReactor_get
取出对应的swConnection
对象 - 如果取出的对象
fd
是 0,说明这个fd
文件描述符事先并没有在reactor
里面进行监听 - 如果这个
socket
的out_buffer
为空,那么就先尝试利用swConnection_send
函数调用send
函数,观察是否可以直接把所有数据发送成功- 如果返回
EINTR
,那么说明被信号打断了,重新发送即可 - 如果返回
EAGAIN
,那么说明此时socket
暂时不可用,此时需要将fd
文件描述符的写就绪状态添加到reactor
中,然后将数据拷贝到out_buffer
中去 - 如果返回写入的数据量小于
n
,说明只写入了部分,此时需要把没有写入的部分拷贝到out_buffer
中去
- 如果返回
- 如果
out_buffer
不为空,那么说明此时socket
不可写,那么就要将数据拷贝到out_buffer
中去,等着reactor
监控到写就绪之后,把out_buffer
发送出去。 - 如果此时
out_buffer
存储空间不足,那么就要swYield
让进程休眠一段时间,等待fd
的写就绪状态
int swReactor_write(swReactor *reactor, int fd, void *buf, int n)
{
int ret;
swConnection *socket = swReactor_get(reactor, fd);
swBuffer *buffer = socket->out_buffer;
if (socket->fd == 0)
{
socket->fd = fd;
}
if (socket->buffer_size == 0)
{
socket->buffer_size = SwooleG.socket_buffer_size;
}
if (socket->nonblock == 0)
{
swoole_fcntl_set_option(fd, 1, -1);
socket->nonblock = 1;
}
if (n > socket->buffer_size)
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "data is too large, cannot exceed buffer size.");
return SW_ERR;
}
if (swBuffer_empty(buffer))
{
if (socket->ssl_send)
{
goto do_buffer;
}
do_send:
ret = swConnection_send(socket, buf, n, 0);
if (ret > 0)
{
if (n == ret)
{
return ret;
}
else
{
buf += ret;
n -= ret;
goto do_buffer;
}
}
#ifdef HAVE_KQUEUE
else if (errno == EAGAIN || errno == ENOBUFS)
#else
else if (errno == EAGAIN)
#endif
{
do_buffer:
if (!socket->out_buffer)
{
buffer = swBuffer_new(sizeof(swEventData));
if (!buffer)
{
swWarn("create worker buffer failed.");
return SW_ERR;
}
socket->out_buffer = buffer;
}
socket->events |= SW_EVENT_WRITE;
if (socket->events & SW_EVENT_READ)
{
if (reactor->set(reactor, fd, socket->fdtype | socket->events) < 0)
{
swSysError("reactor->set(%d, SW_EVENT_WRITE) failed.", fd);
}
}
else
{
if (reactor->add(reactor, fd, socket->fdtype | SW_EVENT_WRITE) < 0)
{
swSysError("reactor->add(%d, SW_EVENT_WRITE) failed.", fd);
}
}
goto append_buffer;
}
else if (errno == EINTR)
{
goto do_send;
}
else
{
SwooleG.error = errno;
return SW_ERR;
}
}
else
{
append_buffer: if (buffer->length > socket->buffer_size)
{
if (socket->dontwait)
{
SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
return SW_ERR;
}
else
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow.", fd);
swYield();
swSocket_wait(fd, SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE);
}
}
if (swBuffer_append(buffer, buf, n) < 0)
{
return SW_ERR;
}
}
return SW_OK;
}
以上所述就是小编给大家介绍的《Swoole 源码分析——Reactor 模块之 ReactorBase》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Zepto源码学习Event模块
- NodeJS Cluster模块源码学习
- NodeJS Events模块源码学习
- 试读angular源码第四章:angular模块及JIT编译模块
- 对公司内部某个模块某个源码审计
- 比特币源码分析:txdb 模块(三)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Out of their Minds
Dennis Shasha、Cathy Lazere / Springer / 1998-07-02 / USD 16.00
This best-selling book is now available in an inexpensive softcover format. Imagine living during the Renaissance and being able to interview that eras greatest scientists about their inspirations, di......一起来看看 《Out of their Minds》 这本书的介绍吧!