内容简介:当我们谈起 nodejs 时,由于 JavaScript 只能在单线程上运行, 导致 一个 Node 进程只能运行在一个CPU上, 无法发挥现代 CPU 多核的特性。 这对于一个 服务端语言来说, 是比较掣肘其发展的。 好在 Node 在 v0.10 后, 可以使用 Cluster 模块搭建 多进程服务, 并在 v0.12 重写了该模块, 大幅提高其性能, 下面我们将走进 Node-Cluster ,看看 Node 是如何实现集群的。首先,这涉及一个 程序设计 的两种模型 本文提及的 Nodejs 便是使用
引言
当我们谈起 nodejs 时,由于 JavaScript 只能在单线程上运行, 导致 一个 Node 进程只能运行在一个CPU上, 无法发挥现代 CPU 多核的特性。 这对于一个 服务端语言来说, 是比较掣肘其发展的。 好在 Node 在 v0.10 后, 可以使用 Cluster 模块搭建 多进程服务, 并在 v0.12 重写了该模块, 大幅提高其性能, 下面我们将走进 Node-Cluster ,看看 Node 是如何实现集群的。
多进程单线程 与 单进程多线程 模型
首先,这涉及一个 程序设计 的两种模型 本文提及的 Nodejs 便是使用 多进程单线程
实现的, 而 Java 等语言 则使用 单进程多线程
模型 故本文着重 多进程单线程
模型, 至于 单进程多线程
模型,则会在比较中稍微涉及。
多进程单线程
正如上文说到, JavaScript 只能在单线程中运行, 故 Node 在 JS层面只能 在某进程的一个线程中运行。 若要实现集群, 则必须创建多个进程, 以实现多个应用实例同时运行。
Cluster 模块, 提供了 master-worker 模式 启动多个应用模式。
接下来我们就走入这个模块, 看看其内部具体做了哪些事情。
Cluster
Cluster 是什么?
- 在服务器上启动多个进程
- 每个进程里都跑的同一份代码
- 每个进程竟然还能监听同一个端口 (下文分析实现原理)
其中:
- 负责启动其他进程的叫做 Master 进程,他好比是个『包工头』,不做具体的工作,只负责启动其他进程。
- 其他被启动的叫 Worker 进程,顾名思义就是干活的『工人』。它们接收请求,对外提供服务。
- Worker 进程的数量一般根据服务器的 CPU 核数来定,这样就可以完美利用多核资源。
const cluster = require('cluster'); const http = require('http'); const numCPUs = require('os').cpus().length; if (cluster.isMaster) { // Fork workers. for (let i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { // Workers can share any TCP connection // In this case it is an HTTP server http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
worker 进程的创建
使用 Cluster 模块的 fork 方法来创建出子进程
cluster.fork();
先从 worker 进程的初始化开始看, master 进程在 fork 其 worker进程时 会在其环境变量(workerEnv)中 附加上一个 唯一ID(NODE UNIQUE ID) 该ID 是一个从 0 开始的递增数。
var ids = 0; //..... cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; const workerProcess = createWorkerProcess(id, env); const worker = new Worker({ id: id, process: workerProcess }); // .... function createWorkerProcess(id, env) { const workerEnv = util._extend({}, process.env); const execArgv = cluster.settings.execArgv.slice(); const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/; util._extend(workerEnv, env); workerEnv.NODE_UNIQUE_ID = '' + id; // ..... return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid }); }
然后 Node 在实例初始化时,使用该 ID 判断使用 clild.js Or master.js
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master'; module.exports = require(`internal/cluster/${childOrMaster}`);
把目光再集中到 这个 fork()
函数中来,
exports.fork = function(modulePath /*, args, options*/) { // Get options and args arguments. var execArgv; var options = {}; var args = []; var pos = 1; if (pos < arguments.length && Array.isArray(arguments[pos])) { args = arguments[pos++]; } if (pos < arguments.length && arguments[pos] != null) { if (typeof arguments[pos] !== 'object') { throw new TypeError('Incorrect value of args option'); } options = util._extend({}, arguments[pos++]); } // Prepare arguments for fork: execArgv = options.execArgv || process.execArgv; if (execArgv === process.execArgv && process._eval != null) { const index = execArgv.lastIndexOf(process._eval); if (index > 0) { // Remove the -e switch to avoid fork bombing ourselves. execArgv = execArgv.slice(); execArgv.splice(index - 1, 2); } } args = execArgv.concat([modulePath], args); if (typeof options.stdio === 'string') { options.stdio = stdioStringToArray(options.stdio); } else if (!Array.isArray(options.stdio)) { // Use a separate fd=3 for the IPC channel. Inherit stdin, stdout, // and stderr from the parent if silent isn't set. options.stdio = options.silent ? stdioStringToArray('pipe') : stdioStringToArray('inherit'); } else if (options.stdio.indexOf('ipc') === -1) { throw new TypeError('Forked processes must have an IPC channel'); } options.execPath = options.execPath || process.execPath; options.shell = false; return spawn(options.execPath, args, options); };
在函数中 做了一些 参数准备,而重点在于 对 这个 options.stdio
的处理。
options.stdio
用于配置子进程与父进程之间建立的管道, 其值应该为一个数组, 但是为了方便, 值可以是以下的字符串之一:
'pipe' - 等同于 ['pipe', 'pipe', 'pipe'] (默认)
'ignore' - 等同于 ['ignore', 'ignore', 'ignore']
'inherit' - 等同于 [process.stdin, process.stdout, process.stderr] 或 [0,1,2]
其每个值 分别对应 [process.stdin, process.stdout, process.stderr]
标准输入、标准输出、标准错误 输出到父进程的方式。
而使用 Fork 方式 衍生出的子进程,又必须加上 一个 IPC通道
用于父子间 传递消息或文件描述符, 故 stdioStringToArray
函数代码如下
function stdioStringToArray(option) { switch (option) { case 'ignore': case 'pipe': case 'inherit': return [option, option, option, 'ipc']; default: throw new TypeError('Incorrect value of stdio option: ' + option); } }
多进程之间 的 进程间通信
下面我们就重点来看,这个 IPC通道
是如何实现的,以及其工作原理
进程间通信(Inter-process communication, IPC)其实是个很简单的概念,只要你将这个进程的数据传到 另外一个进程就是 IPC
了, 要实现 这个数据传递的方式有非常的多, 如以下
在 Node 中 IPC
实现分两种, 在 Windows 上通过 命名管道, 在 UNIX 上则使用 UNIX domain sockets
(UDS), 详情参见 官方文档
目前 Linux 还是主流的服务端操作系统, 主要分析下在 Linux 下 UDS 的使用方式
UDS 是在 Socket 的基础上发展而来的, Socket 一般我们指的都是 IP Socket
, 通过网络协议进行通信, 但是在同一台设备上的通信,是否可以绕开网络层的限制呢。 这里我们就可以通过 UDS 来实现, 所以把 它称之为 LocalSocket
,看起来更贴切一点。
在 Linux 中, 一切都可以当做是 文件。 UDS 也不例外。
在 Node 的 child_process.js 模块中, 有如下代码
stdio = stdio.reduce(function(acc, stdio, i) { // ...... else if (stdio === 'ipc') { if (sync || ipc !== undefined) { // Cleanup previously created pipes cleanup(); if (!sync) throw new errors.Error('ERR_IPC_ONE_PIPE'); else throw new errors.Error('ERR_IPC_SYNC_FORK'); } ipc = new Pipe(PipeConstants.IPC); ipcFd = i; acc.push({ type: 'pipe', handle: ipc, ipc: true }); }
当 stdio 的 类型为 ipc
时,会创建一个 ipc 管道, 其fd 为 'ipc' 在stdio数组 中的索引
ipc = new Pipe(PipeConstants.IPC); ipcFd = i;
此时目光应该被这个 Pipe
所吸引了吧, 那么它又是什么呢, 话不多少直接上 libuv
中对应 Pipe 的实现。
void PipeWrap::New(const FunctionCallbackInfo<Value>& args) { // This constructor should not be exposed to public javascript. // Therefore we assert that we are not trying to call this as a // normal function. CHECK(args.IsConstructCall()); CHECK(args[0]->IsInt32()); Environment* env = Environment::GetCurrent(args); int type_value = args[0].As<Int32>()->Value(); PipeWrap::SocketType type = static_cast<PipeWrap::SocketType>(type_value); bool ipc; ProviderType provider; switch (type) { case SOCKET: provider = PROVIDER_PIPEWRAP; ipc = false; break; case SERVER: provider = PROVIDER_PIPESERVERWRAP; ipc = false; break; case IPC: provider = PROVIDER_PIPEWRAP; ipc = true; break; default: UNREACHABLE(); } new PipeWrap(env, args.This(), provider, ipc); } PipeWrap::PipeWrap(Environment* env, Local<Object> object, ProviderType provider, bool ipc) : ConnectionWrap(env, object, provider) { int r = uv_pipe_init(env->event_loop(), &handle_, ipc); CHECK_EQ(r, 0); // How do we proxy this error up to javascript? // Suggestion: uv_pipe_init() returns void. UpdateWriteQueueSize(); }
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); handle->shutdown_req = NULL; handle->connect_req = NULL; handle->pipe_fname = NULL; handle->ipc = ipc; return 0; }
void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type) { int err; uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = NULL; stream->alloc_cb = NULL; stream->close_cb = NULL; stream->connection_cb = NULL; stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); stream->write_queue_size = 0; if (loop->emfile_fd == -1) { err = uv__open_cloexec("/dev/null", O_RDONLY); if (err < 0) /* In the rare case that "/dev/null" isn't mounted open "/" * instead. */ err = uv__open_cloexec("/", O_RDONLY); if (err >= 0) loop->emfile_fd = err; } #if defined(__APPLE__) stream->select = NULL; #endif /* defined(__APPLE_) */ uv__io_init(&stream->io_watcher, uv__stream_io, -1); }
可以看到对应 ipc 管道, 底层是通过文件流的方式实现的。 那么这个管道 就一样拥有 和 stream 一样的模式,即 open—write/read—close
父进程在实际创建子进程前,会创建IPC通道并监听它,然后才真正创建出子进程,并通过环境变量(NODE CHANNEL FD)告诉子进程这个IPC通信的文件描述符(fd)。子进程在启动的过程中,根据文件描述符去连接这个已存在的IPC通道,从而完成父子进程之间的连接。
当父进程 send 数据到子进程时, 便通过这个 fd 向 这个特殊的文件开始写入数据,此时调用底层stream 的 write 方法,而子进程由于在启动的过程中便已经连接上 该通道, 在应用层通过 message 事件(底层应是 stream 的 read方法),来接收数据。
由于这个IPC通道是 双工通信的, 故子进程也可以实现向父进程通信。
问题解析
多进程之间是如何实现 端口共享的?
那么 node 又是如何实现多进程 监听同一个端口呢,这里的关键在于 句柄传递
。
如下代码:
//主进程代码 var child = require('child_process').fork('child.js'); // Open up the server object and send the handle var server = require('net').createServer(); server.on('connection', function (socket) { socket.end('handled by parent\n'); }); server.listen(1337, function () { child.send('server', server); }); //子进程代码 process.on('message', function (m, server) { if (m === 'server') { server.on('connection', function (socket) { socket.end('handled by child\n'); }); } });
在示例中,父进程直接将创建出来的 Tcp对象 传递到子进程中,但是我们知道两个进程之间又是无法直接共享内存的, 那么这又是怎么实现的呢?
来看这个 send方法
child.send(message, [sendHandle])
目前,子进程对象send()方法可以发送的句柄类型包括如下几种:
1.net.socket,tcp套接字
2.net.Server,tcp服务器,任意建立在tcp服务上的应用层服务都可以享受到它带来的好处。
3.net.Native,c++层面的tcp套接字或IPC管道。
4.dgram.socket,UDP套接字
5.dgram.Native,C++层面的UDP套接字
send()方法在将消息发送到IPC管道前,将消息组装成两个对象,一个参数是handle,另一个是message, 而发送到子进程的 实际上是这个句柄的 文件描述符, message对象也会序列化为字符串。
而子进程由于之前连接了IPC通道, 可以读取到父进程发送的消息。获取到这个消息后, 通过JSON.parse 还原为对象, 并分析 其中的cmd值, 若 message.cmd = NODE_HANDLE, 则表示父进程传递的是一个句柄,此时便会通过 获取到的 fd 和传递的句柄类型,还原出这个Tcp对象。
还原出这个server 对象后, 便是去监听这个端口了。
setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))
而在 libuv 中在 setsockopt 时设置了 SO_REUSEADDR 选项(关于这个选项的含义可以自行百度),简而言之,设置了这个选项后, 在Linux层面允许我们使用不同的进程 就相同的网卡和端口进行监听, 对于独立启动的进程,并不知道彼此的fd, 所以当一个进程监听成功后, 后面的进程便会失败,但是 通过 send 方式还原出来的 server对象,他们的fd是相同的,此时该子进程就可以通过这个fd 去监听这个端口。
但是 fd 在同一时刻只能被一个进程所占用, 换言之就是网络请求向服务器端发送时,只有一个幸运的进程能够抢到连接,也就是说只有他能为这个请求进行服务。这些进程也都是抢占式的。
总结
随着这些模块逐渐完善, Nodejs 在服务端的使用场景也越来越丰富,如果你仅仅是因为JS 这个后缀而注意到它的话, 那么我希望你能暂停脚步,好好了解一下这门年轻的语言,相信它会给你带来惊喜。
参考
https://github.com/nodejs/node
https://nodejs.org/dist/latest-v10.x/docs/api/net.html#net ipc support
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
SHA 加密
SHA 加密工具
RGB CMYK 转换工具
RGB CMYK 互转工具