内容简介:Swoole 源码分析——Reactor 模块之 ReactorBase
前言
作为一个网络框架,最为核心的就是消息的接受与发送。高效的 reactor 模式一直是众多网络框架的首要选择,本节主要讲解 swoole 中的 reactor 模块。
Reactor 的数据结构
Reactor的数据结构比较复杂,首先object是具体Reactor对象的首地址,ptr是拥有Reactor对象的类的指针,event_num存放现有监控的fd个数,max_event_num存放允许持有的最大事件数目,flag为标记位,id用于存放对应reactor的id,running用于标记该reactor是否正在运行,一般是创建时会被置为 1,start标记着reactor是否已经被启动,一般是进行wait监控时被置为 1,once标志着reactor是否是仅需要一次性监控,check_timer标志着是否要检查定时任务singal_no:每次reactor由于fd的就绪返回时,reactor都会检查这个singal_no,如果这个值不为空,那么就会调用相应的信号回调函数disable_accept标志着是否接受新的连接,这个只有主reactor中才会设置为 0,其他reactor线程不需要接受新的连接,只需要接受数据即可check_signalfd标志着是否需要检查signalfdthread用于标记当前是使用reactor多线程模式还是多进程模式,一般都会使用多线程模式timeout_msec用于记录每次reactor->wait的超时max_socket记录着reactor中最大的连接数,与max_connection的值一致;socket_list是reactor多线程模式的监听的socket,与connection_list保持一致;socket_array是reactor多进程模式中的监听的fdhandle是默认就绪的回调函数,write_handle是写就绪的回调函数,error_handle包含错误就绪的回调函数timewheel、heartbeat_interval、last_heartbeat_time是心跳检测,专门剔除空闲连接last_malloc_trim_time记录了上次返还给系统的时间,swoole会定期的通过malloc_trim函数返回空闲的内存空间
struct _swReactor
{
void *object;
void *ptr; //reserve
/**
* last signal number
*/
int singal_no;
uint32_t event_num;
uint32_t max_event_num;
uint32_t check_timer :1;
uint32_t running :1;
uint32_t start :1;
uint32_t once :1;
/**
* disable accept new connection
*/
uint32_t disable_accept :1;
uint32_t check_signalfd :1;
/**
* multi-thread reactor, cannot realloc sockets.
*/
uint32_t thread :1;
/**
* reactor->wait timeout (millisecond) or -1
*/
int32_t timeout_msec;
uint16_t id; //Reactor ID
uint16_t flag; //flag
uint32_t max_socket;
#ifdef SW_USE_MALLOC_TRIM
time_t last_malloc_trim_time;
#endif
#ifdef SW_USE_TIMEWHEEL
swTimeWheel *timewheel;
uint16_t heartbeat_interval;
time_t last_heartbeat_time;
#endif
/**
* for thread
*/
swConnection *socket_list;
/**
* for process
*/
swArray *socket_array;
swReactor_handle handle[SW_MAX_FDTYPE]; //默认事件
swReactor_handle write_handle[SW_MAX_FDTYPE]; //扩展事件1(一般为写事件)
swReactor_handle error_handle[SW_MAX_FDTYPE]; //扩展事件2(一般为错误事件,如socket关闭)
int (*add)(swReactor *, int fd, int fdtype);
int (*set)(swReactor *, int fd, int fdtype);
int (*del)(swReactor *, int fd);
int (*wait)(swReactor *, struct timeval *);
void (*free)(swReactor *);
int (*setHandle)(swReactor *, int fdtype, swReactor_handle);
swDefer_callback *defer_callback_list;
swDefer_callback idle_task;
swDefer_callback future_task;
void (*onTimeout)(swReactor *);
void (*onFinish)(swReactor *);
void (*onBegin)(swReactor *);
void (*enable_accept)(swReactor *);
int (*can_exit)(swReactor *);
int (*write)(swReactor *, int, void *, int);
int (*close)(swReactor *, int);
int (*defer)(swReactor *, swCallback, void *);
};
reactor 的创建
reactor的创建主要是调用swReactorEpoll_create函数setHandle函数是为监听的fd设置回调函数,包括读就绪、写就绪、错误onFinish是每次调用epoll函数返回后,处理具体逻辑后,最后调用的回调函数onTimeout是每次调用epoll函数超时后的回调函数write函数是利用reactor向socket发送数据的接口defer函数用于添加defer_callback_list成员变量,这个成员变量是回调函数列表,epoll函数超时和onFinish都会循环defer_callback_list里面的回调函数socket_array是监听的fd列表
int swReactor_create(swReactor *reactor, int max_event)
{
int ret;
bzero(reactor, sizeof(swReactor));
#ifdef HAVE_EPOLL
ret = swReactorEpoll_create(reactor, max_event);
reactor->running = 1;
reactor->setHandle = swReactor_setHandle;
reactor->onFinish = swReactor_onFinish;
reactor->onTimeout = swReactor_onTimeout;
reactor->write = swReactor_write;
reactor->defer = swReactor_defer;
reactor->close = swReactor_close;
reactor->socket_array = swArray_new(1024, sizeof(swConnection));
if (!reactor->socket_array)
{
swWarn("create socket array failed.");
return SW_ERR;
}
return ret;
}
reactor 的函数
reactor 设置文件就绪回调函数 swReactor_setHandle
reactor中设置的fd由两部分构成,一种是swFd_type,标识着文件描述符的类型,一种是swEvent_type标识着文件描述符感兴趣的读写事件
enum swFd_type
{
SW_FD_TCP = 0, //tcp socket
SW_FD_LISTEN = 1, //server socket
SW_FD_CLOSE = 2, //socket closed
SW_FD_ERROR = 3, //socket error
SW_FD_UDP = 4, //udp socket
SW_FD_PIPE = 5, //pipe
SW_FD_STREAM = 6, //stream socket
SW_FD_WRITE = 7, //fd can write
SW_FD_TIMER = 8, //timer fd
SW_FD_AIO = 9, //linux native aio
SW_FD_SIGNAL = 11, //signalfd
SW_FD_DNS_RESOLVER = 12, //dns resolver
SW_FD_INOTIFY = 13, //server socket
SW_FD_USER = 15, //SW_FD_USER or SW_FD_USER+n: for custom event
SW_FD_STREAM_CLIENT = 16, //swClient stream
SW_FD_DGRAM_CLIENT = 17, //swClient dgram
};
enum swEvent_type
{
SW_EVENT_DEAULT = 256,
SW_EVENT_READ = 1u << 9,
SW_EVENT_WRITE = 1u << 10,
SW_EVENT_ERROR = 1u << 11,
SW_EVENT_ONCE = 1u << 12,
};
swReactor_fdtype用于从文件描述符中提取swFd_type,也就是文件描述符的类型:
static sw_inline int swReactor_fdtype(int fdtype)
{
return fdtype & (~SW_EVENT_READ) & (~SW_EVENT_WRITE) & (~SW_EVENT_ERROR);
}
swReactor_event_read、swReactor_event_write、swReactor_event_error这三个函数与swFd_type正相反,是从文件描述符中提取读写事件
static sw_inline int swReactor_event_read(int fdtype)
{
return (fdtype < SW_EVENT_DEAULT) || (fdtype & SW_EVENT_READ);
}
static sw_inline int swReactor_event_write(int fdtype)
{
return fdtype & SW_EVENT_WRITE;
}
static sw_inline int swReactor_event_error(int fdtype)
{
return fdtype & SW_EVENT_ERROR;
}
swReactor_setHandle用于为文件描述符_fdtype设定读就绪、写就绪的回调函数
int swReactor_setHandle(swReactor *reactor, int _fdtype, swReactor_handle handle)
{
int fdtype = swReactor_fdtype(_fdtype);
if (fdtype >= SW_MAX_FDTYPE)
{
swWarn("fdtype > SW_MAX_FDTYPE[%d]", SW_MAX_FDTYPE);
return SW_ERR;
}
if (swReactor_event_read(_fdtype))
{
reactor->handle[fdtype] = handle;
}
else if (swReactor_event_write(_fdtype))
{
reactor->write_handle[fdtype] = handle;
}
else if (swReactor_event_error(_fdtype))
{
reactor->error_handle[fdtype] = handle;
}
else
{
swWarn("unknow fdtype");
return SW_ERR;
}
return SW_OK;
}
reactor 添加 defer 函数
defer函数会在每次事件循环结束或超时的时候调用swReactor_defer函数会为defer_callback_list添加新的回调函数
static int swReactor_defer(swReactor *reactor, swCallback callback, void *data)
{
swDefer_callback *cb = sw_malloc(sizeof(swDefer_callback));
if (!cb)
{
swWarn("malloc(%ld) failed.", sizeof(swDefer_callback));
return SW_ERR;
}
cb->callback = callback;
cb->data = data;
LL_APPEND(reactor->defer_callback_list, cb);
return SW_OK;
}
reactor 超时回调函数
epoll 在设置的时间内没有返回的话,也会自动返回,这个时候就会调用超时回调函数:
static void swReactor_onTimeout(swReactor *reactor)
{
swReactor_onTimeout_and_Finish(reactor);
if (reactor->disable_accept)
{
reactor->enable_accept(reactor);
reactor->disable_accept = 0;
}
}
swReactor_onTimeout_and_Finish函数用于在超时、finish等情况下调用- 这个函数首先会检查是否存在定时任务,如果有定时任务就会调用
swTimer_select执行回调函数 - 接下来就要执行存储在
defer_callback_list的多个回调函数, 该list是事先定义好的需要defer执行的函数 idle_task是EventLoop中使用的每一轮事件循环结束时调用的函数。- 如果当前
reactor当前在work进程,那么就要调用swWorker_try_to_exit函数来判断event_num是不是为 0,如果为 0 ,那么就置running为0,停止等待事件就绪 - 如果当前
SwooleG.serv为空,swReactor_empty函数用于判断当前reactor是否还有事件在监听,如果没有,那么就会设置running为 0 - 判断当前时间是否可以调用
malloc_trim释放空闲的内存,如果距离上次释放内存的时间超过了SW_MALLOC_TRIM_INTERVAL,就更新last_malloc_trim_time并调用malloc_trim
static void swReactor_onTimeout_and_Finish(swReactor *reactor)
{
//check timer
if (reactor->check_timer)
{
swTimer_select(&SwooleG.timer);
}
//defer callback
swDefer_callback *cb, *tmp;
swDefer_callback *defer_callback_list = reactor->defer_callback_list;
reactor->defer_callback_list = NULL;
LL_FOREACH(defer_callback_list, cb)
{
cb->callback(cb->data);
}
LL_FOREACH_SAFE(defer_callback_list, cb, tmp)
{
sw_free(cb);
}
//callback at the end
if (reactor->idle_task.callback)
{
reactor->idle_task.callback(reactor->idle_task.data);
}
#ifdef SW_COROUTINE
//coro timeout
if (!swIsMaster())
{
coro_handle_timeout();
}
#endif
//server worker
swWorker *worker = SwooleWG.worker;
if (worker != NULL)
{
if (SwooleWG.wait_exit == 1)
{
swWorker_try_to_exit();
}
}
//not server, the event loop is empty
if (SwooleG.serv == NULL && swReactor_empty(reactor))
{
reactor->running = 0;
}
#ifdef SW_USE_MALLOC_TRIM
if (SwooleG.serv && reactor->last_malloc_trim_time < SwooleG.serv->gs->now - SW_MALLOC_TRIM_INTERVAL)
{
malloc_trim(SW_MALLOC_TRIM_PAD);
reactor->last_malloc_trim_time = SwooleG.serv->gs->now;
}
#endif
}
swReactor_empty用来判断当前的reactor是否还有事件需要监听- 可以从函数中可以看出来,如果定时任务
timer里面还有等待的任务,那么就可以返回 false event_num如果为 0,可以返回 true,结束事件循环- 对于协程来说,还要调用
can_exit来判断是否可以退出事件循环
int swReactor_empty(swReactor *reactor)
{
//timer
if (SwooleG.timer.num > 0)
{
return SW_FALSE;
}
int empty = SW_FALSE;
//thread pool
if (SwooleAIO.init && reactor->event_num == 1 && SwooleAIO.task_num == 0)
{
empty = SW_TRUE;
}
//no event
else if (reactor->event_num == 0)
{
empty = SW_TRUE;
}
//coroutine
if (empty && reactor->can_exit && reactor->can_exit(reactor))
{
empty = SW_TRUE;
}
return empty;
}
reactor 事件循环结束函数
- 每次事件循环结束之后,都会调用
onFinish函数 - 该函数主要函数调用
swReactor_onTimeout_and_Finish,在此之前还会检查在事件循环过程中是否有信号触发
static void swReactor_onFinish(swReactor *reactor)
{
//check signal
if (reactor->singal_no)
{
swSignal_callback(reactor->singal_no);
reactor->singal_no = 0;
}
swReactor_onTimeout_and_Finish(reactor);
}
reactor 事件循环关闭函数
- 当一个
socket关闭的时候,会调用close函数,对应的回调函数就是swReactor_close - 该函数用于释放
swConnection内部申请的内存,并调用close函数关闭连接
int swReactor_close(swReactor *reactor, int fd)
{
swConnection *socket = swReactor_get(reactor, fd);
if (socket->out_buffer)
{
swBuffer_free(socket->out_buffer);
}
if (socket->in_buffer)
{
swBuffer_free(socket->in_buffer);
}
if (socket->websocket_buffer)
{
swString_free(socket->websocket_buffer);
}
bzero(socket, sizeof(swConnection));
socket->removed = 1;
swTraceLog(SW_TRACE_CLOSE, "fd=%d.", fd);
return close(fd);
}
swReactor_get用于从reactor中根据文件描述符获取对应swConnection对象的场景,由于swoole一般都会采用reactor多线程模式,因此基本只会执行return &reactor->socket_list[fd];这一句。socket_list这个列表与connection_list保持一致,是事先申请的大小为max_connection的类型是swConnection的数组socket_list中的数据有一部分是已经建立连接的swConnection的对象,有一部分仅仅是空的swConnection,这个时候swConnection->fd为 0
static sw_inline swConnection* swReactor_get(swReactor *reactor, int fd)
{
if (reactor->thread)
{
return &reactor->socket_list[fd];
}
swConnection *socket = (swConnection*) swArray_alloc(reactor->socket_array, fd);
if (socket == NULL)
{
return NULL;
}
if (!socket->active)
{
socket->fd = fd;
}
return socket;
}
reactor 的数据写入
- 如果想对一个
socket写入数据,并不能简单的直接调用send函数,因为这个函数可能被信号打断(EINTR)、可能暂时不可用(EAGAIN)、可能只写入了部分数据,也有可能写入成功。因此,reactor定义了一个函数专门处理写数据这一逻辑 - 首先要利用
swReactor_get取出对应的swConnection对象 - 如果取出的对象
fd是 0,说明这个fd文件描述符事先并没有在reactor里面进行监听 - 如果这个
socket的out_buffer为空,那么就先尝试利用swConnection_send函数调用send函数,观察是否可以直接把所有数据发送成功- 如果返回
EINTR,那么说明被信号打断了,重新发送即可 - 如果返回
EAGAIN,那么说明此时socket暂时不可用,此时需要将fd文件描述符的写就绪状态添加到reactor中,然后将数据拷贝到out_buffer中去 - 如果返回写入的数据量小于
n,说明只写入了部分,此时需要把没有写入的部分拷贝到out_buffer中去
- 如果返回
- 如果
out_buffer不为空,那么说明此时socket不可写,那么就要将数据拷贝到out_buffer中去,等着reactor监控到写就绪之后,把out_buffer发送出去。 - 如果此时
out_buffer存储空间不足,那么就要swYield让进程休眠一段时间,等待fd的写就绪状态
int swReactor_write(swReactor *reactor, int fd, void *buf, int n)
{
int ret;
swConnection *socket = swReactor_get(reactor, fd);
swBuffer *buffer = socket->out_buffer;
if (socket->fd == 0)
{
socket->fd = fd;
}
if (socket->buffer_size == 0)
{
socket->buffer_size = SwooleG.socket_buffer_size;
}
if (socket->nonblock == 0)
{
swoole_fcntl_set_option(fd, 1, -1);
socket->nonblock = 1;
}
if (n > socket->buffer_size)
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "data is too large, cannot exceed buffer size.");
return SW_ERR;
}
if (swBuffer_empty(buffer))
{
if (socket->ssl_send)
{
goto do_buffer;
}
do_send:
ret = swConnection_send(socket, buf, n, 0);
if (ret > 0)
{
if (n == ret)
{
return ret;
}
else
{
buf += ret;
n -= ret;
goto do_buffer;
}
}
#ifdef HAVE_KQUEUE
else if (errno == EAGAIN || errno == ENOBUFS)
#else
else if (errno == EAGAIN)
#endif
{
do_buffer:
if (!socket->out_buffer)
{
buffer = swBuffer_new(sizeof(swEventData));
if (!buffer)
{
swWarn("create worker buffer failed.");
return SW_ERR;
}
socket->out_buffer = buffer;
}
socket->events |= SW_EVENT_WRITE;
if (socket->events & SW_EVENT_READ)
{
if (reactor->set(reactor, fd, socket->fdtype | socket->events) < 0)
{
swSysError("reactor->set(%d, SW_EVENT_WRITE) failed.", fd);
}
}
else
{
if (reactor->add(reactor, fd, socket->fdtype | SW_EVENT_WRITE) < 0)
{
swSysError("reactor->add(%d, SW_EVENT_WRITE) failed.", fd);
}
}
goto append_buffer;
}
else if (errno == EINTR)
{
goto do_send;
}
else
{
SwooleG.error = errno;
return SW_ERR;
}
}
else
{
append_buffer: if (buffer->length > socket->buffer_size)
{
if (socket->dontwait)
{
SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
return SW_ERR;
}
else
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "socket#%d output buffer overflow.", fd);
swYield();
swSocket_wait(fd, SW_SOCKET_OVERFLOW_WAIT, SW_EVENT_WRITE);
}
}
if (swBuffer_append(buffer, buf, n) < 0)
{
return SW_ERR;
}
}
return SW_OK;
}
以上所述就是小编给大家介绍的《Swoole 源码分析——Reactor 模块之 ReactorBase》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Zepto源码学习Event模块
- NodeJS Cluster模块源码学习
- NodeJS Events模块源码学习
- 试读angular源码第四章:angular模块及JIT编译模块
- 对公司内部某个模块某个源码审计
- 比特币源码分析:txdb 模块(三)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Algorithms + Data Structures = Programs
Niklaus Wirth / Prentice Hall / 1975-11-11 / GBP 84.95
It might seem completely dated with all its examples written in the now outmoded Pascal programming language (well, unless you are one of those Delphi zealot trying to resist to the Java/.NET dominanc......一起来看看 《Algorithms + Data Structures = Programs》 这本书的介绍吧!