聊聊Elasticsearch的MonitorService

栏目: 后端 · 发布时间: 6年前

内容简介:本文主要研究一下Elasticsearch的MonitorService。涉及的源码文件:elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/MonitorService.java、elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

本文主要研究一下Elasticsearch的MonitorService

MonitorService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/MonitorService.java

/**
 * Lifecycle component that bundles the node's monitoring services.
 *
 * <p>The constructor creates the JVM GC monitor plus the OS, process, JVM and filesystem
 * services. Only the GC monitor is tied to this component's lifecycle: start, stop and
 * close are forwarded to it, while the remaining services are just exposed via accessors.
 */
public class MonitorService extends AbstractLifecycleComponent {

    private final JvmGcMonitorService jvmGcMonitorService;
    private final OsService osService;
    private final ProcessService processService;
    private final JvmService jvmService;
    private final FsService fsService;

    /**
     * Builds every monitoring service from the supplied node components.
     *
     * @param settings           passed to each service constructor
     * @param nodeEnvironment    passed to the filesystem service
     * @param threadPool         passed to the GC monitor
     * @param clusterInfoService passed to the filesystem service
     * @throws IOException declared by the signature; which service constructor raises it
     *                     is not visible here
     */
    public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool,
                          ClusterInfoService clusterInfoService) throws IOException {
        jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
        osService = new OsService(settings);
        processService = new ProcessService(settings);
        jvmService = new JvmService(settings);
        fsService = new FsService(settings, nodeEnvironment, clusterInfoService);
    }

    // ---- lifecycle: everything is delegated to the GC monitor ----

    /** Starts the GC monitor. */
    @Override
    protected void doStart() {
        jvmGcMonitorService.start();
    }

    /** Stops the GC monitor. */
    @Override
    protected void doStop() {
        jvmGcMonitorService.stop();
    }

    /** Closes the GC monitor. */
    @Override
    protected void doClose() {
        jvmGcMonitorService.close();
    }

    // ---- accessors ----

    /** @return the OS service created by the constructor */
    public OsService osService() {
        return osService;
    }

    /** @return the process service created by the constructor */
    public ProcessService processService() {
        return processService;
    }

    /** @return the JVM service created by the constructor */
    public JvmService jvmService() {
        return jvmService;
    }

    /** @return the filesystem service created by the constructor */
    public FsService fsService() {
        return fsService;
    }

}
  • MonitorService的构造器创建了jvmGcMonitorService、osService、processService、jvmService、fsService;其doStart、doStop、doClose分别调用了jvmGcMonitorService的start、stop、close方法

JvmGcMonitorService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

/**
 * Lifecycle component that, when enabled, schedules a {@code JvmMonitor} on the thread pool
 * and logs slow garbage collections and GC overhead reported by it.
 *
 * <p>Per-collector slow-GC thresholds are read from the {@code monitor.jvm.gc.collector.*}
 * group setting; overhead thresholds come from the {@code monitor.jvm.gc.overhead.*} settings.
 */
public class JvmGcMonitorService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(JvmGcMonitorService.class);

    private final ThreadPool threadPool;
    private final boolean enabled;
    private final TimeValue interval;
    private final Map<String, GcThreshold> gcThresholds;
    private final GcOverheadThreshold gcOverheadThreshold;

    // Handle of the scheduled monitor task; assigned in doStart and cancelled in doStop.
    private volatile Cancellable scheduledFuture;

    /** Turns GC monitoring on or off. */
    public static final Setting<Boolean> ENABLED_SETTING =
        Setting.boolSetting("monitor.jvm.gc.enabled", true, Property.NodeScope);

    /**
     * Delay between two monitor runs.
     * NOTE(review): the two 1-second arguments are presumably default and minimum - confirm
     * against the Setting API.
     */
    public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =
        Setting.timeSetting("monitor.jvm.gc.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1),
            Property.NodeScope);

    // Key prefix of the per-collector group setting. A constant, hence final.
    private static final String GC_COLLECTOR_PREFIX = "monitor.jvm.gc.collector.";

    /** Group setting holding one sub-group of thresholds per collector name. */
    public static final Setting<Settings> GC_SETTING = Setting.groupSetting(GC_COLLECTOR_PREFIX, Property.NodeScope);

    // Overhead thresholds. The constructor enforces warn > info > debug.
    // NOTE(review): the numeric arguments are presumably (default, min, max) - confirm.
    public static final Setting<Integer> GC_OVERHEAD_WARN_SETTING =
        Setting.intSetting("monitor.jvm.gc.overhead.warn", 50, 0, 100, Property.NodeScope);
    public static final Setting<Integer> GC_OVERHEAD_INFO_SETTING =
        Setting.intSetting("monitor.jvm.gc.overhead.info", 25, 0, 100, Property.NodeScope);
    public static final Setting<Integer> GC_OVERHEAD_DEBUG_SETTING =
        Setting.intSetting("monitor.jvm.gc.overhead.debug", 10, 0, 100, Property.NodeScope);

    //......

    /**
     * Reads the monitor configuration and validates the overhead thresholds.
     *
     * @param settings   source of all {@code monitor.jvm.gc.*} values
     * @param threadPool pool used later by {@link #doStart()} to schedule the monitor
     * @throws IllegalArgumentException if the overhead thresholds are not strictly ordered
     *                                  warn &gt; info &gt; debug
     */
    public JvmGcMonitorService(Settings settings, ThreadPool threadPool) {
        this.threadPool = threadPool;

        this.enabled = ENABLED_SETTING.get(settings);
        this.interval = REFRESH_INTERVAL_SETTING.get(settings);

        // Explicitly configured collectors first ...
        final Map<String, GcThreshold> thresholds = new HashMap<>();
        for (Map.Entry<String, Settings> group : GC_SETTING.get(settings).getAsGroups().entrySet()) {
            final String name = group.getKey();
            final Settings collectorSettings = group.getValue();
            final TimeValue warn = getValidThreshold(collectorSettings, name, "warn");
            final TimeValue info = getValidThreshold(collectorSettings, name, "info");
            final TimeValue debug = getValidThreshold(collectorSettings, name, "debug");
            thresholds.put(name, new GcThreshold(name, warn.millis(), info.millis(), debug.millis()));
        }
        // ... then built-in fallbacks (warn, info, debug in milliseconds) for anything not configured.
        thresholds.putIfAbsent(GcNames.YOUNG, new GcThreshold(GcNames.YOUNG, 1000, 700, 400));
        thresholds.putIfAbsent(GcNames.OLD, new GcThreshold(GcNames.OLD, 10000, 5000, 2000));
        thresholds.putIfAbsent("default", new GcThreshold("default", 10000, 5000, 2000));
        this.gcThresholds = unmodifiableMap(thresholds);

        // Each overhead setting is read once. The debug value is deliberately read only after
        // the warn/info check so that the first failure reported stays the same as before.
        final int overheadWarn = GC_OVERHEAD_WARN_SETTING.get(settings);
        final int overheadInfo = GC_OVERHEAD_INFO_SETTING.get(settings);
        if (overheadWarn <= overheadInfo) {
            final String message =
                String.format(
                    Locale.ROOT,
                    "[%s] must be greater than [%s] [%d] but was [%d]",
                    GC_OVERHEAD_WARN_SETTING.getKey(),
                    GC_OVERHEAD_INFO_SETTING.getKey(),
                    overheadInfo,
                    overheadWarn);
            throw new IllegalArgumentException(message);
        }
        final int overheadDebug = GC_OVERHEAD_DEBUG_SETTING.get(settings);
        if (overheadInfo <= overheadDebug) {
            final String message =
                String.format(
                    Locale.ROOT,
                    "[%s] must be greater than [%s] [%d] but was [%d]",
                    GC_OVERHEAD_INFO_SETTING.getKey(),
                    GC_OVERHEAD_DEBUG_SETTING.getKey(),
                    overheadDebug,
                    overheadInfo);
            throw new IllegalArgumentException(message);
        }

        this.gcOverheadThreshold = new GcOverheadThreshold(overheadWarn, overheadInfo, overheadDebug);

        logger.debug(
            "enabled [{}], interval [{}], gc_threshold [{}], overhead [{}, {}, {}]",
            this.enabled,
            this.interval,
            this.gcThresholds,
            this.gcOverheadThreshold.warnThreshold,
            this.gcOverheadThreshold.infoThreshold,
            this.gcOverheadThreshold.debugThreshold);
    }

    /**
     * Schedules the monitor with a fixed delay of {@code interval}; does nothing when
     * monitoring is disabled.
     */
    @Override
    protected void doStart() {
        if (!enabled) {
            return;
        }
        scheduledFuture = threadPool.scheduleWithFixedDelay(new JvmMonitor(gcThresholds, gcOverheadThreshold) {
            @Override
            void onMonitorFailure(Exception e) {
                // A failed sample is only logged; the exception is not rethrown.
                logger.debug("failed to monitor", e);
            }

            @Override
            void onSlowGc(final Threshold threshold, final long seq, final SlowGcEvent slowGcEvent) {
                logSlowGc(logger, threshold, seq, slowGcEvent, JvmGcMonitorService::buildPools);
            }

            @Override
            void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) {
                logGcOverhead(logger, threshold, current, elapsed, seq);
            }
        }, interval, Names.SAME);
    }

    /**
     * Cancels the scheduled monitor task; does nothing when monitoring is disabled,
     * because {@link #doStart()} schedules nothing in that case.
     */
    @Override
    protected void doStop() {
        if (!enabled) {
            return;
        }
        scheduledFuture.cancel();
    }

    /** Nothing to release on close. */
    @Override
    protected void doClose() {
    }

    //......
}
  • JvmGcMonitorService的doStart方法通过scheduleWithFixedDelay注册了JvmMonitor的定时任务;其doStop方法则是cancel掉该定时任务

JvmMonitor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

/**
 * Runnable that samples JVM statistics on every run and compares them with the previous
 * sample. Slow collections and high GC overhead are reported through the abstract
 * callbacks; failures of a run are reported through {@link #onMonitorFailure(Exception)}.
 */
abstract static class JvmMonitor implements Runnable {

    /** Severity levels handed to the callbacks. */
    enum Threshold { DEBUG, INFO, WARN }

    /** Plain value holder describing one slow-GC observation. */
    static class SlowGcEvent {

        final GarbageCollector currentGc;
        final long collectionCount;
        final TimeValue collectionTime;
        final long elapsed;
        final JvmStats lastJvmStats;
        final JvmStats currentJvmStats;
        final ByteSizeValue maxHeapUsed;

        SlowGcEvent(
            final GarbageCollector currentGc,
            final long collectionCount,
            final TimeValue collectionTime,
            final long elapsed,
            final JvmStats lastJvmStats,
            final JvmStats currentJvmStats,
            final ByteSizeValue maxHeapUsed) {
            this.currentGc = currentGc;
            this.collectionCount = collectionCount;
            this.collectionTime = collectionTime;
            this.elapsed = elapsed;
            this.lastJvmStats = lastJvmStats;
            this.currentJvmStats = currentJvmStats;
            this.maxHeapUsed = maxHeapUsed;
        }

    }

    // State of the previous sample; the initial values are taken at construction time.
    private long lastTime = now();
    private JvmStats lastJvmStats = jvmStats();
    // Incremented once per monitorGc() call and passed to the callbacks.
    private long seq = 0;
    private final Map<String, JvmGcMonitorService.GcThreshold> gcThresholds;
    final GcOverheadThreshold gcOverheadThreshold;

    /**
     * @param gcThresholds        slow-GC thresholds keyed by collector name; must not be null
     * @param gcOverheadThreshold overhead thresholds; must not be null
     */
    JvmMonitor(final Map<String, GcThreshold> gcThresholds, final GcOverheadThreshold gcOverheadThreshold) {
        this.gcThresholds = Objects.requireNonNull(gcThresholds);
        this.gcOverheadThreshold = Objects.requireNonNull(gcOverheadThreshold);
    }

    /** Runs one monitoring pass and routes any exception to {@link #onMonitorFailure(Exception)}. */
    @Override
    public void run() {
        try {
            monitorGc();
        } catch (Exception failure) {
            onMonitorFailure(failure);
        }
    }

    /**
     * Takes a new sample, checks it against the previous one, then stores it as the
     * new "previous" sample.
     */
    synchronized void monitorGc() {
        seq++;
        final long sampleTime = now();
        final JvmStats sample = jvmStats();

        // now() is in nanoseconds; everything downstream works in milliseconds.
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(sampleTime - lastTime);

        monitorSlowGc(sample, elapsedMillis);
        monitorGcOverhead(sample, elapsedMillis);

        lastTime = sampleTime;
        lastJvmStats = sample;
    }

    /**
     * For each collector, computes the average time per collection since the previous
     * sample and fires {@link #onSlowGc} when it exceeds one of that collector's thresholds.
     */
    final void monitorSlowGc(JvmStats currentJvmStats, long elapsed) {
        final GarbageCollector[] collectors = currentJvmStats.getGc().getCollectors();
        for (int idx = 0; idx < collectors.length; idx++) {
            final GarbageCollector gc = collectors[idx];
            final GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[idx];

            // no collection has happened
            final long collections = gc.getCollectionCount() - prevGc.getCollectionCount();
            if (collections == 0) {
                continue;
            }
            // collections happened but took no measurable time
            final long collectionTime = gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis();
            if (collectionTime == 0) {
                continue;
            }

            // Collectors without their own entry fall back to the "default" thresholds.
            GcThreshold gcThreshold = gcThresholds.get(gc.getName());
            if (gcThreshold == null) {
                gcThreshold = gcThresholds.get("default");
            }

            final Threshold level = slowGcLevel(collectionTime / collections, gcThreshold);
            if (level == null) {
                continue;
            }
            onSlowGc(level, seq, new SlowGcEvent(
                gc,
                collections,
                TimeValue.timeValueMillis(collectionTime),
                elapsed,
                lastJvmStats,
                currentJvmStats,
                JvmInfo.jvmInfo().getMem().getHeapMax()));
        }
    }

    /** Returns the highest level whose threshold is strictly exceeded, or null if none is. */
    private static Threshold slowGcLevel(final long avgCollectionTime, final GcThreshold gcThreshold) {
        if (avgCollectionTime > gcThreshold.warnThreshold) {
            return Threshold.WARN;
        }
        if (avgCollectionTime > gcThreshold.infoThreshold) {
            return Threshold.INFO;
        }
        if (avgCollectionTime > gcThreshold.debugThreshold) {
            return Threshold.DEBUG;
        }
        return null;
    }

    /**
     * Sums the collection time of all collectors since the previous sample and hands the
     * total to {@link #checkGcOverhead}.
     */
    final void monitorGcOverhead(final JvmStats currentJvmStats, final long elapsed) {
        final GarbageCollector[] collectors = currentJvmStats.getGc().getCollectors();
        long totalGcMillis = 0;
        for (int idx = 0; idx < collectors.length; idx++) {
            final GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[idx];
            totalGcMillis += collectors[idx].getCollectionTime().millis() - prevGc.getCollectionTime().millis();
        }
        checkGcOverhead(totalGcMillis, elapsed, seq);
    }

    /**
     * Fires {@link #onGcOverhead} when the share of {@code elapsed} spent in GC reaches
     * one of the overhead thresholds.
     */
    void checkGcOverhead(final long current, final long elapsed, final long seq) {
        // GC time as a percentage of the elapsed interval, truncated to an int.
        final int fraction = (int) ((100 * current) / (double) elapsed);
        final Threshold level;
        if (fraction >= gcOverheadThreshold.warnThreshold) {
            level = Threshold.WARN;
        } else if (fraction >= gcOverheadThreshold.infoThreshold) {
            level = Threshold.INFO;
        } else if (fraction >= gcOverheadThreshold.debugThreshold) {
            level = Threshold.DEBUG;
        } else {
            return;
        }
        onGcOverhead(level, current, elapsed, seq);
    }

    /** Supplies the current JVM statistics sample. */
    JvmStats jvmStats() {
        return JvmStats.jvmStats();
    }

    /** Supplies the current time in nanoseconds. */
    long now() {
        return System.nanoTime();
    }

    /** Called when a monitoring pass throws. */
    abstract void onMonitorFailure(Exception e);

    /** Called when a collector's average collection time exceeds a threshold. */
    abstract void onSlowGc(Threshold threshold, long seq, SlowGcEvent slowGcEvent);

    /** Called when the overall GC overhead reaches a threshold. */
    abstract void onGcOverhead(Threshold threshold, long total, long elapsed, long seq);

}
  • JvmMonitor实现了Runnable接口,其run方法执行monitorGc方法,异常时执行onMonitorFailure方法;JvmMonitor还定义了onMonitorFailure、onSlowGc、onGcOverhead抽象方法需要子类去实现
  • monitorGc方法首先获取currentJvmStats,然后执行monitorSlowGc及monitorGcOverhead
  • monitorSlowGc方法主要是计算avgCollectionTime,然后判断是否超出指定level的阈值,超出则回调onSlowGc方法;monitorGcOverhead方法主要是计算gc耗时占比,然后判断是否超过指定level的阈值,超出则回调onGcOverhead方法

小结

  • MonitorService的构造器创建了jvmGcMonitorService、osService、processService、jvmService、fsService;其doStart、doStop、doClose分别调用了jvmGcMonitorService的start、stop、close方法
  • JvmGcMonitorService的doStart方法通过scheduleWithFixedDelay注册了JvmMonitor的定时任务;其doStop方法则是cancel掉该定时任务
  • JvmMonitor实现了Runnable接口,其run方法执行monitorGc方法,异常时执行onMonitorFailure方法;JvmMonitor还定义了onMonitorFailure、onSlowGc、onGcOverhead抽象方法需要子类去实现

doc


以上所述就是小编给大家介绍的《聊聊Elasticsearch的MonitorService》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python Machine Learning

Python Machine Learning

Sebastian Raschka / Packt Publishing - ebooks Account / 2015-9 / USD 44.99

About This Book: Leverage Python's most powerful open-source libraries for deep learning, data wrangling, and data visualization. Learn effective strategies and best practices to improve and opti......一起来看看 《Python Machine Learning》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具