Preface
This article takes a look at Elasticsearch's CircuitBreakerService.
CircuitBreakerService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerService.java

    /**
     * Interface for Circuit Breaker services, which provide breakers to classes
     * that load field data.
     */
    public abstract class CircuitBreakerService extends AbstractLifecycleComponent {
        private static final Logger logger = LogManager.getLogger(CircuitBreakerService.class);

        protected CircuitBreakerService() {
        }

        /**
         * Allows to register of a custom circuit breaker.
         */
        public abstract void registerBreaker(BreakerSettings breakerSettings);

        /**
         * @return the breaker that can be used to register estimates against
         */
        public abstract CircuitBreaker getBreaker(String name);

        /**
         * @return stats about all breakers
         */
        public abstract AllCircuitBreakerStats stats();

        /**
         * @return stats about a specific breaker
         */
        public abstract CircuitBreakerStats stats(String name);

        @Override
        protected void doStart() {
        }

        @Override
        protected void doStop() {
        }

        @Override
        protected void doClose() {
        }
    }

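- CircuitBreakerService extends AbstractLifecycleComponent and declares the abstract methods registerBreaker, getBreaker, and stats (in two overloads: all breakers, or a single breaker by name); its doStart, doStop, and doClose are empty. It has two implementations: NoneCircuitBreakerService and HierarchyCircuitBreakerService.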
CircuitBreakerStats
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/indices/breaker/CircuitBreakerStats.java

    public class CircuitBreakerStats implements Writeable, ToXContentObject {

        private final String name;
        private final long limit;
        private final long estimated;
        private final long trippedCount;
        private final double overhead;

        public CircuitBreakerStats(String name, long limit, long estimated, double overhead, long trippedCount) {
            this.name = name;
            this.limit = limit;
            this.estimated = estimated;
            this.trippedCount = trippedCount;
            this.overhead = overhead;
        }

        public CircuitBreakerStats(StreamInput in) throws IOException {
            limit = in.readLong();
            estimated = in.readLong();
            overhead = in.readDouble();
            this.trippedCount = in.readLong();
            this.name = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeLong(limit);
            out.writeLong(estimated);
            out.writeDouble(overhead);
            out.writeLong(trippedCount);
            out.writeString(name);
        }

        public String getName() {
            return this.name;
        }

        public long getLimit() {
            return this.limit;
        }

        public long getEstimated() {
            return this.estimated;
        }

        public long getTrippedCount() {
            return this.trippedCount;
        }

        public double getOverhead() {
            return this.overhead;
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject(name.toLowerCase(Locale.ROOT));
            builder.field(Fields.LIMIT, limit);
            builder.field(Fields.LIMIT_HUMAN, new ByteSizeValue(limit));
            builder.field(Fields.ESTIMATED, estimated);
            builder.field(Fields.ESTIMATED_HUMAN, new ByteSizeValue(estimated));
            builder.field(Fields.OVERHEAD, overhead);
            builder.field(Fields.TRIPPED_COUNT, trippedCount);
            builder.endObject();
            return builder;
        }

        @Override
        public String toString() {
            return "[" + this.name +
                ",limit=" + this.limit + "/" + new ByteSizeValue(this.limit) +
                ",estimated=" + this.estimated + "/" + new ByteSizeValue(this.estimated) +
                ",overhead=" + this.overhead + ",tripped=" + this.trippedCount + "]";
        }

        static final class Fields {
            static final String LIMIT = "limit_size_in_bytes";
            static final String LIMIT_HUMAN = "limit_size";
            static final String ESTIMATED = "estimated_size_in_bytes";
            static final String ESTIMATED_HUMAN = "estimated_size";
            static final String OVERHEAD = "overhead";
            static final String TRIPPED_COUNT = "tripped";
        }
    }

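- CircuitBreakerStats holds the name, limit, estimated, trippedCount, and overhead properties. writeTo and the StreamInput constructor handle the fields in the same order (limit, estimated, overhead, trippedCount, name), and toXContent emits them under the keys defined in Fields.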
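The write/read symmetry noted above can be tried out in isolation. The following is a minimal sketch, not Elasticsearch code: StatsWireSketch is a hypothetical stand-in for CircuitBreakerStats, and java.io's DataOutputStream and DataInputStream are assumed here as substitutes for StreamOutput and StreamInput. It only illustrates that the reader must consume fields in the order the writer produced them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for CircuitBreakerStats (illustration only).
final class StatsWireSketch {
    final String name;
    final long limit;
    final long estimated;
    final double overhead;
    final long trippedCount;

    StatsWireSketch(String name, long limit, long estimated, double overhead, long trippedCount) {
        this.name = name;
        this.limit = limit;
        this.estimated = estimated;
        this.overhead = overhead;
        this.trippedCount = trippedCount;
    }

    // Writes the fields in the order: limit, estimated, overhead, trippedCount, name.
    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(limit);
            out.writeLong(estimated);
            out.writeDouble(overhead);
            out.writeLong(trippedCount);
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static StatsWireSketch fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long limit = in.readLong();
            long estimated = in.readLong();
            double overhead = in.readDouble();
            long trippedCount = in.readLong();
            String name = in.readUTF();
            return new StatsWireSketch(name, limit, estimated, overhead, trippedCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StatsWireSketch original = new StatsWireSketch("fielddata", 1024L, 256L, 1.03, 2L);
        StatsWireSketch copy = fromBytes(original.toBytes());
        System.out.println(copy.name + " limit=" + copy.limit + " tripped=" + copy.trippedCount);
    }
}
```

Swapping any two reads in fromBytes would silently produce wrong values (or fail on the string), which is why the constructor and writeTo in the real class mirror each other line by line.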
BreakerSettings
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/indices/breaker/BreakerSettings.java

    public final class BreakerSettings {

        private final String name;
        private final long limitBytes;
        private final double overhead;
        private final CircuitBreaker.Type type;
        private final CircuitBreaker.Durability durability;

        public BreakerSettings(String name, long limitBytes, double overhead) {
            this(name, limitBytes, overhead, CircuitBreaker.Type.MEMORY, CircuitBreaker.Durability.PERMANENT);
        }

        public BreakerSettings(String name, long limitBytes, double overhead, CircuitBreaker.Type type,
                               CircuitBreaker.Durability durability) {
            this.name = name;
            this.limitBytes = limitBytes;
            this.overhead = overhead;
            this.type = type;
            this.durability = durability;
        }

        public String getName() {
            return this.name;
        }

        public long getLimit() {
            return this.limitBytes;
        }

        public double getOverhead() {
            return this.overhead;
        }

        public CircuitBreaker.Type getType() {
            return this.type;
        }

        public CircuitBreaker.Durability getDurability() {
            return durability;
        }

        @Override
        public String toString() {
            return "[" + this.name +
                ",type=" + this.type.toString() +
                ",durability=" + this.durability.toString() +
                ",limit=" + this.limitBytes + "/" + new ByteSizeValue(this.limitBytes) +
                ",overhead=" + this.overhead + "]";
        }
    }

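- BreakerSettings holds the name, limitBytes, overhead, type, and durability properties; the three-argument constructor defaults type to CircuitBreaker.Type.MEMORY and durability to CircuitBreaker.Durability.PERMANENT.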
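BreakerSettings only carries configuration; the class that consumes limitBytes and overhead is not excerpted in this article. As a rough sketch, assuming a breaker multiplies its running estimate by the overhead factor and trips when the result exceeds the limit, the two values could interact as follows. OverheadCheckSketch and tryReserve are hypothetical names, and returning a boolean instead of throwing an exception is a simplification made for this example.

```java
// Hypothetical illustration of how limitBytes and overhead might be applied.
final class OverheadCheckSketch {
    private final long limitBytes;
    private final double overhead;
    private long usedBytes;
    private long trippedCount;

    OverheadCheckSketch(long limitBytes, double overhead) {
        this.limitBytes = limitBytes;
        this.overhead = overhead;
    }

    // Reserves the bytes and returns true if the overhead-adjusted total stays
    // within the limit; otherwise counts a trip and leaves usage unchanged.
    boolean tryReserve(long bytes) {
        long newUsed = usedBytes + bytes;
        long newUsedWithOverhead = (long) (newUsed * overhead);
        if (limitBytes > 0 && newUsedWithOverhead > limitBytes) {
            trippedCount++;
            return false;
        }
        usedBytes = newUsed;
        return true;
    }

    long used() {
        return usedBytes;
    }

    long trippedCount() {
        return trippedCount;
    }

    public static void main(String[] args) {
        OverheadCheckSketch breaker = new OverheadCheckSketch(1000L, 1.03);
        System.out.println(breaker.tryReserve(900L)); // fits: about 927 bytes after overhead
        System.out.println(breaker.tryReserve(100L)); // rejected: about 1030 bytes after overhead
    }
}
```

With an overhead above 1.0 the breaker trips slightly before the raw estimate reaches the limit, which gives some headroom for estimates that undercount.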
NoneCircuitBreakerService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/indices/breaker/NoneCircuitBreakerService.java

    /**
     * Class that returns a breaker that never breaks
     */
    public class NoneCircuitBreakerService extends CircuitBreakerService {

        private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.FIELDDATA);

        public NoneCircuitBreakerService() {
            super();
        }

        @Override
        public CircuitBreaker getBreaker(String name) {
            return breaker;
        }

        @Override
        public AllCircuitBreakerStats stats() {
            return new AllCircuitBreakerStats(new CircuitBreakerStats[] {stats(CircuitBreaker.FIELDDATA)});
        }

        @Override
        public CircuitBreakerStats stats(String name) {
            return new CircuitBreakerStats(CircuitBreaker.FIELDDATA, -1, -1, 0, 0);
        }

        @Override
        public void registerBreaker(BreakerSettings breakerSettings) {
            // ignore
        }
    }

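- NoneCircuitBreakerService performs no circuit breaking: getBreaker returns the same NoopCircuitBreaker whatever name is passed, registerBreaker ignores its argument, and stats reports a limit and an estimate of -1.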
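This is essentially the null-object pattern: callers keep the same code path, but the breaker they receive never rejects anything. A minimal sketch of that idea follows; the Breaker interface, NoopBreaker, and NoneServiceSketch are hypothetical types invented for this example and do not reproduce the Elasticsearch API.

```java
// Hypothetical null-object sketch of a service whose breaker never trips.
final class NoneServiceSketch {

    interface Breaker {
        // Returns true if the estimate was accepted.
        boolean addEstimate(long bytes);

        long used();
    }

    // Accepts every estimate and tracks nothing.
    static final class NoopBreaker implements Breaker {
        @Override
        public boolean addEstimate(long bytes) {
            return true;
        }

        @Override
        public long used() {
            return 0L;
        }
    }

    private final Breaker breaker = new NoopBreaker();

    // The requested name is ignored; every caller shares the same instance.
    Breaker getBreaker(String name) {
        return breaker;
    }

    // Registration is accepted but has no effect.
    void registerBreaker(String name, long limitBytes) {
        // intentionally empty
    }

    public static void main(String[] args) {
        NoneServiceSketch service = new NoneServiceSketch();
        System.out.println(service.getBreaker("request") == service.getBreaker("fielddata"));
    }
}
```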
HierarchyCircuitBreakerService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/indices/breaker/HierarchyCircuitBreakerService.java

    /**
     * CircuitBreakerService that attempts to redistribute space between breakers
     * if tripped
     */
    public class HierarchyCircuitBreakerService extends CircuitBreakerService {
        private static final Logger logger = LogManager.getLogger(HierarchyCircuitBreakerService.class);

        private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker.";

        private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();

        private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

        public static final Setting<Boolean> USE_REAL_MEMORY_USAGE_SETTING =
            Setting.boolSetting("indices.breaker.total.use_real_memory", true, Property.NodeScope);

        public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING =
            Setting.memorySizeSetting("indices.breaker.total.limit", settings -> {
                if (USE_REAL_MEMORY_USAGE_SETTING.get(settings)) {
                    return "95%";
                } else {
                    return "70%";
                }
            }, Property.Dynamic, Property.NodeScope);

        public static final Setting<ByteSizeValue> FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING =
            Setting.memorySizeSetting("indices.breaker.fielddata.limit", "40%", Property.Dynamic, Property.NodeScope);
        public static final Setting<Double> FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING =
            Setting.doubleSetting("indices.breaker.fielddata.overhead", 1.03d, 0.0d, Property.Dynamic, Property.NodeScope);
        public static final Setting<CircuitBreaker.Type> FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING =
            new Setting<>("indices.breaker.fielddata.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

        public static final Setting<ByteSizeValue> REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING =
            Setting.memorySizeSetting("indices.breaker.request.limit", "60%", Property.Dynamic, Property.NodeScope);
        public static final Setting<Double> REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING =
            Setting.doubleSetting("indices.breaker.request.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope);
        public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
            new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

        public static final Setting<ByteSizeValue> ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING =
            Setting.memorySizeSetting("indices.breaker.accounting.limit", "100%", Property.Dynamic, Property.NodeScope);
        public static final Setting<Double> ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING =
            Setting.doubleSetting("indices.breaker.accounting.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope);
        public static final Setting<CircuitBreaker.Type> ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING =
            new Setting<>("indices.breaker.accounting.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

        public static final Setting<ByteSizeValue> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING =
            Setting.memorySizeSetting("network.breaker.inflight_requests.limit", "100%", Property.Dynamic, Property.NodeScope);
        public static final Setting<Double> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING =
            Setting.doubleSetting("network.breaker.inflight_requests.overhead", 2.0d, 0.0d, Property.Dynamic, Property.NodeScope);
        public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
            new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

        private final boolean trackRealMemoryUsage;
        private volatile BreakerSettings parentSettings;
        private volatile BreakerSettings fielddataSettings;
        private volatile BreakerSettings inFlightRequestsSettings;
        private volatile BreakerSettings requestSettings;
        private volatile BreakerSettings accountingSettings;

        // Tripped count for when redistribution was attempted but wasn't successful
        private final AtomicLong parentTripCount = new AtomicLong(0);

        public HierarchyCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
            super();
            this.fielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA,
                FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
                FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
                FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
                CircuitBreaker.Durability.PERMANENT
            );

            this.inFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
                IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
                IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
                IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
                CircuitBreaker.Durability.TRANSIENT
            );

            this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST,
                REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
                REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
                REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
                CircuitBreaker.Durability.TRANSIENT
            );

            this.accountingSettings = new BreakerSettings(CircuitBreaker.ACCOUNTING,
                ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(),
                ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
                ACCOUNTING_CIRCUIT_BREAKER_TYPE_SETTING.get(settings),
                CircuitBreaker.Durability.PERMANENT
            );

            this.parentSettings = new BreakerSettings(CircuitBreaker.PARENT,
                TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).getBytes(), 1.0,
                CircuitBreaker.Type.PARENT, null);

            if (logger.isTraceEnabled()) {
                logger.trace("parent circuit breaker with settings {}", this.parentSettings);
            }

            this.trackRealMemoryUsage = USE_REAL_MEMORY_USAGE_SETTING.get(settings);

            registerBreaker(this.requestSettings);
            registerBreaker(this.fielddataSettings);
            registerBreaker(this.inFlightRequestsSettings);
            registerBreaker(this.accountingSettings);

            clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit,
                this::validateTotalCircuitBreakerLimit);
            clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
                this::setFieldDataBreakerLimit);
            clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING,
                IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit);
            clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
                this::setRequestBreakerLimit);
            clusterSettings.addSettingsUpdateConsumer(ACCOUNTING_CIRCUIT_BREAKER_LIMIT_SETTING, ACCOUNTING_CIRCUIT_BREAKER_OVERHEAD_SETTING,
                this::setAccountingBreakerLimit);
        }

        @Override
        public CircuitBreaker getBreaker(String name) {
            return this.breakers.get(name);
        }

        @Override
        public AllCircuitBreakerStats stats() {
            List<CircuitBreakerStats> allStats = new ArrayList<>(this.breakers.size());
            // Gather the "estimated" count for the parent breaker by adding the
            // estimations for each individual breaker
            for (CircuitBreaker breaker : this.breakers.values()) {
                allStats.add(stats(breaker.getName()));
            }
            // Manually add the parent breaker settings since they aren't part of the breaker map
            allStats.add(new CircuitBreakerStats(CircuitBreaker.PARENT, parentSettings.getLimit(),
                memoryUsed(0L).totalUsage, 1.0, parentTripCount.get()));
            return new AllCircuitBreakerStats(allStats.toArray(new CircuitBreakerStats[allStats.size()]));
        }

        @Override
        public CircuitBreakerStats stats(String name) {
            CircuitBreaker breaker = this.breakers.get(name);
            return new CircuitBreakerStats(breaker.getName(), breaker.getLimit(), breaker.getUsed(), breaker.getOverhead(),
                breaker.getTrippedCount());
        }

        /**
         * Allows to register a custom circuit breaker.
         * Warning: Will overwrite any existing custom breaker with the same name.
         */
        @Override
        public void registerBreaker(BreakerSettings breakerSettings) {
            // Validate the settings
            validateSettings(new BreakerSettings[] {breakerSettings});

            if (breakerSettings.getType() == CircuitBreaker.Type.NOOP) {
                CircuitBreaker breaker = new NoopCircuitBreaker(breakerSettings.getName());
                breakers.put(breakerSettings.getName(), breaker);
            } else {
                CircuitBreaker oldBreaker;
                CircuitBreaker breaker = new ChildMemoryCircuitBreaker(breakerSettings,
                    LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()),
                    this, breakerSettings.getName());

                for (;;) {
                    oldBreaker = breakers.putIfAbsent(breakerSettings.getName(), breaker);
                    if (oldBreaker == null) {
                        return;
                    }
                    breaker = new ChildMemoryCircuitBreaker(breakerSettings,
                        (ChildMemoryCircuitBreaker)oldBreaker,
                        LogManager.getLogger(CHILD_LOGGER_PREFIX + breakerSettings.getName()),
                        this, breakerSettings.getName());

                    if (breakers.replace(breakerSettings.getName(), oldBreaker, breaker)) {
                        return;
                    }
                }
            }
        }

        //......
    }

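- The HierarchyCircuitBreakerService constructor builds fielddataSettings, inFlightRequestsSettings, requestSettings, accountingSettings, and parentSettings from the node settings, then calls registerBreaker for the first four; the parent breaker is not stored in the breakers map, so stats() adds its entry manually. The constructor then calls clusterSettings.addSettingsUpdateConsumer to register update consumers for the dynamic limit and overhead settings. registerBreaker creates a NoopCircuitBreaker for Type.NOOP and otherwise a ChildMemoryCircuitBreaker, and stores it in breakers through a putIfAbsent/replace retry loop.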
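The putIfAbsent/replace loop in registerBreaker is a lock-free way to swap a map entry while building the new value from the old one. Below is a minimal sketch of the same loop using only java.util.concurrent. RegisterLoopSketch and its Breaker class are hypothetical, and the assumption that a replacement breaker carries over the usage of its predecessor is an inference from the oldBreaker constructor argument, since ChildMemoryCircuitBreaker itself is not shown in this article.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the register-or-replace retry loop.
final class RegisterLoopSketch {

    static final class Breaker {
        final String name;
        final long limit;
        final AtomicLong used;

        Breaker(String name, long limit, long initialUsed) {
            this.name = name;
            this.limit = limit;
            this.used = new AtomicLong(initialUsed);
        }
    }

    private final ConcurrentMap<String, Breaker> breakers = new ConcurrentHashMap<>();

    void register(String name, long limit) {
        Breaker breaker = new Breaker(name, limit, 0L);
        for (;;) {
            Breaker old = breakers.putIfAbsent(name, breaker);
            if (old == null) {
                return; // first registration under this name
            }
            // Assumed behaviour: the replacement starts from the old breaker's usage.
            breaker = new Breaker(name, limit, old.used.get());
            if (breakers.replace(name, old, breaker)) {
                return; // swapped in atomically
            }
            // Another thread changed the entry in between; retry with the latest value.
        }
    }

    Breaker get(String name) {
        return breakers.get(name);
    }

    public static void main(String[] args) {
        RegisterLoopSketch service = new RegisterLoopSketch();
        service.register("request", 100L);
        service.get("request").used.addAndGet(40L);
        service.register("request", 200L);
        Breaker current = service.get("request");
        System.out.println(current.limit + " " + current.used.get());
    }
}
```

The three-argument replace only succeeds if the map still holds the exact instance that was read, so a concurrent registration cannot be overwritten by a breaker built from stale state.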
Summary
- CircuitBreakerService extends AbstractLifecycleComponent and declares the abstract methods registerBreaker, getBreaker, and stats; it has two implementations, NoneCircuitBreakerService and HierarchyCircuitBreakerService.
- NoneCircuitBreakerService performs no circuit breaking.
- The HierarchyCircuitBreakerService constructor builds fielddataSettings, inFlightRequestsSettings, requestSettings, and accountingSettings and passes each to registerBreaker; it then calls clusterSettings.addSettingsUpdateConsumer to register a series of update consumers. registerBreaker creates a ChildMemoryCircuitBreaker (or a NoopCircuitBreaker for Type.NOOP) and stores it in breakers.