A look at Elasticsearch's NodesFaultDetection

Category: Backend · Published: 6 years ago

This article walks through Elasticsearch's NodesFaultDetection, based on the 0.90.0 sources.

NodesFaultDetection

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {

    public static interface Listener {

        void onNodeFailure(DiscoveryNode node, String reason);
    }

    private final ThreadPool threadPool;

    private final TransportService transportService;


    private final boolean connectOnNetworkDisconnect;

    private final TimeValue pingInterval;

    private final TimeValue pingRetryTimeout;

    private final int pingRetryCount;

    // used mainly for testing, should always be true
    private final boolean registerConnectionListener;


    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();

    private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();

    private final FDConnectionListener connectionListener;

    private volatile DiscoveryNodes latestNodes = EMPTY_NODES;

    private volatile boolean running = false;

    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;

        this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
        this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
        this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
        this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
        this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);

        logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

        transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }

    public NodesFaultDetection start() {
        if (running) {
            return this;
        }
        running = true;
        return this;
    }

    public NodesFaultDetection stop() {
        if (!running) {
            return this;
        }
        running = false;
        return this;
    }

    public void close() {
        stop();
        transportService.removeHandler(PingRequestHandler.ACTION);
        transportService.removeConnectionListener(connectionListener);
    }

    //......
}
  • NodesFaultDetection extends AbstractComponent. It declares a CopyOnWriteArrayList of listeners, a ConcurrentMap named nodesFD, and the fields connectionListener, latestNodes, and running.
  • Its constructor reads the settings connect_on_network_disconnect (default true), ping_interval (default 1s), ping_timeout (default 30s), ping_retries (default 3), and register_connection_listener (default true). It then registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and, when registerConnectionListener is true, adds an FDConnectionListener.
  • start sets running to true and stop sets it to false. close calls stop, then removes the PingRequestHandler.ACTION handler and the connectionListener from transportService.
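
The default handling in the constructor can be mimicked with a plain map. The sketch below is only an illustration: FdSettingsSketch is a hypothetical class, not part of Elasticsearch, and it assumes durations are given as whole seconds, whereas the real getAsTime parses time strings.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical holder for the five settings; not an Elasticsearch class.
final class FdSettingsSketch {
    final boolean connectOnNetworkDisconnect;
    final Duration pingInterval;
    final Duration pingTimeout;
    final int pingRetries;
    final boolean registerConnectionListener;

    // Assumption: durations are whole seconds, unlike the real getAsTime.
    FdSettingsSketch(Map<String, String> settings) {
        // Every lookup falls back to the default used by the constructor above.
        connectOnNetworkDisconnect = Boolean.parseBoolean(
                settings.getOrDefault("connect_on_network_disconnect", "true"));
        pingInterval = Duration.ofSeconds(
                Long.parseLong(settings.getOrDefault("ping_interval", "1")));
        pingTimeout = Duration.ofSeconds(
                Long.parseLong(settings.getOrDefault("ping_timeout", "30")));
        pingRetries = Integer.parseInt(settings.getOrDefault("ping_retries", "3"));
        registerConnectionListener = Boolean.parseBoolean(
                settings.getOrDefault("register_connection_listener", "true"));
    }
}
```

Passing an empty map yields the defaults listed above; a map containing only ping_retries overrides that one value and leaves the rest untouched.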

PingRequestHandler

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {

        public static final String ACTION = "discovery/zen/fd/ping";

        @Override
        public PingRequest newInstance() {
            return new PingRequest();
        }

        @Override
        public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
            // if we are not the node we are supposed to be pinged, send an exception
            // this can happen when a kill -9 is sent, and another node is started using the same port
            if (!latestNodes.localNodeId().equals(request.nodeId)) {
                throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
            }
            channel.sendResponse(new PingResponse());
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    }

    static class PingRequest extends TransportRequest {

        // the (assumed) node id we are pinging
        private String nodeId;

        PingRequest() {
        }

        PingRequest(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            nodeId = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(nodeId);
        }
    }

    private static class PingResponse extends TransportResponse {

        private PingResponse() {
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
        }
    }
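  • PingRequestHandler's newInstance method creates a PingRequest, which carries a nodeId field identifying the node the ping is meant for. messageReceived handles an incoming PingRequest: it checks whether the target nodeId matches localNodeId, sends back a PingResponse if it does, and throws ElasticSearchIllegalStateException otherwise.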
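
The id check can be reduced to a few lines. This is a simplified model, not the Elasticsearch handler: PingCheckSketch and onPing are illustrative names, and a plain IllegalStateException stands in for ElasticSearchIllegalStateException.

```java
// Simplified model of the node-id check in messageReceived; names are illustrative.
final class PingCheckSketch {
    private final String localNodeId;

    PingCheckSketch(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    /** Returns normally when the ping targets this node, otherwise throws. */
    void onPing(String requestedNodeId) {
        if (!localNodeId.equals(requestedNodeId)) {
            // Mirrors the message built in the real handler.
            throw new IllegalStateException("Got pinged as node [" + requestedNodeId
                    + "], but I am node [" + localNodeId + "]");
        }
    }
}
```

This matters in the "kill -9" case mentioned in the source comment: a process restarted on the same port has a different node id, so a ping that still carries the old id is rejected instead of being answered as if the old node were alive.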

FDConnectionListener

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }

        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

    private void handleTransportDisconnect(DiscoveryNode node) {
        if (!latestNodes.nodeExists(node.id())) {
            return;
        }
        NodeFD nodeFD = nodesFD.remove(node);
        if (nodeFD == null) {
            return;
        }
        if (!running) {
            return;
        }
        nodeFD.running = false;
        if (connectOnNetworkDisconnect) {
            try {
                transportService.connectToNode(node);
                nodesFD.put(node, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
            } catch (Exception e) {
                logger.trace("[node  ] [{}] transport disconnected (with verified connect)", node);
                notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            logger.trace("[node  ] [{}] transport disconnected", node);
            notifyNodeFailure(node, "transport disconnected");
        }
    }

    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    }
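  • FDConnectionListener calls handleTransportDisconnect from onNodeDisconnected. That method returns early if the node is not in latestNodes, is not being monitored in nodesFD, or if fault detection is not running; otherwise it removes the node from nodesFD and sets the removed NodeFD's running flag to false.
  • If connectOnNetworkDisconnect is true, it reconnects to the node; on success it puts the node back into nodesFD and schedules a SendPingRequest task for that node to run after pingInterval. If the connect throws, or if connectOnNetworkDisconnect is false, it calls notifyNodeFailure.
  • notifyNodeFailure runs on the generic thread pool and invokes NodesFaultDetection.Listener.onNodeFailure on every registered listener; here that is the onNodeFailure method of ZenDiscovery's NodeFailureListener.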
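
The branching in handleTransportDisconnect can be modelled without any Elasticsearch types. In this sketch DisconnectSketch and Connector are hypothetical, nodes are plain string ids, and the latestNodes and running checks as well as the ping scheduling are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the disconnect branch; not an Elasticsearch class.
final class DisconnectSketch {
    /** Hypothetical connector; throws if the node cannot be reached. */
    interface Connector {
        void connect(String nodeId) throws Exception;
    }

    final Set<String> monitored = ConcurrentHashMap.newKeySet();
    final List<String> failures = new ArrayList<>();
    private final boolean connectOnNetworkDisconnect;
    private final Connector connector;

    DisconnectSketch(boolean connectOnNetworkDisconnect, Connector connector) {
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.connector = connector;
    }

    void onDisconnected(String nodeId) {
        if (!monitored.remove(nodeId)) {
            return; // node was not monitored, nothing to do
        }
        if (connectOnNetworkDisconnect) {
            try {
                connector.connect(nodeId);
                monitored.add(nodeId); // reconnect worked: resume monitoring
            } catch (Exception e) {
                failures.add(nodeId + ": transport disconnected (with verified connect)");
            }
        } else {
            failures.add(nodeId + ": transport disconnected");
        }
    }
}
```

With a connector that succeeds, the node stays monitored and no failure is recorded; with one that throws, or with connectOnNetworkDisconnect set to false, exactly one failure is reported.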

ZenDiscovery

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
    //......

    @Inject
    public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                        TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
                        DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
        super(settings);
        this.clusterName = clusterName;
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.discoveryNodeService = discoveryNodeService;
        this.pingService = pingService;

        // also support direct discovery.zen settings, for cases when it gets extended
        this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
        this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);

        this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
        this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);

        logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);

        this.electMaster = new ElectMasterService(settings);
        nodeSettingsService.addListener(new ApplySettings());

        this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
        this.masterFD.addListener(new MasterNodeFailureListener());

        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
        this.nodesFD.addListener(new NodeFailureListener());

        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
        this.pingService.setNodesProvider(this);
        this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

        transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
    }

    protected void doStart() throws ElasticSearchException {
        Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
        // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
        String nodeId = UUID.randomBase64UUID();
        localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
        latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
        nodesFD.updateNodes(latestDiscoNodes);
        pingService.start();

        // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
        asyncJoinCluster();
    }

    public void publish(ClusterState clusterState) {
        if (!master) {
            throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
        }
        latestDiscoNodes = clusterState.nodes();
        nodesFD.updateNodes(clusterState.nodes());
        publishClusterState.publish(clusterState);
    }

    private class NodeFailureListener implements NodesFaultDetection.Listener {

        @Override
        public void onNodeFailure(DiscoveryNode node, String reason) {
            handleNodeFailure(node, reason);
        }
    }

    private void handleNodeFailure(final DiscoveryNode node, String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (!master) {
            // nothing to do here...
            return;
        }
        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
                        .putAll(currentState.nodes())
                        .remove(node.id());
                latestDiscoNodes = builder.build();
                currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
                // check if we have enough master nodes, if not, we need to move into joining the cluster again
                if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
                    return rejoin(currentState, "not enough master nodes");
                }
                // eagerly run reroute to remove dead nodes from routing table
                RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
                return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
            }

            @Override
            public void clusterStateProcessed(ClusterState clusterState) {
                sendInitialStateEventIfNeeded();
            }
        });
    }

    //......
}
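  • ZenDiscovery's constructor creates the NodesFaultDetection and adds a NodeFailureListener to it. The listener implements NodesFaultDetection.Listener, and its onNodeFailure callback delegates to handleNodeFailure. That method does nothing unless this node is started and is the master; otherwise it submits a ProcessedClusterStateUpdateTask that removes the failed node from currentState.nodes() and then checks, via electMaster.hasEnoughMasterNodes, whether enough master nodes remain (minimumMasterNodes). If not, it calls rejoin; if so, it runs allocationService.reroute.
  • doStart builds localNode from the node name in the settings, a freshly generated random node id, the transport publish address, and the node attributes. It puts localNode into latestDiscoNodes, calls nodesFD.updateNodes(latestDiscoNodes), and then calls pingService.start() and asyncJoinCluster().
  • publish throws if this node is not the master. Otherwise it updates the local latestDiscoNodes from the clusterState's nodes, passes the same nodes to nodesFD.updateNodes, and finally calls publishClusterState.publish(clusterState).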
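
The decision made inside handleNodeFailure's update task can be sketched as follows. NodeFailureSketch is hypothetical, nodes are a map from node id to a master-eligible flag, and the counting rule (remaining master-eligible nodes compared against minimumMasterNodes) is an assumption, since the body of hasEnoughMasterNodes is not shown in the excerpt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the rejoin-or-reroute decision; not an Elasticsearch class.
final class NodeFailureSketch {
    enum Outcome { REJOIN, REROUTE }

    /**
     * nodes maps node id to a master-eligible flag.
     * Assumption: enough masters means count >= minimumMasterNodes.
     */
    static Outcome onNodeFailure(Map<String, Boolean> nodes, String failedId, int minimumMasterNodes) {
        // Work on a copy, the way the update task builds a new DiscoveryNodes.
        Map<String, Boolean> remaining = new LinkedHashMap<>(nodes);
        remaining.remove(failedId);
        long masters = remaining.values().stream().filter(eligible -> eligible).count();
        return masters >= minimumMasterNodes ? Outcome.REROUTE : Outcome.REJOIN;
    }
}
```

Under that assumption, losing one of three master-eligible nodes with a threshold of two leads to a reroute, while losing one of two leads to a rejoin.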

NodesFaultDetection.updateNodes

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {
    //......

    public void updateNodes(DiscoveryNodes nodes) {
        DiscoveryNodes prevNodes = latestNodes;
        this.latestNodes = nodes;
        if (!running) {
            return;
        }
        DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
        for (DiscoveryNode newNode : delta.addedNodes()) {
            if (newNode.id().equals(nodes.localNodeId())) {
                // no need to monitor the local node
                continue;
            }
            if (!nodesFD.containsKey(newNode)) {
                nodesFD.put(newNode, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
            }
        }
        for (DiscoveryNode removedNode : delta.removedNodes()) {
            nodesFD.remove(removedNode);
        }
    }

    //......
}
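  • NodesFaultDetection exposes updateNodes to refresh its own latestNodes. If fault detection is not running, the method returns right after that update. Otherwise it calls nodes.delta(prevNodes) to compute a DiscoveryNodes.Delta, whose addedNodes() returns the newly added nodes and whose removedNodes() returns the removed ones.
  • For each added node other than the local node, it checks whether the node is already in nodesFD; if not, it adds the node to nodesFD and schedules a SendPingRequest task to run after pingInterval.
  • Each removed node is taken out of nodesFD. handleTransportDisconnect also removes a disconnected node from nodesFD, and puts it back if a single reconnect attempt succeeds.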
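
The added/removed bookkeeping amounts to two set differences. The sketch below uses string ids instead of DiscoveryNode, and NodeDeltaSketch with its difference and update methods is hypothetical, not the DiscoveryNodes.Delta API.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of updateNodes; not an Elasticsearch class.
final class NodeDeltaSketch {
    /** Elements of left that are not in right. */
    static Set<String> difference(Set<String> left, Set<String> right) {
        Set<String> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    /**
     * Applies a membership change to the monitored set and returns the
     * nodes for which a first ping would need to be scheduled.
     */
    static Set<String> update(Set<String> monitored, Set<String> previous,
                              Set<String> current, String localNodeId) {
        Set<String> toSchedule = new LinkedHashSet<>();
        for (String added : difference(current, previous)) {
            if (added.equals(localNodeId)) {
                continue; // no need to monitor the local node
            }
            if (monitored.add(added)) {
                toSchedule.add(added); // newly monitored, needs a ping task
            }
        }
        monitored.removeAll(difference(previous, current));
        return toSchedule;
    }
}
```

Going from a cluster containing only the local node to one that also has nodes a and b schedules pings for a and b; a later update that drops a simply removes it from the monitored set.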

SendPingRequest

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

private class SendPingRequest implements Runnable {

        private final DiscoveryNode node;

        private SendPingRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void run() {
            if (!running) {
                return;
            }
            transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
                    new BaseTransportResponseHandler<PingResponse>() {
                        @Override
                        public PingResponse newInstance() {
                            return new PingResponse();
                        }

                        @Override
                        public void handleResponse(PingResponse response) {
                            if (!running) {
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                nodeFD.retryCount = 0;
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            // check if the master node did not get switched on us...
                            if (!running) {
                                return;
                            }
                            if (exp instanceof ConnectTransportException) {
                                // ignore this one, we already handle it by registering a connection listener
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                int retryCount = ++nodeFD.retryCount;
                                logger.trace("[node  ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
                                if (retryCount >= pingRetryCount) {
                                    logger.debug("[node  ] failed to ping [{}], tried [{}] times, each with  maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
                                    // not good, failure
                                    if (nodesFD.remove(node) != null) {
                                        notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                                    }
                                } else {
                                    // resend the request, not reschedule, rely on send timeout
                                    transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
                                            options().withHighType().withTimeout(pingRetryTimeout), this);
                                }
                            }
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        }
    }
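  • SendPingRequest's run method sends a PingRequest to the target node with a timeout of pingRetryTimeout. Its handleResponse checks whether the node is still in nodesFD: if it has been removed, or if its NodeFD's running flag is false, the response is ignored; otherwise retryCount is reset and SendPingRequest is rescheduled to run after pingInterval.
  • If the request fails with a TransportException, the handler checks whether it is a ConnectTransportException. If so, it is ignored, because that case is already handled by the onNodeDisconnected callback of the FDConnectionListener registered with transportService.
  • For any other exception, nodeFD.retryCount is incremented. Once retryCount is greater than or equal to the configured pingRetryCount, the node is removed from nodesFD and notifyNodeFailure is called, which ends up in ZenDiscovery's handleNodeFailure. If the limit has not been reached, the PingRequest is resent immediately rather than rescheduled.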
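
The retry counter and a rough upper bound on detection time can be worked out in a few lines. PingRetrySketch is hypothetical, and the bound assumes every attempt runs to its full timeout and ignores scheduling overhead.

```java
import java.time.Duration;

// Hypothetical model of the per-node retry counter; not an Elasticsearch class.
final class PingRetrySketch {
    private final int pingRetryCount;
    private int retryCount = 0;

    PingRetrySketch(int pingRetryCount) {
        this.pingRetryCount = pingRetryCount;
    }

    /** A successful ping resets the counter. */
    void onResponse() {
        retryCount = 0;
    }

    /** Returns true when the node should be reported as failed. */
    boolean onFailure() {
        return ++retryCount >= pingRetryCount;
    }

    /**
     * Rough bound for a node that goes silent right after a successful ping:
     * one interval wait, then pingRetryCount attempts that each time out.
     */
    static Duration worstCase(Duration pingInterval, Duration pingTimeout, int pingRetryCount) {
        return pingInterval.plus(pingTimeout.multipliedBy(pingRetryCount));
    }
}
```

Because the comparison is retryCount >= pingRetryCount, the value 3 means three failed attempts in total, not one attempt plus three retries. With the defaults from the constructor (1s interval, 30s timeout, 3 retries) the bound comes to 1s + 3 × 30s = 91s.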

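Summary

After a transport disconnect, NodesFaultDetection (when connectOnNetworkDisconnect is true) reconnects to the node; on success it puts the node back into nodesFD and schedules a SendPingRequest task for that node to run after pingInterval.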
