Preface
This article takes a look at MasterFaultDetection in Elasticsearch, based on the 7.0.1 source.
FaultDetection
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java
```java
/**
 * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
 * making sure both use the same setting.
 */
public abstract class FaultDetection implements Closeable {

    private static final Logger logger = LogManager.getLogger(FaultDetection.class);

    public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING =
        Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_INTERVAL_SETTING =
        Setting.positiveTimeSetting("discovery.zen.fd.ping_interval", timeValueSeconds(1), Property.NodeScope, Property.Deprecated);
    public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
        Setting.timeSetting("discovery.zen.fd.ping_timeout", timeValueSeconds(30), Property.NodeScope, Property.Deprecated);
    public static final Setting<Integer> PING_RETRIES_SETTING =
        Setting.intSetting("discovery.zen.fd.ping_retries", 3, Property.NodeScope, Property.Deprecated);
    public static final Setting<Boolean> REGISTER_CONNECTION_LISTENER_SETTING =
        Setting.boolSetting("discovery.zen.fd.register_connection_listener", true, Property.NodeScope, Property.Deprecated);

    protected final ThreadPool threadPool;
    protected final ClusterName clusterName;
    protected final TransportService transportService;

    // used mainly for testing, should always be true
    protected final boolean registerConnectionListener;
    protected final FDConnectionListener connectionListener;
    protected final boolean connectOnNetworkDisconnect;

    protected final TimeValue pingInterval;
    protected final TimeValue pingRetryTimeout;
    protected final int pingRetryCount;

    public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = clusterName;

        this.connectOnNetworkDisconnect = CONNECT_ON_NETWORK_DISCONNECT_SETTING.get(settings);
        this.pingInterval = PING_INTERVAL_SETTING.get(settings);
        this.pingRetryTimeout = PING_TIMEOUT_SETTING.get(settings);
        this.pingRetryCount = PING_RETRIES_SETTING.get(settings);
        this.registerConnectionListener = REGISTER_CONNECTION_LISTENER_SETTING.get(settings);

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }

    @Override
    public void close() {
        transportService.removeConnectionListener(connectionListener);
    }

    /**
     * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
     */
    abstract void handleTransportDisconnect(DiscoveryNode node);

    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            AbstractRunnable runnable = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    logger.warn("failed to handle transport disconnect for node: {}", node);
                }

                @Override
                protected void doRun() {
                    handleTransportDisconnect(node);
                }
            };
            threadPool.generic().execute(runnable);
        }
    }
}
```
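- FaultDetection implements the Closeable interface and defines an inner FDConnectionListener. When registerConnectionListener is true, its constructor adds the FDConnectionListener to transportService, and close() removes it again. FaultDetection also declares the abstract method handleTransportDisconnect. The listener calls this method on the generic thread pool whenever the transport layer reports a disconnected node. The shared settings it reads are all marked deprecated:
  - discovery.zen.fd.ping_interval (default 1s)
  - discovery.zen.fd.ping_timeout (default 30s)
  - discovery.zen.fd.ping_retries (default 3)
  - connect_on_network_disconnect
  - register_connection_listener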
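To make the listener wiring concrete, here is a minimal, self-contained Java sketch of the same pattern. It assumes plain string node ids. ConnectionRegistry (standing in for TransportService) and SimpleFaultDetection are hypothetical names, not Elasticsearch APIs. Any java.util.concurrent.Executor plays the role of threadPool.generic().

```java
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical stand-in for TransportService: it only tracks "node disconnected" listeners. */
class ConnectionRegistry {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addConnectionListener(Consumer<String> listener) { listeners.add(listener); }

    void removeConnectionListener(Consumer<String> listener) { listeners.remove(listener); }

    int listenerCount() { return listeners.size(); }

    /** Simulates the transport layer reporting that a node disconnected. */
    void fireDisconnected(String node) {
        for (Consumer<String> listener : listeners) {
            listener.accept(node);
        }
    }
}

/** Sketch of the FaultDetection base-class pattern (hypothetical, string node ids). */
abstract class SimpleFaultDetection implements Closeable {
    private final ConnectionRegistry transport;
    private final Consumer<String> connectionListener;

    SimpleFaultDetection(ConnectionRegistry transport, Executor generic, boolean registerConnectionListener) {
        this.transport = transport;
        // like FDConnectionListener: hand the event to an executor instead of handling it on the caller's thread
        this.connectionListener = node -> generic.execute(() -> {
            try {
                handleTransportDisconnect(node);
            } catch (RuntimeException e) {
                System.err.println("failed to handle transport disconnect for node: " + node);
            }
        });
        if (registerConnectionListener) {
            transport.addConnectionListener(connectionListener);
        }
    }

    @Override
    public void close() {
        // unregister the very same listener instance that the constructor registered
        transport.removeConnectionListener(connectionListener);
    }

    /** Called on the executor whenever the transport reports a disconnected node. */
    abstract void handleTransportDisconnect(String node);
}
```

Passing Runnable::run as the executor makes delivery synchronous, which is convenient for a quick check. The real FDConnectionListener always dispatches to the generic pool, presumably so that the thread reporting the disconnect is not held up by fault-detection logic.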
MasterFaultDetection
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java
```java
public class MasterFaultDetection extends FaultDetection {

    private static final Logger logger = LogManager.getLogger(MasterFaultDetection.class);

    public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";

    public interface Listener {

        /** called when pinging the master failed, like a timeout, transport disconnects etc */
        void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason);

    }

    private final MasterService masterService;
    private final java.util.function.Supplier<ClusterState> clusterStateSupplier;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

    private volatile MasterPinger masterPinger;

    private final Object masterNodeMutex = new Object();

    private volatile DiscoveryNode masterNode;

    private volatile int retryCount;

    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();

    public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
                                java.util.function.Supplier<ClusterState> clusterStateSupplier, MasterService masterService,
                                ClusterName clusterName) {
        super(settings, threadPool, transportService, clusterName);
        this.clusterStateSupplier = clusterStateSupplier;
        this.masterService = masterService;

        logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
            pingRetryCount);

        transportService.registerRequestHandler(
            MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
    }

    @Override
    public void close() {
        super.close();
        stop("closing");
        this.listeners.clear();
    }

    @Override
    protected void handleTransportDisconnect(DiscoveryNode node) {
        synchronized (masterNodeMutex) {
            if (!node.equals(this.masterNode)) {
                return;
            }
            if (connectOnNetworkDisconnect) {
                try {
                    transportService.connectToNode(node);
                    // if all is well, make sure we restart the pinger
                    if (masterPinger != null) {
                        masterPinger.stop();
                    }
                    this.masterPinger = new MasterPinger();
                    // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                    threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
                } catch (Exception e) {
                    logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                    notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
                }
            } else {
                logger.trace("[master] [{}] transport disconnected", node);
                notifyMasterFailure(node, null, "transport disconnected");
            }
        }
    }

    private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
                threadPool.generic().execute(() -> {
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, cause, reason);
                    }
                });
            } catch (EsRejectedExecutionException e) {
                logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
        }
    }

    //......
}
```
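- MasterFaultDetection extends FaultDetection. Its constructor registers a MasterPingRequestHandler with transportService for the action internal:discovery/zen/fd/master_ping.
- Its handleTransportDisconnect method ignores disconnects of any node other than the monitored master.
  - When connectOnNetworkDisconnect is true, it tries to reconnect to the node. If the reconnect succeeds, it stops the old MasterPinger and schedules a new one.
  - If the reconnect fails, or if connectOnNetworkDisconnect is false, it calls notifyMasterFailure.
- notifyMasterFailure uses compareAndSet on notifiedMasterFailure so that it fires at most once. It invokes onMasterFailure on every registered MasterFaultDetection.Listener via the generic thread pool, and then stops fault detection.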
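The disconnect handling and the once-only notification can be sketched in isolation as follows. This sketch makes three simplifications:
- Node ids are plain strings.
- The reconnect attempt is modelled as a hypothetical Predicate<String> tryConnect.
- Pinger rescheduling is reduced to a counter.

MasterDisconnectHandler and MasterFailureListener are illustrative names, not Elasticsearch types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Hypothetical mirror of MasterFaultDetection.Listener, using string node ids. */
interface MasterFailureListener {
    void onMasterFailure(String masterNode, String reason);
}

class MasterDisconnectHandler {
    private final Object masterNodeMutex = new Object();
    private final List<MasterFailureListener> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
    private final Executor generic;
    private final boolean connectOnNetworkDisconnect;
    private final Predicate<String> tryConnect; // hypothetical: true if reconnecting succeeded
    private volatile String masterNode;
    private int pingerRestarts; // guarded by masterNodeMutex

    MasterDisconnectHandler(Executor generic, boolean connectOnNetworkDisconnect, Predicate<String> tryConnect) {
        this.generic = generic;
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.tryConnect = tryConnect;
    }

    void addListener(MasterFailureListener listener) {
        listeners.add(listener);
    }

    /** Begin monitoring a master and re-arm the notification, as innerStart does. */
    void startMonitoring(String master) {
        synchronized (masterNodeMutex) {
            masterNode = master;
            notifiedMasterFailure.set(false);
        }
    }

    int pingerRestarts() {
        synchronized (masterNodeMutex) {
            return pingerRestarts;
        }
    }

    void handleTransportDisconnect(String node) {
        synchronized (masterNodeMutex) {
            if (!node.equals(masterNode)) {
                return; // some other node disconnected
            }
            if (connectOnNetworkDisconnect) {
                if (tryConnect.test(node)) {
                    pingerRestarts++; // real code: stop the old MasterPinger, schedule a new one with 0 delay
                } else {
                    notifyMasterFailure(node, "transport disconnected (with verified connect)");
                }
            } else {
                notifyMasterFailure(node, "transport disconnected");
            }
        }
    }

    private void notifyMasterFailure(String master, String reason) {
        // only the first caller wins until startMonitoring re-arms the flag
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            generic.execute(() -> {
                for (MasterFailureListener listener : listeners) {
                    listener.onMasterFailure(master, reason);
                }
            });
            // the real method also calls stop("master failure, " + reason); omitted here
        }
    }
}
```

The AtomicBoolean matters because, in the real code, a disconnect event and a failed ping can report the same dead master from different threads. Only the first report reaches the listeners.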
MasterPingRequestHandler
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java
```java
private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

    @Override
    public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
        final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
        // check if we are really the same master as the one we seemed to be think we are
        // this can happen if the master got "kill -9" and then another node started using the same port
        if (!request.masterNode.equals(nodes.getLocalNode())) {
            throw new ThisIsNotTheMasterYouAreLookingForException();
        }

        // ping from nodes of version < 1.4.0 will have the clustername set to null
        if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
            logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
                request.clusterName, clusterName);
            throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
                + request.clusterName + "] cluster then us [" + clusterName + "]");
        }

        // when we are elected as master or when a node joins, we use a cluster state update thread
        // to incorporate that information in the cluster state. That cluster state is published
        // before we make it available locally. This means that a master ping can come from a node
        // that has already processed the new CS but it is not known locally.
        // Therefore, if we fail we have to check again under a cluster state thread to make sure
        // all processing is finished.
        //

        if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {
            logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
            masterService.submitStateUpdateTask("master ping (from: " + request.sourceNode + ")", new ClusterStateUpdateTask() {

                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    // if we are no longer master, fail...
                    DiscoveryNodes nodes = currentState.nodes();
                    if (!nodes.nodeExists(request.sourceNode)) {
                        throw new NodeDoesNotExistOnMasterException();
                    }
                    return currentState;
                }

                @Override
                public void onNoLongerMaster(String source) {
                    onFailure(source, new NotMasterException("local node is not master"));
                }

                @Override
                public void onFailure(String source, @Nullable Exception e) {
                    if (e == null) {
                        e = new ElasticsearchException("unknown error while processing ping");
                    }
                    try {
                        channel.sendResponse(e);
                    } catch (IOException inner) {
                        inner.addSuppressed(e);
                        logger.warn("error while sending ping response", inner);
                    }
                }

                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    try {
                        channel.sendResponse(new MasterPingResponseResponse());
                    } catch (IOException e) {
                        logger.warn("error while sending ping response", e);
                    }
                }
            });
        } else {
            // send a response, and note if we are connected to the master or not
            channel.sendResponse(new MasterPingResponseResponse());
        }
    }
}

public static class MasterPingRequest extends TransportRequest {

    public DiscoveryNode sourceNode;

    private DiscoveryNode masterNode;
    private ClusterName clusterName;

    public MasterPingRequest() {
    }

    public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
        this.sourceNode = sourceNode;
        this.masterNode = masterNode;
        this.clusterName = clusterName;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        sourceNode = new DiscoveryNode(in);
        masterNode = new DiscoveryNode(in);
        clusterName = new ClusterName(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        sourceNode.writeTo(out);
        masterNode.writeTo(out);
        clusterName.writeTo(out);
    }
}

public static class MasterPingResponseResponse extends TransportResponse {

    public MasterPingResponseResponse() {
    }

    public MasterPingResponseResponse(StreamInput in) throws IOException {
        super(in);
    }
}
```
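- MasterPingRequestHandler handles MasterPingRequest as follows:
  - It first verifies that the request's masterNode is the local node and that the cluster name matches. If either check fails, it throws ThisIsNotTheMasterYouAreLookingForException.
  - If the local node is not the elected master, or the sourceNode does not exist in its local cluster state, it submits a ClusterStateUpdateTask to re-check on the cluster state update thread. This is needed because a newly published state may already be known to the sender before it is applied locally.
  - Otherwise, it replies with MasterPingResponseResponse directly.
- The task's execute method checks whether the request's sourceNode exists in the current cluster state and throws NodeDoesNotExistOnMasterException if it does not.
- The task's callbacks produce the reply:
  - onNoLongerMaster delegates to onFailure with a NotMasterException.
  - onFailure substitutes an ElasticsearchException if the exception is null, then sends the exception back as the response.
  - clusterStateProcessed replies with MasterPingResponseResponse.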
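Stripped of transport and cluster-state plumbing, the handler's decision logic reduces to two small functions. The sketch below is a hypothetical restatement: PingOutcome and MasterPingDecision are not Elasticsearch types. It returns an outcome code where the real handler throws an exception or sends a response.

```java
import java.util.Set;

/** Hypothetical outcome codes; the real handler throws exceptions or sends MasterPingResponseResponse. */
enum PingOutcome { OK, THIS_IS_NOT_THE_MASTER, RECHECK_ON_CLUSTER_STATE_THREAD, NOT_MASTER, NODE_DOES_NOT_EXIST }

final class MasterPingDecision {
    private MasterPingDecision() {}

    /** Fast path: the checks messageReceived performs against the locally applied cluster state. */
    static PingOutcome onPing(String localNode, boolean localIsElectedMaster, Set<String> localNodes,
                              String localClusterName, String sourceNode, String targetMaster,
                              String requestClusterName) {
        // the pinger aimed at a different process, e.g. a restarted node that reused the same address
        if (!targetMaster.equals(localNode)) {
            return PingOutcome.THIS_IS_NOT_THE_MASTER;
        }
        // a null cluster name is tolerated (very old senders did not set it)
        if (requestClusterName != null && !requestClusterName.equals(localClusterName)) {
            return PingOutcome.THIS_IS_NOT_THE_MASTER;
        }
        // the local view may lag behind a state that was already published, so re-check later
        if (!localIsElectedMaster || !localNodes.contains(sourceNode)) {
            return PingOutcome.RECHECK_ON_CLUSTER_STATE_THREAD;
        }
        return PingOutcome.OK;
    }

    /** Slow path: what the ClusterStateUpdateTask decides once the update thread runs it. */
    static PingOutcome onClusterStateThread(boolean stillMaster, Set<String> nodesInState, String sourceNode) {
        if (!stillMaster) {
            return PingOutcome.NOT_MASTER;          // onNoLongerMaster -> NotMasterException
        }
        if (!nodesInState.contains(sourceNode)) {
            return PingOutcome.NODE_DOES_NOT_EXIST; // execute -> NodeDoesNotExistOnMasterException
        }
        return PingOutcome.OK;                      // clusterStateProcessed -> MasterPingResponseResponse
    }
}
```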
ZenDiscovery.processNextCommittedClusterState
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
```java
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    //......

    // return true if state has been sent to applier
    boolean processNextCommittedClusterState(String reason) {
        assert Thread.holdsLock(stateMutex);

        final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
        final ClusterState currentState = committedState.get();
        // all pending states have been processed
        if (newClusterState == null) {
            return false;
        }

        assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
        assert !newClusterState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock()) :
            "received a cluster state with a master block";

        if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) {
            handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(),
                "via a new cluster state");
            return false;
        }

        try {
            if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
                String message = String.format(
                    Locale.ROOT,
                    "rejecting cluster state version [%d] uuid [%s] received from [%s]",
                    newClusterState.version(),
                    newClusterState.stateUUID(),
                    newClusterState.nodes().getMasterNodeId()
                );
                throw new IllegalStateException(message);
            }
        } catch (Exception e) {
            try {
                pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
            return false;
        }

        if (currentState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock())) {
            // its a fresh update from the master as we transition from a start of not having a master to having one
            logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
        }

        if (currentState == newClusterState) {
            return false;
        }

        committedState.set(newClusterState);

        // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
        // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            // update the set of nodes to ping
            nodesFD.updateNodesAndPing(newClusterState);
        } else {
            // check to see that we monitor the correct master of the cluster
            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
                masterFD.restart(newClusterState.nodes().getMasterNode(),
                    "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
            }
        }

        //......
        return true;
    }

    //......
}
```
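- Fault detection is updated only after committedState has been set, which avoids races with handleLeaveRequest and handleNodeFailure. What ZenDiscovery.processNextCommittedClusterState does then depends on the local node's role:
  - If the local node is the elected master, it calls nodesFD.updateNodesAndPing.
  - Otherwise, it calls masterFD.restart if masterFD.masterNode() is null or differs from newClusterState.nodes().getMasterNode().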
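The fault-detection branch at the end of processNextCommittedClusterState can be restated as a tiny decision function. FdAction and FaultDetectionUpdateRule are hypothetical names, and node ids are plain strings. The sketch assumes, as in the source, that it is only consulted after committedState has been updated.

```java
/** Hypothetical actions corresponding to the three possible outcomes of the branch. */
enum FdAction { UPDATE_NODES_FD, RESTART_MASTER_FD, NONE }

final class FaultDetectionUpdateRule {
    private FaultDetectionUpdateRule() {}

    /**
     * monitoredMaster is what masterFD.masterNode() would return; null means nothing is monitored yet.
     */
    static FdAction afterCommit(boolean localIsElectedMaster, String newMaster, String monitoredMaster) {
        if (localIsElectedMaster) {
            return FdAction.UPDATE_NODES_FD;   // nodesFD.updateNodesAndPing(newClusterState)
        }
        if (monitoredMaster == null || !monitoredMaster.equals(newMaster)) {
            return FdAction.RESTART_MASTER_FD; // masterFD.restart(newMaster, "... monitoring the wrong master ...")
        }
        return FdAction.NONE;                  // already watching the right master
    }
}
```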
MasterFaultDetection.restart
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java
```java
public class MasterFaultDetection extends FaultDetection {

    //......

    public void restart(DiscoveryNode masterNode, String reason) {
        synchronized (masterNodeMutex) {
            if (logger.isDebugEnabled()) {
                logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);
            }
            innerStop();
            innerStart(masterNode);
        }
    }

    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
        this.retryCount = 0;
        this.notifiedMasterFailure.set(false);
        if (masterPinger != null) {
            masterPinger.stop();
        }
        this.masterPinger = new MasterPinger();

        // we start pinging slightly later to allow the chosen master to complete it's own master election
        threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME);
    }

    private void innerStop() {
        // also will stop the next ping schedule
        this.retryCount = 0;
        if (masterPinger != null) {
            masterPinger.stop();
            masterPinger = null;
        }
        this.masterNode = null;
    }

    //......
}
```
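- MasterFaultDetection.restart holds masterNodeMutex while it calls innerStop and then innerStart.
  - innerStop resets retryCount, stops the current masterPinger, and sets both masterPinger and masterNode to null.
  - innerStart records the new masterNode and resets retryCount and the notifiedMasterFailure flag. It then creates a new MasterPinger and schedules it to run after pingInterval, which gives the newly chosen master time to finish its own election.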
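Below is a minimal sketch of the restart sequence. It uses a standard ScheduledExecutorService in place of ThreadPool.schedule, and omits the actual ping. RestartableMasterMonitor and its Pinger are hypothetical names.

The sketch shows one property of the design: a stopped pinger becomes a no-op even if it is already scheduled. Because of this, restart never has to cancel the scheduled task.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of restart/innerStop/innerStart; the ping itself is left out. */
class RestartableMasterMonitor {
    private final Object masterNodeMutex = new Object();
    private final ScheduledExecutorService scheduler;
    private final long pingIntervalMillis;
    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
    private volatile String masterNode;
    private volatile int retryCount;
    private volatile Pinger pinger;

    /** Mirrors MasterPinger's stop flag: a stopped pinger does nothing even if it is already scheduled. */
    static class Pinger implements Runnable {
        private volatile boolean running = true;

        void stop() { running = false; }

        boolean isRunning() { return running; }

        @Override
        public void run() {
            if (!running) {
                return; // don't ping and don't reschedule
            }
            // a real pinger would send a ping to the current master here
        }
    }

    RestartableMasterMonitor(ScheduledExecutorService scheduler, long pingIntervalMillis) {
        this.scheduler = scheduler;
        this.pingIntervalMillis = pingIntervalMillis;
    }

    void restart(String newMaster) {
        synchronized (masterNodeMutex) {
            innerStop();
            innerStart(newMaster);
        }
    }

    private void innerStart(String newMaster) {
        masterNode = newMaster;
        retryCount = 0;
        notifiedMasterFailure.set(false);
        if (pinger != null) {
            pinger.stop();
        }
        pinger = new Pinger();
        // delay the first ping so the new master can finish its own election
        scheduler.schedule(pinger, pingIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private void innerStop() {
        retryCount = 0;
        if (pinger != null) {
            pinger.stop();
            pinger = null;
        }
        masterNode = null;
    }

    String masterNode() { return masterNode; }

    Pinger currentPinger() { return pinger; }
}
```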
MasterPinger
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java
```java
private class MasterPinger implements Runnable {

    private volatile boolean running = true;

    public void stop() {
        this.running = false;
    }

    @Override
    public void run() {
        if (!running) {
            // return and don't spawn...
            return;
        }
        final DiscoveryNode masterToPing = masterNode;
        if (masterToPing == null) {
            // master is null, should not happen, but we are still running, so reschedule
            threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
            return;
        }

        final MasterPingRequest request = new MasterPingRequest(
            clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
        final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
            .withTimeout(pingRetryTimeout).build();
        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
            new TransportResponseHandler<MasterPingResponseResponse>() {
                @Override
                public MasterPingResponseResponse read(StreamInput in) throws IOException {
                    return new MasterPingResponseResponse(in);
                }

                @Override
                public void handleResponse(MasterPingResponseResponse response) {
                    if (!running) {
                        return;
                    }
                    // reset the counter, we got a good result
                    MasterFaultDetection.this.retryCount = 0;
                    // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                        // we don't stop on disconnection from master, we keep pinging it
                        threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
                    }
                }

                @Override
                public void handleException(TransportException exp) {
                    if (!running) {
                        return;
                    }
                    synchronized (masterNodeMutex) {
                        // check if the master node did not get switched on us...
                        if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                            if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                handleTransportDisconnect(masterToPing);
                                return;
                            } else if (exp.getCause() instanceof NotMasterException) {
                                logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                notifyMasterFailure(masterToPing, exp, "no longer master");
                                return;
                            } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {
                                logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                notifyMasterFailure(masterToPing, exp,"not master");
                                return;
                            } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
                                    , masterNode);
                                notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
                                return;
                            }

                            int retryCount = ++MasterFaultDetection.this.retryCount;
                            logger.trace(() -> new ParameterizedMessage(
                                    "[master] failed to ping [{}], retry [{}] out of [{}]",
                                    masterNode, retryCount, pingRetryCount), exp);
                            if (retryCount >= pingRetryCount) {
                                logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
                                    masterNode, pingRetryCount, pingRetryTimeout);
                                // not good, failure
                                notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
                                    + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                            } else {
                                // resend the request, not reschedule, rely on send timeout
                                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                            }
                        }
                    }
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
            }
        );
    }
}

private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (notifiedMasterFailure.compareAndSet(false, true)) {
        try {
            threadPool.generic().execute(() -> {
                for (Listener listener : listeners) {
                    listener.onMasterFailure(masterNode, cause, reason);
                }
            });
        } catch (EsRejectedExecutionException e) {
            logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
        }
        stop("master failure, " + reason);
    }
}
```
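- MasterPinger.run returns immediately if the pinger has been stopped. It then reads masterNode into masterToPing. If that is null, it simply reschedules itself after pingInterval. Otherwise, it sends a MasterPingRequest to masterToPing with a timeout of pingRetryTimeout.
- On success, the TransportResponseHandler's handleResponse resets MasterFaultDetection.this.retryCount to 0. If the monitored master has not changed, it schedules the next MasterPinger run after pingInterval.
- On failure, handleException acts only if the monitored master has not changed, and then branches on the exception:
  - A ConnectTransportException, either directly or as the cause, leads to handleTransportDisconnect.
  - A cause of NotMasterException, ThisIsNotTheMasterYouAreLookingForException or NodeDoesNotExistOnMasterException leads to notifyMasterFailure.
  - Any other exception is retried.
- A retry first increments MasterFaultDetection.this.retryCount. If the count has reached pingRetryCount, notifyMasterFailure is called. Otherwise, the same MasterPingRequest is resent immediately, relying on the request timeout rather than on a reschedule.
- notifyMasterFailure then invokes onMasterFailure on the registered MasterFaultDetection.Listener instances.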
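The retry bookkeeping in handleResponse and handleException can be modelled without any networking. In this sketch, PingFailure and PingRetryTracker are hypothetical names. PingFailure.OTHER stands for any exception not singled out above, such as a ping timeout. The sketch assumes the monitored master did not change between request and response.

```java
/** Hypothetical classification of what handleException can see (directly or as the cause). */
enum PingFailure { CONNECT, NOT_MASTER, THIS_IS_NOT_THE_MASTER, NODE_NOT_ON_MASTER, OTHER }

/** Sketch of MasterPinger's retry bookkeeping, without any networking. */
class PingRetryTracker {
    private final int pingRetryCount;
    private int retryCount;
    private int resends;
    private boolean disconnectHandled;
    private String failureReason; // non-null once the master has been declared failed

    PingRetryTracker(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    /** handleResponse: a good answer resets the counter (the next ping follows after ping_interval). */
    void onSuccess() {
        retryCount = 0;
    }

    /** handleException, assuming the monitored master has not changed in the meantime. */
    void onFailure(PingFailure failure) {
        switch (failure) {
            case CONNECT:
                disconnectHandled = true; // real code: handleTransportDisconnect(masterToPing)
                return;
            case NOT_MASTER:
                failureReason = "no longer master";
                return;
            case THIS_IS_NOT_THE_MASTER:
                failureReason = "not master";
                return;
            case NODE_NOT_ON_MASTER:
                failureReason = "do not exists on master, act as master failure";
                return;
            default:
                break;
        }
        int attempt = ++retryCount;
        if (attempt >= pingRetryCount) {
            failureReason = "failed to ping, tried [" + pingRetryCount + "] times"; // notifyMasterFailure
        } else {
            resends++; // resend immediately; the request timeout paces the retries
        }
    }

    String failureReason() { return failureReason; }

    int resends() { return resends; }

    boolean disconnectHandled() { return disconnectHandled; }
}
```

Consider the default settings (ping_timeout 30s, ping_retries 3) and a master that simply stops responding. It is declared failed only after three consecutive pings have timed out, which takes roughly 90 seconds, because each retry is resent immediately and waits out the full timeout.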
ZenDiscovery.MasterNodeFailureListener
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
```java
private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

    @Override
    public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {
        handleMasterGone(masterNode, cause, reason);
    }
}

private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a master failure
        return;
    }
    if (localNodeMaster()) {
        // we might get this on both a master telling us shutting down, and then the disconnect failure
        return;
    }

    logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

    synchronized (stateMutex) {
        if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
            // flush any pending cluster states from old master, so it will not be set as master again
            pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
            rejoin("master left (reason = " + reason + ")");
        }
    }
}

protected void rejoin(String reason) {
    assert Thread.holdsLock(stateMutex);
    ClusterState clusterState = committedState.get();

    logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
    nodesFD.stop();
    masterFD.stop(reason);

    // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
    // before a decision is made.
    joinThreadControl.startNewThreadIfNotRunning();

    if (clusterState.nodes().getMasterNodeId() != null) {
        // remove block if it already exists before adding new one
        assert clusterState.blocks().hasGlobalBlockWithId(noMasterBlockService.getNoMasterBlock().id()) == false :
            "NO_MASTER_BLOCK should only be added by ZenDiscovery";
        ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
            .addGlobalBlock(noMasterBlockService.getNoMasterBlock())
            .build();

        DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
        clusterState = ClusterState.builder(clusterState)
            .blocks(clusterBlocks)
            .nodes(discoveryNodes)
            .build();

        committedState.set(clusterState);
        clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
    }
}

private class JoinThreadControl {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

    /** returns true if join thread control is started and there is currently an active join thread */
    public boolean joinThreadActive() {
        Thread currentThread = currentJoinThread.get();
        return running.get() && currentThread != null && currentThread.isAlive();
    }

    /** returns true if join thread control is started and the supplied thread is the currently active joinThread */
    public boolean joinThreadActive(Thread joinThread) {
        return running.get() && joinThread.equals(currentJoinThread.get());
    }

    /** cleans any running joining thread and calls {@link #rejoin} */
    public void stopRunningThreadAndRejoin(String reason) {
        assert Thread.holdsLock(stateMutex);
        currentJoinThread.set(null);
        rejoin(reason);
    }

    /** starts a new joining thread if there is no currently active one and join thread controlling is started */
    public void startNewThreadIfNotRunning() {
        assert Thread.holdsLock(stateMutex);
        if (joinThreadActive()) {
            return;
        }
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                Thread currentThread = Thread.currentThread();
                if (!currentJoinThread.compareAndSet(null, currentThread)) {
                    return;
                }
                while (running.get() && joinThreadActive(currentThread)) {
                    try {
                        innerJoinCluster();
                        return;
                    } catch (Exception e) {
                        logger.error("unexpected error while joining cluster, trying again", e);
                        // Because we catch any exception here, we want to know in
                        // tests if an uncaught exception got to this point and the test infra uncaught exception
                        // leak detection can catch this. In practise no uncaught exception should leak
                        assert ExceptionsHelper.reThrowIfNotNull(e);
                    }
                }
                // cleaning the current thread from currentJoinThread is done by explicit calls.
            }
        });
    }

    /**
     * marks the given joinThread as completed and makes sure another thread is running (starting one if needed)
     * If the given thread is not the currently running join thread, the command is ignored.
     */
    public void markThreadAsDoneAndStartNew(Thread joinThread) {
        assert Thread.holdsLock(stateMutex);
        if (!markThreadAsDone(joinThread)) {
            return;
        }
        startNewThreadIfNotRunning();
    }

    /** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
    public boolean markThreadAsDone(Thread joinThread) {
        assert Thread.holdsLock(stateMutex);
        return currentJoinThread.compareAndSet(joinThread, null);
    }

    public void stop() {
        running.set(false);
        Thread joinThread = currentJoinThread.getAndSet(null);
        if (joinThread != null) {
            joinThread.interrupt();
        }
    }

    public void start() {
        running.set(true);
    }
}
```
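- ZenDiscovery's MasterNodeFailureListener implements MasterFaultDetection.Listener, and its onMasterFailure simply calls handleMasterGone.
- handleMasterGone ignores the event if discovery is not started or if the local node is itself the master. Otherwise it takes stateMutex, and only if the failed node is still the master in the committed state does it do two things:
  - call pendingStatesQueue.failAllStatesAndClear, so that pending states from the old master cannot reinstate it;
  - call rejoin.
- rejoin first calls nodesFD.stop() and masterFD.stop(reason), then joinThreadControl.startNewThreadIfNotRunning(). If the committed state still names a master, rejoin also:
  - builds a new cluster state with the no-master block added and the master node id cleared;
  - stores it in committedState;
  - hands it to clusterApplier.onNewClusterState without waiting for it to be applied.
- joinThreadControl.startNewThreadIfNotRunning() submits a task to the generic thread pool. The task claims the join-thread slot and keeps calling innerJoinCluster until a call succeeds or join control is stopped.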
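The join-thread handling can be sketched with the same AtomicReference and compareAndSet idea. JoinLoop is a hypothetical, simplified version of JoinThreadControl. The stateMutex assertions are dropped, and innerJoinCluster is replaced by a Runnable that throws to signal a failed attempt.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of JoinThreadControl: at most one join loop, retried until a join attempt succeeds. */
class JoinLoop {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();
    private final Executor generic;
    private final Runnable joinAttempt; // hypothetical stand-in for innerJoinCluster(); throws on failure

    JoinLoop(Executor generic, Runnable joinAttempt) {
        this.generic = generic;
        this.joinAttempt = joinAttempt;
    }

    boolean joinThreadActive() {
        Thread t = currentJoinThread.get();
        return running.get() && t != null && t.isAlive();
    }

    boolean joinThreadActive(Thread joinThread) {
        return running.get() && joinThread.equals(currentJoinThread.get());
    }

    void startNewThreadIfNotRunning() {
        if (joinThreadActive()) {
            return; // a join loop is already in progress
        }
        generic.execute(() -> {
            Thread current = Thread.currentThread();
            // claim the slot; if another task won the race, give up quietly
            if (!currentJoinThread.compareAndSet(null, current)) {
                return;
            }
            while (running.get() && joinThreadActive(current)) {
                try {
                    joinAttempt.run();
                    return; // the slot is released later by an explicit markThreadAsDone call
                } catch (RuntimeException e) {
                    System.err.println("unexpected error while joining cluster, trying again: " + e.getMessage());
                }
            }
        });
    }

    boolean markThreadAsDone(Thread joinThread) {
        return currentJoinThread.compareAndSet(joinThread, null);
    }

    void start() {
        running.set(true);
    }

    void stop() {
        running.set(false);
        Thread joinThread = currentJoinThread.getAndSet(null);
        if (joinThread != null) {
            joinThread.interrupt();
        }
    }
}
```

With Runnable::run as the executor, the loop runs on the calling thread. The caller therefore owns the join slot afterwards and must release it with markThreadAsDone. This mirrors the source comment that clearing currentJoinThread is done by explicit calls.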
Summary
- FaultDetection implements Closeable and defines FDConnectionListener. When registerConnectionListener is true, its constructor adds the listener to transportService, and close() removes it again. It also declares the abstract method handleTransportDisconnect.
- MasterFaultDetection extends FaultDetection, and its constructor registers MasterPingRequestHandler with transportService. Its handleTransportDisconnect behaves as follows:
  - When connectOnNetworkDisconnect is true, it tries to reconnect to the master and, on success, schedules a fresh MasterPinger.
  - If the reconnect fails, or if connectOnNetworkDisconnect is false, it calls notifyMasterFailure, which invokes onMasterFailure on the registered MasterFaultDetection.Listener instances.
- ZenDiscovery's MasterNodeFailureListener implements MasterFaultDetection.Listener. Its onMasterFailure calls handleMasterGone, which mainly runs pendingStatesQueue.failAllStatesAndClear and then rejoins the cluster.
- When the local node is not the master, ZenDiscovery.processNextCommittedClusterState calls masterFD.restart if masterFD.masterNode() is null or differs from newClusterState.nodes().getMasterNode().
- MasterFaultDetection.restart runs innerStop and then innerStart. innerStop stops the current masterPinger and sets masterPinger and masterNode to null. innerStart creates a new MasterPinger and schedules it to run after pingInterval.
- MasterPinger.run reschedules itself after pingInterval if masterToPing is null; otherwise it sends a MasterPingRequest to masterToPing.
  - On success, it resets MasterFaultDetection.this.retryCount and, if the master has not changed, schedules the next run.
  - On failure, depending on the exception, it calls handleTransportDisconnect, calls notifyMasterFailure, or retries.