Preface
This article takes a look at DeadHostState in the Elasticsearch RestClient.
DeadHostState
elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java
/**
 * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
 * when the host should be retried (based on number of previous failed attempts).
 * Class is immutable, a new copy of it should be created each time the state has to be changed.
 */
final class DeadHostState implements Comparable<DeadHostState> {

    private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

    private final int failedAttempts;
    private final long deadUntilNanos;
    private final TimeSupplier timeSupplier;

    /**
     * Build the initial dead state of a host. Useful when a working host stops functioning
     * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
     *
     * @param timeSupplier a way to supply the current time and allow for unit testing
     */
    DeadHostState(TimeSupplier timeSupplier) {
        this.failedAttempts = 1;
        this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
        this.timeSupplier = timeSupplier;
    }

    /**
     * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
     * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
     * to retry that same host again. Minimum is 1 minute (for a node the only failed once created
     * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
     *
     * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
     */
    DeadHostState(DeadHostState previousDeadHostState) {
        long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
                MAX_CONNECTION_TIMEOUT_NANOS);
        this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
        this.failedAttempts = previousDeadHostState.failedAttempts + 1;
        this.timeSupplier = previousDeadHostState.timeSupplier;
    }

    /**
     * Indicates whether it's time to retry to failed host or not.
     *
     * @return true if the host should be retried, false otherwise
     */
    boolean shallBeRetried() {
        return timeSupplier.nanoTime() - deadUntilNanos > 0;
    }

    /**
     * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried.
     * After that the host should be retried.
     */
    long getDeadUntilNanos() {
        return deadUntilNanos;
    }

    int getFailedAttempts() {
        return failedAttempts;
    }

    @Override
    public int compareTo(DeadHostState other) {
        if (timeSupplier != other.timeSupplier) {
            throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
                    + timeSupplier + " != " + other.timeSupplier + "]");
        }
        return Long.compare(deadUntilNanos, other.deadUntilNanos);
    }

    @Override
    public String toString() {
        return "DeadHostState{" +
                "failedAttempts=" + failedAttempts +
                ", deadUntilNanos=" + deadUntilNanos +
                ", timeSupplier=" + timeSupplier +
                '}';
    }

    /**
     * Time supplier that makes timing aspects pluggable to ease testing
     */
    interface TimeSupplier {
        TimeSupplier DEFAULT = new TimeSupplier() {
            @Override
            public long nanoTime() {
                return System.nanoTime();
            }

            @Override
            public String toString() {
                return "nanoTime";
            }
        };

        long nanoTime();
    }
}
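- DeadHostState is immutable and holds failedAttempts, deadUntilNanos and a timeSupplier. The first constructor marks a host dead for MIN_CONNECTION_TIMEOUT_NANOS (1 minute). The second constructor builds the next state from the previous one and computes the timeout as (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), MAX_CONNECTION_TIMEOUT_NANOS), so the wait grows with every consecutive failure and is capped at 30 minutes
- The shallBeRetried method returns timeSupplier.nanoTime() - deadUntilNanos > 0, i.e. true once the current time has passed deadUntilNanos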
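To make the growth of the timeout concrete, the following minimal sketch evaluates the same expression outside of Elasticsearch. The class name `BackoffSchedule` and the method `timeoutNanos` are hypothetical names introduced only for this illustration; the two constants and the formula are copied from the `DeadHostState` source above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of Elasticsearch.
final class BackoffSchedule {
    static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

    // Same expression as in the DeadHostState(DeadHostState) constructor;
    // previousFailedAttempts is the failedAttempts value of the previous state.
    static long timeoutNanos(int previousFailedAttempts) {
        return (long) Math.min(
                MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousFailedAttempts * 0.5 - 1),
                MAX_CONNECTION_TIMEOUT_NANOS);
    }

    public static void main(String[] args) {
        // The first failure always waits MIN_CONNECTION_TIMEOUT_NANOS (1 minute),
        // so the loop starts with the second consecutive failure.
        for (int previous = 1; previous <= 11; previous++) {
            long seconds = TimeUnit.NANOSECONDS.toSeconds(timeoutNanos(previous));
            System.out.printf("consecutive failure #%d: dead for %d s%n", previous + 1, seconds);
        }
    }
}
```

The expression simplifies to 2^(failedAttempts / 2) minutes: roughly 85 seconds for the second consecutive failure, 2 minutes for the third, 4 minutes for the fifth, and so on, until the uncapped value reaches 32 minutes at the eleventh failure and the 30 minute maximum applies. This matches the "more than 10 consecutive times" wording in the Javadoc.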
RestClient
elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java
public class RestClient implements Closeable {

    //......

    /**
     * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
     * if the previous attempt failed and so on. Package private for testing.
     */
    static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                      AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
        /*
         * Sort the nodes into living and dead lists.
         */
        List<Node> livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - blacklist.size()));
        List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
        for (Node node : nodeTuple.nodes) {
            DeadHostState deadness = blacklist.get(node.getHost());
            if (deadness == null) {
                livingNodes.add(node);
                continue;
            }
            if (deadness.shallBeRetried()) {
                livingNodes.add(node);
                continue;
            }
            deadNodes.add(new DeadNode(node, deadness));
        }

        if (false == livingNodes.isEmpty()) {
            /*
             * Normal state: there is at least one living node. If the
             * selector is ok with any over the living nodes then use them
             * for the request.
             */
            List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
            nodeSelector.select(selectedLivingNodes);
            if (false == selectedLivingNodes.isEmpty()) {
                /*
                 * Rotate the list using a global counter as the distance so subsequent
                 * requests will try the nodes in a different order.
                 */
                Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
                return selectedLivingNodes;
            }
        }

        /*
         * Last resort: there are no good nodes to use, either because
         * the selector rejected all the living nodes or because there aren't
         * any living ones. Either way, we want to revive a single dead node
         * that the NodeSelectors are OK with. We do this by passing the dead
         * nodes through the NodeSelector so it can have its say in which nodes
         * are ok. If the selector is ok with any of the nodes then we will take
         * the one in the list that has the lowest revival time and try it.
         */
        if (false == deadNodes.isEmpty()) {
            final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
            /*
             * We'd like NodeSelectors to remove items directly from deadNodes
             * so we can find the minimum after it is filtered without having
             * to compare many things. This saves us a sort on the unfiltered
             * list.
             */
            nodeSelector.select(new Iterable<Node>() {
                @Override
                public Iterator<Node> iterator() {
                    return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
                }
            });
            if (false == selectedDeadNodes.isEmpty()) {
                return singletonList(Collections.min(selectedDeadNodes).node);
            }
        }
        throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
                + "living " + livingNodes + " and dead " + deadNodes);
    }

    /**
     * Called after each successful request call.
     * Receives as an argument the host that was used for the successful request.
     */
    private void onResponse(Node node) {
        DeadHostState removedHost = this.blacklist.remove(node.getHost());
        if (logger.isDebugEnabled() && removedHost != null) {
            logger.debug("removed [" + node + "] from blacklist");
        }
    }

    /**
     * Called after each failed attempt.
     * Receives as an argument the host that was used for the failed attempt.
     */
    private void onFailure(Node node) {
        while(true) {
            DeadHostState previousDeadHostState =
                blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
            if (previousDeadHostState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("added [" + node + "] to blacklist");
                }
                break;
            }
            if (blacklist.replace(node.getHost(), previousDeadHostState,
                    new DeadHostState(previousDeadHostState))) {
                if (logger.isDebugEnabled()) {
                    logger.debug("updated [" + node + "] already in blacklist");
                }
                break;
            }
        }
        failureListener.onFailure(node);
    }

    //......
}
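The rotation step in `selectNodes` is easy to misread, so here is a small, self-contained sketch of what `Collections.rotate` with an incrementing counter does to the order of the living nodes. `RotationSketch`, `nextOrder` and the node names are hypothetical and only stand in for the real `Node` list; the two calls (`Collections.rotate` and `AtomicInteger.getAndIncrement`) are the ones used in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the round-robin ordering, not Elasticsearch code.
final class RotationSketch {
    // Plays the role of RestClient's lastNodeIndex counter.
    private final AtomicInteger lastNodeIndex = new AtomicInteger(0);

    // Returns the order in which the nodes would be tried for the next request.
    List<String> nextOrder(List<String> livingNodes) {
        List<String> selected = new ArrayList<>(livingNodes); // copy, as selectNodes does
        Collections.rotate(selected, lastNodeIndex.getAndIncrement());
        return selected;
    }

    public static void main(String[] args) {
        RotationSketch sketch = new RotationSketch();
        List<String> nodes = Arrays.asList("n1", "n2", "n3");
        for (int request = 0; request < 4; request++) {
            System.out.println("request " + request + ": " + sketch.nextOrder(nodes));
        }
    }
}
```

With three nodes the first request tries n1, n2, n3, the second n3, n1, n2, the third n2, n3, n1, and the fourth is back to the original order. The whole rotated list is returned, so the nodes after the first one act as fallbacks for the same request.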
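- The selectNodes method of RestClient first splits the nodes into livingNodes (nodes that are not in the blacklist, plus blacklisted nodes whose shallBeRetried returns true) and deadNodes. It then passes the living nodes through nodeSelector; if any remain, it rotates that list by lastNodeIndex and returns the whole list, not a single node. Only when no living node is acceptable does it pass the dead nodes through nodeSelector and return the one with the lowest revival time; if nothing is left, it throws an IOException
- The onResponse method of RestClient removes the node's host from the blacklist
- The onFailure method of RestClient creates or updates the DeadHostState of the host in the blacklist. If the host has no DeadHostState yet, a new one is created with TimeSupplier.DEFAULT and put into the blacklist; if it already has one, a new DeadHostState is built from the previous one and replaces it in the blacklist. The loop repeats until either putIfAbsent or replace succeeds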
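The retry loop in onFailure can be sketched in isolation as follows. This is a simplified model under stated assumptions: `BlacklistSketch` and its nested `State` class are hypothetical stand-ins (a plain failure counter instead of DeadHostState, a `String` instead of `HttpHost`), and the map is assumed to be a `ConcurrentMap`, which is what the `putIfAbsent` / `replace` calls in the listing suggest.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the blacklist bookkeeping, not Elasticsearch code.
final class BlacklistSketch {
    // Immutable stand-in for DeadHostState that only tracks the failure count.
    static final class State {
        final int failedAttempts;

        State(int failedAttempts) {
            this.failedAttempts = failedAttempts;
        }
    }

    final ConcurrentMap<String, State> blacklist = new ConcurrentHashMap<>();

    void onFailure(String host) {
        while (true) {
            State previous = blacklist.putIfAbsent(host, new State(1));
            if (previous == null) {
                return; // first failure: the initial state was inserted
            }
            // Only replaces if the mapping is still the state we read above;
            // otherwise another thread changed it and we try again.
            if (blacklist.replace(host, previous, new State(previous.failedAttempts + 1))) {
                return;
            }
        }
    }

    void onResponse(String host) {
        blacklist.remove(host); // a successful request revives the host
    }

    public static void main(String[] args) {
        BlacklistSketch sketch = new BlacklistSketch();
        sketch.onFailure("host-a");
        sketch.onFailure("host-a");
        System.out.println("failedAttempts=" + sketch.blacklist.get("host-a").failedAttempts);
        sketch.onResponse("host-a");
        System.out.println("still blacklisted: " + sketch.blacklist.containsKey("host-a"));
    }
}
```

Because the state objects are immutable and every update goes through `putIfAbsent` or the three-argument `replace`, concurrent failures for the same host cannot overwrite each other's increments; a thread that loses the race simply rereads the current state and retries.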
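Summary
- DeadHostState records how often a host has failed and until when it should stay dead. Its shallBeRetried method returns timeSupplier.nanoTime() - deadUntilNanos > 0, and each further failure sets a new timeout of (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1), MAX_CONNECTION_TIMEOUT_NANOS), i.e. between 1 and 30 minutes
- RestClient keeps these states in its blacklist: onFailure adds or updates the entry for a host, onResponse removes it, and selectNodes treats a blacklisted host as living again as soon as shallBeRetried returns true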
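The source does not say why shallBeRetried compares by subtraction instead of writing `nanoTime() > deadUntilNanos`. A likely reason is the advice in the `System.nanoTime()` Javadoc to compare two nanoTime values as `t1 - t0 < 0` rather than `t1 < t0`, because the values may overflow. The sketch below illustrates the difference; `NanoTimeCompare` and both method names are hypothetical.

```java
// Hypothetical illustration of overflow-safe nanoTime comparison.
final class NanoTimeCompare {
    // Same form as DeadHostState.shallBeRetried(): correct even if the clock value wraps.
    static boolean shallBeRetried(long nowNanos, long deadUntilNanos) {
        return nowNanos - deadUntilNanos > 0;
    }

    // Direct comparison: gives the wrong answer when deadUntilNanos has wrapped around.
    static boolean naiveShallBeRetried(long nowNanos, long deadUntilNanos) {
        return nowNanos > deadUntilNanos;
    }

    public static void main(String[] args) {
        long now = Long.MAX_VALUE - 5;  // a clock value close to the overflow point
        long deadUntil = now + 10;      // wraps around to a large negative number

        // The host should stay dead for another 10 ns.
        System.out.println("subtraction form: " + shallBeRetried(now, deadUntil));
        System.out.println("naive form:       " + naiveShallBeRetried(now, deadUntil));
    }
}
```

In this constructed case the subtraction form reports that the host is still dead, while the direct comparison would retry it immediately.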