了解Vert.x:事件循环

栏目: Java · 发布时间: 5年前

内容简介:让Vert.x框架实现高度可扩展和高性能的核心是事件循环,更具体地说是Multi-Reactor模式,以及它的消息总线,在Vert.x中称为EventBus。在本文中,我想解决有关事件循环的误解,例如:“Vert.x有EventLoop,所以它是单线程的,只使用一个CPU”?

让Vert.x框架实现高度可扩展和高性能的核心是事件循环,更具体地说是Multi-Reactor模式,以及它的消息总线,在Vert.x中称为EventBus。

在本文中,我想解决有关事件循环的误解,例如:

“Vert.x有EventLoop,所以它是单线程的,只使用一个CPU”?

要么

“Vert.x是多线程的,所以它必须为每个Verticle创建一个线程”?

Multi-Reactor

Event Loop是Reactor设计模式的一个实现。

它的目标是不断检查新事件,并在每次新事件发生时,快速将其发送给知道如何处理它的人。

但是通过仅使用一个线程来消费所有事件,我们基本上没有充分利用我们的硬件。例如,Node.js应用程序通常会生成多个进程来解决该问题。

在提供良好隔离的同时,进程也很昂贵。Vert.x使用多个线程,这在系统资源方面更便宜点。

为了理解Multi-Reactor在实践中的工作原理,我们将通过简单的调用来检查线程的数量 Thread.activeCount(),虽然不准确,但这足以满足我们的目的。

让我们先看看我们在程序开头有多少线程:

Before starting VertX -> 1 thread

现在我们将启动Vert.x应用程序:

Vertx vertx = Vertx.vertx();

再次检查线程数:

After starting VertX -> 3 threads

因此,启动Vert.x会产生2个额外的线程。一个是运行应用程序,另一个是调用vertx-blocked-thread-checker

现在让我们部署一千个Verticle,看看它如何影响我们的线程数。Verticle是轻量级的actor,通常在事件循环上运行。

<b>final</b> Map<String, AtomicInteger> threadCounts = <b>new</b> ConcurrentHashMap<>();

<b>int</b> verticles = 1000;
<b>final</b> CountDownLatch latch = <b>new</b> CountDownLatch(verticles);
<b>for</b> (<b>int</b> i = 0; i < verticles; i++) {
    vertx.deployVerticle(<b>new</b> MyVerticle(threadCounts), c -> latch.countDown());
}
latch.await();

threadCounts现在不要理会,因为它将在后面解释。

我们在这里使用CountDownLatch,因为Verticle是异步部署的,我们希望确保在检查线程数时已经部署了所有实例。

After deploying 1000 verticles -> 19 threads

之前我们有3个线程,现在又增加了16个线程。它们都以形式命名vert.x-eventloop-thread-X。您可以启动一万个Verticle,并且不会影响事件循环线程的数量。

到目前为止,有两个重要的要点:

  • Vert.x不是单线程的
  • 事件循环线程的最大数量取决于CPU的数量,而不是部署的Verticle数量

您可以在此处查看默认线程数:

https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/VertxOptions.java#L38

现在是时候看看我们的Verticle是什么样的,为什么我们传递HashMap给它:

<b>class</b> MyVerticle <b>extends</b> AbstractVerticle {
    <b>private</b> <b>final</b> Map<String, AtomicInteger> threadCounts;

    MyVerticle(Map<String, AtomicInteger> threadCounts) {
        <b>this</b>.threadCounts = threadCounts;
    }

    @Override
    <b>public</b> <b>void</b> start() {
        threadCounts.computeIfAbsent(Thread.currentThread().getName(),
                t -> <b>new</b> AtomicInteger(0)).incrementAndGet();
    }
}

因此,当每个Verticle启动时,它会记录已分配的线程。

此代码有助于我们了解如何在Verticle之间划分线程:

vert.x-eventloop-thread-0=125

vert.x-eventloop-thread-1=125

vert.x-eventloop-thread-2=125

vert.x-eventloop-thread-3=125

vert.x-eventloop-thread-4=125

vert.x-eventloop-thread-5=125

vert.x-eventloop-thread-6=125

vert.x-eventloop-thread-7=125

如您所见,每个新Verticle以循环方式获取一个线程。

查看结果您可能想知道,为什么我们部署了16个事件循环线程,但Verticle仅在前8个中注册。原因是我们非常积极地部署Verticle。在常规应用程序中,您可能不会这样做。

所以,让我们放松一下。我们将部署相同的千个Verticle,但这一次,一个接一个:

<b>private</b> <b>void</b> deployMyVerticle(<b>final</b> Vertx vertx,
                              <b>final</b> Map<String, AtomicInteger> threadCounts,
                              <b>final</b> AtomicInteger counter,
                              <b>final</b> <b>int</b> verticles) {
    vertx.deployVerticle(<b>new</b> MyVerticle(threadCounts), c -> {
        <b>if</b> (counter.incrementAndGet() < verticles) {
            deployMyVerticle(vertx, threadCounts, counter, verticles);
        }
    });
}

结果是我们使用的线程比以前少:

vert.x-eventloop-thread-0 = 250

vert.x-eventloop-thread-1 = 250

vert.x-eventloop-thread-2 = 250

vert.x-eventloop-thread-3 = 250

那是因为框架有足够的时间来做出反应。

Worker Verticle

worker verticle用于执行长时间运行或阻塞任务。让我们现在以类似的方式部署一千个worker Verticle,看看会发生什么:

<b>final</b> CountDownLatch workersLatch = <b>new</b> CountDownLatch(verticles);
<b>final</b> DeploymentOptions worker = <b>new</b> DeploymentOptions().setWorker(<b>true</b>);
<b>for</b> (<b>int</b> i = 0; i < verticles; i++) {
    vertx.deployVerticle(<b>new</b> MyVerticle(threadCounts), worker, c -> workersLatch.countDown());
}
workersLatch.await();

线程数:

After deploying 1000 worker verticles -> 27 threads

部署一千个worker  Verticle增加了另外20个线程。

这是因为工作者Verticle使用一个单独的线程池,默认情况下大小为20。

https://github.com/eclipse/vert.x/blob/master/src/main/java/io/vertx/core/VertxOptions.java#L43

您可以通过调用VertxOptions的setWorkerPoolSize()on 来控制此池的大小,然后在Vert.x初始化时传递它们:

<b>final</b> VertxOptions options = <b>new</b> VertxOptions().setWorkerPoolSize(10);
Vertx vertx = Vertx.vertx(options);

请注意,与常规Verticle不同,worker Verticle不会在线程之间均匀分布,因为它们用于不同的目的:

vert.x-worker-thread-0=126

vert.x-worker-thread-1=39

vert.x-worker-thread-2=94

vert.x-worker-thread-3=118

vert.x-worker-thread-4=89

vert.x-worker-thread-5=114

vert.x-worker-thread-6=222

vert.x-worker-thread-7=79

vert.x-worker-thread-8=67

vert.x-worker-thread-9=50

可以类似的方式控制事件循环池的大小:

<b>final</b> VertxOptions options = <b>new</b> VertxOptions().setEventLoopPoolSize(4);
Vertx vertx = Vertx.vertx(options);

结论

以下是几个要点:

  • Vert.x是多线程框架
  • 它使用受控数量的线程
  • 对于事件循环任务,默认情况下,线程池的大小是CPU计数的两倍
  • 对于worker任务,默认情况下线程池的大小为20
  • 可以轻松调整两个线程池的大小

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Scalability for Startup Engineers

Web Scalability for Startup Engineers

Artur Ejsmont / McGraw / 2015-6-23 / USD 34.81

Design and build scalable web applications quickly This is an invaluable roadmap for meeting the rapid demand to deliver scalable applications in a startup environment. With a focus on core concept......一起来看看 《Web Scalability for Startup Engineers》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具