内容简介:Swoole 源码分析——Server 模块之初始化
前言
本节主要介绍 server 模块进行初始化的代码,关于初始化过程中,各个属性的意义,可以参考官方文档:
关于初始化过程中,用于监听的 socket 绑定问题,可以参考:
构造 server 对象
构造 server 对象最重要的是两件事:swServer_init 初始化 server、为 server 添加端口:
PHP_METHOD(swoole_server, __construct)
{
zend_size_t host_len = 0;
char *serv_host;
long sock_type = SW_SOCK_TCP;
long serv_port = 0;
long serv_mode = SW_MODE_PROCESS;
swServer *serv = sw_malloc(sizeof (swServer));
swServer_init(serv);
serv->factory_mode = serv_mode;
if (serv_port == 0 && strcasecmp(serv_host, "SYSTEMD") == 0)
{
if (swserver_add_systemd_socket(serv) <= 0)
{
swoole_php_fatal_error(E_ERROR, "failed to add systemd socket.");
return;
}
}
else
{
swListenPort *port = swServer_add_port(serv, sock_type, serv_host, serv_port);
}
}
swServer_init 函数
swServer_init函数主要为serv对象赋值初值,如果想要更改serv对象各个属性,可以调用set函数serv->gs是全局共享内存
void swServer_init(swServer *serv)
{
swoole_init();
bzero(serv, sizeof(swServer));
serv->factory_mode = SW_MODE_BASE;
serv->reactor_num = SW_REACTOR_NUM > SW_REACTOR_MAX_THREAD ? SW_REACTOR_MAX_THREAD : SW_REACTOR_NUM;
serv->dispatch_mode = SW_DISPATCH_FDMOD;
serv->worker_num = SW_CPU_NUM;
serv->max_connection = SwooleG.max_sockets < SW_SESSION_LIST_SIZE ? SwooleG.max_sockets : SW_SESSION_LIST_SIZE;
serv->max_request = 0;
serv->max_wait_time = SW_WORKER_MAX_WAIT_TIME;
//http server
serv->http_parse_post = 1;
serv->upload_tmp_dir = sw_strdup("/tmp");
//heartbeat check
serv->heartbeat_idle_time = SW_HEARTBEAT_IDLE;
serv->heartbeat_check_interval = SW_HEARTBEAT_CHECK;
serv->buffer_input_size = SW_BUFFER_INPUT_SIZE;
serv->buffer_output_size = SW_BUFFER_OUTPUT_SIZE;
serv->task_ipc_mode = SW_TASK_IPC_UNIXSOCK;
/**
* alloc shared memory
*/
serv->stats = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerStats));
if (serv->stats == NULL)
{
swError("[Master] Fatal Error: failed to allocate memory for swServer->stats.");
}
serv->gs = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swServerGS));
if (serv->gs == NULL)
{
swError("[Master] Fatal Error: failed to allocate memory for swServer->gs.");
}
SwooleG.serv = serv;
}
swoole_init 函数
swoole_init函数用于初始化全局变量SwooleG的各个属性SwooleGS是全局的共享内存SwooleTG是线程特有数据,每个线程都有自己独特的数据
extern swServerG SwooleG; //Local Global Variable
extern SwooleGS_t *SwooleGS; //Share Memory Global Variable
extern __thread swThreadG SwooleTG; //Thread Global Variable
typedef struct
{
swLock lock;
swLock lock_2;
} SwooleGS_t;
void swoole_init(void)
{
struct rlimit rlmt;
if (SwooleG.running)
{
return;
}
bzero(&SwooleG, sizeof(SwooleG));
bzero(&SwooleWG, sizeof(SwooleWG));
bzero(sw_error, SW_ERROR_MSG_SIZE);
SwooleG.running = 1;
SwooleG.enable_coroutine = 1;
sw_errno = 0;
SwooleG.log_fd = STDOUT_FILENO;
SwooleG.cpu_num = sysconf(_SC_NPROCESSORS_ONLN);
SwooleG.pagesize = getpagesize();
SwooleG.pid = getpid();
SwooleG.socket_buffer_size = SW_SOCKET_BUFFER_SIZE;
#ifdef SW_DEBUG
SwooleG.log_level = 0;
#else
SwooleG.log_level = SW_LOG_INFO;
#endif
//get system uname
uname(&SwooleG.uname);
//random seed
srandom(time(NULL));
//init global shared memory
SwooleG.memory_pool = swMemoryGlobal_new(SW_GLOBAL_MEMORY_PAGESIZE, 1);
if (SwooleG.memory_pool == NULL)
{
printf("[Master] Fatal Error: global memory allocation failure.");
exit(1);
}
SwooleGS = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(SwooleGS_t));
if (SwooleGS == NULL)
{
printf("[Master] Fatal Error: failed to allocate memory for SwooleGS.");
exit(2);
}
//init global lock
swMutex_create(&SwooleGS->lock, 1);
swMutex_create(&SwooleGS->lock_2, 1);
swMutex_create(&SwooleG.lock, 0);
if (getrlimit(RLIMIT_NOFILE, &rlmt) < 0)
{
swWarn("getrlimit() failed. Error: %s[%d]", strerror(errno), errno);
SwooleG.max_sockets = 1024;
}
else
{
SwooleG.max_sockets = (uint32_t) rlmt.rlim_cur;
}
SwooleTG.buffer_stack = swString_new(8192);
if (SwooleTG.buffer_stack == NULL)
{
exit(3);
}
if (!SwooleG.task_tmpdir)
{
SwooleG.task_tmpdir = sw_strndup(SW_TASK_TMP_FILE, sizeof(SW_TASK_TMP_FILE));
SwooleG.task_tmpdir_len = sizeof(SW_TASK_TMP_FILE);
}
char *tmp_dir = swoole_dirname(SwooleG.task_tmpdir);
//create tmp dir
if (access(tmp_dir, R_OK) < 0 && swoole_mkdir_recursive(tmp_dir) < 0)
{
swWarn("create task tmp dir(%s) failed.", tmp_dir);
}
if (tmp_dir)
{
sw_free(tmp_dir);
}
//init signalfd
#ifdef HAVE_SIGNALFD
swSignalfd_init();
SwooleG.use_signalfd = 1;
SwooleG.enable_signalfd = 1;
#endif
//timerfd
#ifdef HAVE_TIMERFD
SwooleG.use_timerfd = 1;
#endif
SwooleG.use_timer_pipe = 1;
}
swServer_add_port 函数
swServer_add_port函数为服务端添加监听的端口- 首先需要检测
listen_port_num已监听的端口不能大于SW_MAX_LISTEN_PORT(默认为 60000) - 如果
socket的类型不是unix sock,那么端口号必须大于等于 0,小于 65535 host主域名长度也不能大于SW_HOST_MAXSIZE(104)- 然后从共享内存池中申请一个
swListenPort类型的对象,然后调用swPort_init对端口对象进行初始化 - 利用函数
swSocket_create创建一个socket对象,并返回其文件描述符 - 调用
swSocket_bind函数将socket绑定到对应的主域与端口上来 - 如果协议是数据报(
UDP),而不是数据流时,需要设置socket的发送缓存与接收缓存为socket_buffer_size - 设置
socket为非阻塞、O_CLOEXEC(exec之后文件描述符自动关闭) - 根据协议类型设置
have_udp_sock、have_tcp_sock、udp_socket_ipv4/udp_socket_ipv6等等属性 - 递增
listen_port_num,向单链表listen_list中添加swListenPort对象
enum swSocket_type
{
SW_SOCK_TCP = 1,
SW_SOCK_UDP = 2,
SW_SOCK_TCP6 = 3,
SW_SOCK_UDP6 = 4,
SW_SOCK_UNIX_DGRAM = 5, //unix sock dgram
SW_SOCK_UNIX_STREAM = 6, //unix sock stream
};
swListenPort* swServer_add_port(swServer *serv, int type, char *host, int port)
{
if (serv->listen_port_num >= SW_MAX_LISTEN_PORT)
{
swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_TOO_MANY_LISTEN_PORT, "allows up to %d ports to listen", SW_MAX_LISTEN_PORT);
return NULL;
}
if (!(type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM) && (port < 0 || port > 65535))
{
swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_INVALID_LISTEN_PORT, "invalid port [%d]", port);
return NULL;
}
if (strlen(host) + 1 > SW_HOST_MAXSIZE)
{
swoole_error_log(SW_LOG_ERROR, SW_ERROR_NAME_TOO_LONG, "address '%s' exceeds %d characters limit", host, SW_HOST_MAXSIZE - 1);
return NULL;
}
swListenPort *ls = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swListenPort));
if (ls == NULL)
{
swError("alloc failed");
return NULL;
}
swPort_init(ls);
ls->type = type;
ls->port = port;
strncpy(ls->host, host, strlen(host) + 1);
if (type & SW_SOCK_SSL)
{
type = type & (~SW_SOCK_SSL);
if (swSocket_is_stream(type))
{
ls->type = type;
ls->ssl = 1;
#ifdef SW_USE_OPENSSL
ls->ssl_config.prefer_server_ciphers = 1;
ls->ssl_config.session_tickets = 0;
ls->ssl_config.stapling = 1;
ls->ssl_config.stapling_verify = 1;
ls->ssl_config.ciphers = sw_strdup(SW_SSL_CIPHER_LIST);
ls->ssl_config.ecdh_curve = sw_strdup(SW_SSL_ECDH_CURVE);
#endif
}
}
//create server socket
int sock = swSocket_create(ls->type);
if (sock < 0)
{
swSysError("create socket failed.");
return NULL;
}
//bind address and port
if (swSocket_bind(sock, ls->type, ls->host, &ls->port) < 0)
{
close(sock);
return NULL;
}
//dgram socket, setting socket buffer size
if (swSocket_is_dgram(ls->type))
{
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &ls->socket_buffer_size, sizeof(int));
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &ls->socket_buffer_size, sizeof(int));
}
//O_NONBLOCK & O_CLOEXEC
swoole_fcntl_set_option(sock, 1, 1);
ls->sock = sock;
if (swSocket_is_dgram(ls->type))
{
serv->have_udp_sock = 1;
serv->dgram_port_num++;
if (ls->type == SW_SOCK_UDP)
{
serv->udp_socket_ipv4 = sock;
}
else if (ls->type == SW_SOCK_UDP6)
{
serv->udp_socket_ipv6 = sock;
}
}
else
{
serv->have_tcp_sock = 1;
}
LL_APPEND(serv->listen_list, ls);
serv->listen_port_num++;
return ls;
}
swPort_init 函数
swPort_init函数用于初始化swListenPort对象backlog、tcp_keepcount、tcp_keepidle等等都是相应socket的属性- 在外网通信时,有些客户端发送数据的速度较慢,每次只能发送一小段数据。这样
onReceive到的数据就不是一个完整的包。 还有些客户端是逐字节发送数据的,如果每次回调onReceive会拖慢整个系统。Length_Check 和 EOF_Check 的使用。package_length_type、package_eof等等就是相关参数的具体参数。
#define SW_DATA_EOF "\r\n\r\n"
void swPort_init(swListenPort *port)
{
port->sock = 0;
port->ssl = 0;
//listen backlog
port->backlog = SW_BACKLOG;
//tcp keepalive
port->tcp_keepcount = SW_TCP_KEEPCOUNT;
port->tcp_keepinterval = SW_TCP_KEEPINTERVAL;
port->tcp_keepidle = SW_TCP_KEEPIDLE;
port->open_tcp_nopush = 1;
port->protocol.package_length_type = 'N';
port->protocol.package_length_size = 4;
port->protocol.package_body_offset = 4;
port->protocol.package_max_length = SW_BUFFER_INPUT_SIZE;
port->socket_buffer_size = SwooleG.socket_buffer_size;
char eof[] = SW_DATA_EOF;
port->protocol.package_eof_len = sizeof(SW_DATA_EOF) - 1;
memcpy(port->protocol.package_eof, eof, port->protocol.package_eof_len);
}
- c:有符号、1字节
- C:无符号、1字节
- s :有符号、主机字节序、2字节
- S:无符号、主机字节序、2字节
- n:无符号、网络字节序、2字节
- N:无符号、网络字节序、4字节
- l:有符号、主机字节序、4字节(小写L)
- L:无符号、主机字节序、4字节(大写L)
- v:无符号、小端字节序、2字节
- V:无符号、小端字节序、4字节
swSocket_create 创建 socket
swSocket_create 函数会根据 type 的类型来调用 socket 系统调用
int swSocket_create(int type)
{
int _domain;
int _type;
switch (type)
{
case SW_SOCK_TCP:
_domain = PF_INET;
_type = SOCK_STREAM;
break;
case SW_SOCK_TCP6:
_domain = PF_INET6;
_type = SOCK_STREAM;
break;
case SW_SOCK_UDP:
_domain = PF_INET;
_type = SOCK_DGRAM;
break;
case SW_SOCK_UDP6:
_domain = PF_INET6;
_type = SOCK_DGRAM;
break;
case SW_SOCK_UNIX_DGRAM:
_domain = PF_UNIX;
_type = SOCK_DGRAM;
break;
case SW_SOCK_UNIX_STREAM:
_domain = PF_UNIX;
_type = SOCK_STREAM;
break;
default:
swWarn("unknown socket type [%d]", type);
return SW_ERR;
}
return socket(_domain, _type, 0);
}
swSocket_bind 绑定端口
SO_REUSEADDR允许启动一个监听服务器并捆绑众所周知端口,即使以前建立的该端口用作它们的本地端口的连接仍存在。- 如果不对TCP的套接字选项进行任何限制时,如果启动两个进程,第二个进程就会在调用bind函数的时候出错(Address already in use)。
- 如果在调用bind之前我们设置了SO_REUSEADDR,但是不在第二个进程启动前close这个套接字,那么第二个进程仍然会在调用bind函数的时候出错(Address already in use)。
- 如果在调用bind之前我们设置了SO_REUSEADDR,并接收了一个客户端连接,并且在第二个进程启动前关闭了bind的套接字,这个时候第一个进程只拥有一个套接字(与客户端的连接),那么第二个进程则可以bind成功,符合预期。
- 相对
SO_REUSEADDR来说,SO_REUSEPORT没有那么多的限制条件,允许两个毫无血缘关系的进程使用相同的IP地址同时监听同一个端口,并且不会出现惊群效应 - 对于
UNIX SOCKET,需要设置sun_family与sun_path - 对于
IPV4,需要设置sin_family、sin_port、sin_addr;对于IPV6,需要设置sin6_family、sin6_port、sin6_addr,然后调用bind函数; - 如果
port为0,说明服务器绑定的是任意端口,bind函数会将系统所选择的端口返回给sockaddr对象
int swSocket_bind(int sock, int type, char *host, int *port)
{
int ret;
struct sockaddr_in addr_in4;
struct sockaddr_in6 addr_in6;
struct sockaddr_un addr_un;
socklen_t len;
//SO_REUSEADDR option
int option = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(int)) < 0)
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "setsockopt(%d, SO_REUSEADDR) failed.", sock);
}
//reuse port
#ifdef HAVE_REUSEPORT
if (SwooleG.reuse_port)
{
if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &option, sizeof(int)) < 0)
{
swSysError("setsockopt(SO_REUSEPORT) failed.");
SwooleG.reuse_port = 0;
}
}
#endif
//unix socket
if (type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM)
{
bzero(&addr_un, sizeof(addr_un));
unlink(host);
addr_un.sun_family = AF_UNIX;
strncpy(addr_un.sun_path, host, sizeof(addr_un.sun_path) - 1);
ret = bind(sock, (struct sockaddr*) &addr_un, sizeof(addr_un));
}
//IPv6
else if (type > SW_SOCK_UDP)
{
bzero(&addr_in6, sizeof(addr_in6));
inet_pton(AF_INET6, host, &(addr_in6.sin6_addr));
addr_in6.sin6_port = htons(*port);
addr_in6.sin6_family = AF_INET6;
ret = bind(sock, (struct sockaddr *) &addr_in6, sizeof(addr_in6));
if (ret == 0 && *port == 0)
{
len = sizeof(addr_in6);
if (getsockname(sock, (struct sockaddr *) &addr_in6, &len) != -1)
{
*port = ntohs(addr_in6.sin6_port);
}
}
}
//IPv4
else
{
bzero(&addr_in4, sizeof(addr_in4));
inet_pton(AF_INET, host, &(addr_in4.sin_addr));
addr_in4.sin_port = htons(*port);
addr_in4.sin_family = AF_INET;
ret = bind(sock, (struct sockaddr *) &addr_in4, sizeof(addr_in4));
if (ret == 0 && *port == 0)
{
len = sizeof(addr_in4);
if (getsockname(sock, (struct sockaddr *) &addr_in4, &len) != -1)
{
*port = ntohs(addr_in4.sin_port);
}
}
}
//bind failed
if (ret < 0)
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "bind(%s:%d) failed. Error: %s [%d]", host, *port, strerror(errno), errno);
return SW_ERR;
}
return ret;
}
swoole_fcntl_set_option 函数为文件描述符设置选项
- 此函数主要是利用
fcntl函数为文件描述符设置阻塞/非阻塞、CLOEXEC等属性。
void swoole_fcntl_set_option(int sock, int nonblock, int cloexec)
{
int opts, ret;
if (nonblock >= 0)
{
do
{
opts = fcntl(sock, F_GETFL);
}
while (opts < 0 && errno == EINTR);
if (opts < 0)
{
swSysError("fcntl(%d, GETFL) failed.", sock);
}
if (nonblock)
{
opts = opts | O_NONBLOCK;
}
else
{
opts = opts & ~O_NONBLOCK;
}
do
{
ret = fcntl(sock, F_SETFL, opts);
}
while (ret < 0 && errno == EINTR);
if (ret < 0)
{
swSysError("fcntl(%d, SETFL, opts) failed.", sock);
}
}
#ifdef FD_CLOEXEC
if (cloexec >= 0)
{
do
{
opts = fcntl(sock, F_GETFD);
}
while (opts < 0 && errno == EINTR);
if (opts < 0)
{
swSysError("fcntl(%d, GETFL) failed.", sock);
}
if (cloexec)
{
opts = opts | FD_CLOEXEC;
}
else
{
opts = opts & ~FD_CLOEXEC;
}
do
{
ret = fcntl(sock, F_SETFD, opts);
}
while (ret < 0 && errno == EINTR);
if (ret < 0)
{
swSysError("fcntl(%d, SETFD, opts) failed.", sock);
}
}
#endif
}
以上所述就是小编给大家介绍的《Swoole 源码分析——Server 模块之初始化》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- nodejs源码—初始化
- Kratos 初始化源码分析
- Mybatis源码解读-初始化过程详解
- Vue源码探究-类初始化函数详情
- Spring MVC 源码解析(二)— 容器初始化
- Drone 源码分析之数据库初始化
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Responsive Web Design
Ethan Marcotte / Happy Cog / 2011-6 / USD 18.00
From mobile browsers to netbooks and tablets, users are visiting your sites from an increasing array of devices and browsers. Are your designs ready? Learn how to think beyond the desktop and craft be......一起来看看 《Responsive Web Design》 这本书的介绍吧!