nodejs“并行”处理尝试

栏目: Node.js · 发布时间: 6年前

内容简介:之前做过一些爬取方面的工作,由于node不能多线程,为了提高抓取效率,都是使用child_process.fork来多进程跑任务,然后通过message事件与主进程进行通信,代码编写的时候都是用的yield/await之类的同步写法,于是这次尝试利用node非阻塞I/O的机制,利用多个函数同时运行来模拟多线程,效果如何呢?server.js用来统计qps,将产出的数据status.txt里的内容复制到echarts的官方示例里进行可视化,从而验证是否能达到“并行”的效果

之前做过一些爬取方面的工作,由于node不能多线程,为了提高抓取效率,都是使用child_process.fork来多进程跑任务,然后通过message事件与主进程进行通信,代码编写的时候都是用的yield/await之类的同步写法,于是这次尝试利用node非阻塞I/O的机制,利用多个函数同时运行来模拟多线程,效果如何呢?

尝试“并行”发送HTTP请求

server.js

用来统计qps,将产出的数据status.txt里的内容复制到echarts的官方示例里进行可视化,从而验证是否能达到“并行”的效果

const fs = require('fs');
const Koa = require('koa');
const app = new Koa();

// 用来统计qps
let last_time = new Date(),
    init_timestamp = last_time.getTime(),
    count = 0;

// 运行时长60s
let run_secs = 60;

// 用来存储qps历史,用于绘制曲线图
let qps_list = [],
    timestr_list = [];

app.use(async ctx => {
    // 简单的模拟计算qps
    let cur_time = new Date(),
        cur_timestr = cur_time.toLocaleTimeString(),
        cur_timestamp = cur_time.getTime(),
        last_timestr = last_time.toLocaleTimeString();

    if (cur_timestr !== last_timestr) {
        let timestamp_cost = Math.round((cur_timestamp - init_timestamp) / 1000);

        console.log(`\n${cur_timestr}: ${timestamp_cost} qps*********************************`);
        console.log(count);

        qps_list.push(count);
        timestr_list.push(cur_timestr);

        if (timestamp_cost >= run_secs) {
            // 将运行结果存储起来,打开http://echarts.baidu.com/examples/editor.html?c=line-smooth,复制内容查看曲线图
            let option_str = JSON.stringify({
                tooltip: {
                    trigger: 'axis'
                },
                xAxis: {
                    type: 'category',
                    data: timestr_list
                },
                yAxis: {
                    type: 'value'
                },
                series: [{
                    data: qps_list,
                    type: 'line',
                    smooth: true
                }]
            }, null, 2);
            fs.writeFileSync('./status.txt', `option=${option_str}`);

            console.log('1.复制status.txt的内容');
            console.log('2.打开http://echarts.baidu.com/examples/editor.html?c=line-smooth');
            console.log('3.粘贴在左边代码区域');
            console.log('4.点击"运行",在右侧区域查看');
            process.exit();
        }

        last_time = cur_time;
        count = 1;
    } else {
        count++;
    }

    // 模拟服务端处理请求的时间
    await delay();

    ctx.body = 'hello';
});

function delay () {
    return new Promise((resolve) => {
        setTimeout(resolve, 250);
    });
}

app.listen(3000);
复制代码

单进程版本

client.js

const axios = require('axios');

async function sendRequest (id) {
    return new Promise((resolve, reject) => {
        axios.get(`http://localhost:3000?id=${id}`).then(res => {
            resolve(res.data);
        }).catch(e => {
            reject(e);
        });
    });
}

function run () {
    let threads = 1;

    for (let i = 0; i < threads; i++) {
        makeThread(i);
    }
}

async function makeThread (id) {
    while (true) {
        try {
            await sendRequest(id);
        } catch (e) {
            console.log(id, e.message);
            process.exit();
        }
    }
}

run();
复制代码

多进程版本(进行对照)

client_center.js

const fork = require('child_process').fork;

function run () {
    let threads = 1;

    for (let i = 0; i < threads; i++) {
        fork('./client_worker.js', [i]);
    }
}

run();
复制代码

client_worker.js

const axios = require('axios');

let id = process.argv[2];

async function sendRequest () {
    return new Promise((resolve, reject) => {
        axios.get(`http://localhost:3000?id=${id}`).then(res => {
            resolve(res.data);
        }).catch(e => {
            reject(e);
        });
    });
}

async function makeThread () {
    while (true) {
        try {
            await sendRequest();
        } catch (e) {
            console.log(id, e.message);
            process.exit();
        }
    }
}

makeThread();
复制代码

运行结果(服务设置延时250ms)

2核机器

threads 单进程版本 多进程版本 备注
1 nodejs“并行”处理尝试 nodejs“并行”处理尝试 区别不大
5 nodejs“并行”处理尝试 nodejs“并行”处理尝试 区别不大
50 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程效果弱于单进程版本
100 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程效果弱于单进程版本
200 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程弱于单进程版本,且多进程版本总是报错:read ECONNRESET/connect ECONNRESET/socket hang up
300 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程弱于单进程版本,且多进程版本总是报错:read ECONNRESET/connect ECONNRESET/socket hang up

观察

对比结果让我挺吃惊的,这样看来单进程的模拟效果居然会比多进程好,但突然想到自己电脑上才几核,怎么同时跑几百个进程....... 登录到公司服务器上(48核)继续实验:

48核机器

threads 单进程版本 多进程版本 备注
30 nodejs“并行”处理尝试 nodejs“并行”处理尝试 区别不大
40 nodejs“并行”处理尝试 nodejs“并行”处理尝试 区别不大
100 nodejs“并行”处理尝试 nodejs“并行”处理尝试 qps峰值相同,但多进程更稳定
200 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程版本优于单进程版本
300 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程版本优于单进程版本
1000 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程版本优于单进程版本
1500 nodejs“并行”处理尝试 nodejs“并行”处理尝试 多进程版本优于单进程版本,但threads增加所带来的收益较低,多进程版本峰值4318<1500*4,单进程版本峰值3058<1500*4
3000 nodejs“并行”处理尝试 nodejs“并行”处理尝试 单进程版本(峰值2969)优于多进程版本(峰值1500)

观察

  • 确实能通过多个函数同时运行的方式来模拟多线程的效果
  • 当threads设置与核数差距不大时,两者效果差不多。
  • 在高性能机器上, 在一定范围(大部分范围)内 ,threads越大,多进程版本的效果越好,但超过这个范围(极端情况),单进程版本反而表现突出
  • 在低性能机器上,单进程版本表现更好,由于出现的read ECONNRESET/connect ECONNRESET/socket hang up等错误使得无法继续增大threads数量观察下去
  • 低性能机器上两个版本都会表现出奇怪的周期性,在高性能机器上多进程版本会更稳定一些

分析

  • client.js能模拟“并行”的效果实际上是利用网络耗时远大于代码循环的原理,第一次for循环连续发送threads个网络请求,然后在处理回调的时候又发送新的网络请求,效果就变成了多个线程在不停的发请求。

不负责任的猜测

  • 高/低性能表现不一致,低性能机器是mac,libuv中使用kqueue处理网络I/O,高性能机器时linux,libuv中使用epoll处理

新的问题

  • 该服务性能的极限QPS是多少
  • 奇怪的曲线产生原因

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

深入React技术栈

深入React技术栈

陈屹 / 人民邮电出版社 / 2016-11-1 / CNY 79.00

全面讲述React技术栈的第一本原创图书,pure render专栏主创倾力打造 覆盖React、Flux、Redux及可视化,帮助开发者在实践中深入理解技术和源码 前端组件化主流解决方案,一本书玩转React“全家桶” 本书讲解了非常多的内容,不仅介绍了面向普通用户的API、应用架构和周边工具,还深入介绍了底层实现。此外,本书非常重视实战,每一节都有实际的例子,细节丰富。我从这......一起来看看 《深入React技术栈》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具