内容简介:本文主要研究一下elasticsearch的PeerFinderelasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeersRequest.javaelasticsearch-7.0.0/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java
序
本文主要研究一下elasticsearch的PeerFinder
PeersRequest
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeersRequest.java
public class PeersRequest extends TransportRequest { private final DiscoveryNode sourceNode; private final List<DiscoveryNode> knownPeers; public PeersRequest(DiscoveryNode sourceNode, List<DiscoveryNode> knownPeers) { assert knownPeers.contains(sourceNode) == false : "local node is not a peer"; this.sourceNode = sourceNode; this.knownPeers = knownPeers; } public PeersRequest(StreamInput in) throws IOException { super(in); sourceNode = new DiscoveryNode(in); knownPeers = in.readList(DiscoveryNode::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); sourceNode.writeTo(out); out.writeList(knownPeers); } public List<DiscoveryNode> getKnownPeers() { return knownPeers; } public DiscoveryNode getSourceNode() { return sourceNode; } @Override public String toString() { return "PeersRequest{" + "sourceNode=" + sourceNode + ", knownPeers=" + knownPeers + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; PeersRequest that = (PeersRequest) o; return Objects.equals(sourceNode, that.sourceNode) && Objects.equals(knownPeers, that.knownPeers); } @Override public int hashCode() { return Objects.hash(sourceNode, knownPeers); } }
- PeersRequest有两个属性,分别是sourceNode以及knownPeers
PeersResponse
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/cluster/coordination/PeersResponse.java
public class PeersResponse extends TransportResponse { private final Optional<DiscoveryNode> masterNode; private final List<DiscoveryNode> knownPeers; private final long term; public PeersResponse(Optional<DiscoveryNode> masterNode, List<DiscoveryNode> knownPeers, long term) { assert masterNode.isPresent() == false || knownPeers.isEmpty(); this.masterNode = masterNode; this.knownPeers = knownPeers; this.term = term; } public PeersResponse(StreamInput in) throws IOException { masterNode = Optional.ofNullable(in.readOptionalWriteable(DiscoveryNode::new)); knownPeers = in.readList(DiscoveryNode::new); term = in.readLong(); assert masterNode.isPresent() == false || knownPeers.isEmpty(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalWriteable(masterNode.orElse(null)); out.writeList(knownPeers); out.writeLong(term); } /** * @return the node that is currently leading, according to the responding node. */ public Optional<DiscoveryNode> getMasterNode() { return masterNode; } /** * @return the collection of known peers of the responding node, or an empty collection if the responding node believes there * is currently a leader. */ public List<DiscoveryNode> getKnownPeers() { return knownPeers; } /** * @return the current term of the responding node. If the responding node is the leader then this is the term in which it is * currently leading. */ public long getTerm() { return term; } @Override public String toString() { return "PeersResponse{" + "masterNode=" + masterNode + ", knownPeers=" + knownPeers + ", term=" + term + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; PeersResponse that = (PeersResponse) o; return term == that.term && Objects.equals(masterNode, that.masterNode) && Objects.equals(knownPeers, that.knownPeers); } @Override public int hashCode() { return Objects.hash(masterNode, knownPeers, term); } }
- PeersResponse有三个属性,分别是masterNode、knownPeers、term
PeerFinder
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
public abstract class PeerFinder { private static final Logger logger = LogManager.getLogger(PeerFinder.class); public static final String REQUEST_PEERS_ACTION_NAME = "internal:discovery/request_peers"; // the time between attempts to find all peers public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_SETTING = Setting.timeSetting("discovery.find_peers_interval", TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting("discovery.request_peers_timeout", TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); private final Settings settings; private final TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; private final Object mutex = new Object(); private final TransportService transportService; private final TransportAddressConnector transportAddressConnector; private final ConfiguredHostsResolver configuredHostsResolver; private volatile long currentTerm; private boolean active; private DiscoveryNodes lastAcceptedNodes; private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>(); private Optional<DiscoveryNode> leader = Optional.empty(); private volatile List<TransportAddress> lastResolvedAddresses = emptyList(); public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver) { this.settings = settings; findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); this.transportService = transportService; this.transportAddressConnector = transportAddressConnector; this.configuredHostsResolver = configuredHostsResolver; transportService.registerRequestHandler(REQUEST_PEERS_ACTION_NAME, Names.GENERIC, false, false, PeersRequest::new, (request, channel, task) -> channel.sendResponse(handlePeersRequest(request))); transportService.registerRequestHandler(UnicastZenPing.ACTION_NAME, Names.GENERIC, false, false, UnicastZenPing.UnicastPingRequest::new, new Zen1UnicastPingRequestHandler()); } public void activate(final DiscoveryNodes lastAcceptedNodes) { logger.trace("activating with {}", lastAcceptedNodes); synchronized (mutex) { assert assertInactiveWithNoKnownPeers(); active = true; this.lastAcceptedNodes = lastAcceptedNodes; leader = Optional.empty(); handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected } onFoundPeersUpdated(); // trigger a check for a quorum already } public void deactivate(DiscoveryNode leader) { final boolean peersRemoved; synchronized (mutex) { logger.trace("deactivating and setting leader to {}", leader); active = false; peersRemoved = handleWakeUp(); this.leader = Optional.of(leader); assert assertInactiveWithNoKnownPeers(); } if (peersRemoved) { onFoundPeersUpdated(); } } // exposed to subclasses for testing protected final boolean holdsLock() { return Thread.holdsLock(mutex); } private boolean assertInactiveWithNoKnownPeers() { assert holdsLock() : "PeerFinder mutex not held"; assert active == false; assert peersByAddress.isEmpty() : peersByAddress.keySet(); return true; } PeersResponse handlePeersRequest(PeersRequest peersRequest) { synchronized (mutex) { assert peersRequest.getSourceNode().equals(getLocalNode()) == false; final List<DiscoveryNode> knownPeers; if (active) { assert leader.isPresent() == false : leader; if (peersRequest.getSourceNode().isMasterNode()) { startProbe(peersRequest.getSourceNode().getAddress()); } peersRequest.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(this::startProbe); knownPeers = getFoundPeersUnderLock(); } else { assert leader.isPresent() || lastAcceptedNodes == null; knownPeers = emptyList(); } return new PeersResponse(leader, knownPeers, currentTerm); } } // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package) public Optional<DiscoveryNode> getLeader() { synchronized (mutex) { return leader; } } // exposed for checking invariant in o.e.c.c.Coordinator (public since this is a different package) public long getCurrentTerm() { return currentTerm; } public void setCurrentTerm(long currentTerm) { this.currentTerm = currentTerm; } private DiscoveryNode getLocalNode() { final DiscoveryNode localNode = transportService.getLocalNode(); assert localNode != null; return localNode; } protected abstract void onActiveMasterFound(DiscoveryNode masterNode, long term); protected abstract void onFoundPeersUpdated(); public List<TransportAddress> getLastResolvedAddresses() { return lastResolvedAddresses; } public Iterable<DiscoveryNode> getFoundPeers() { synchronized (mutex) { return getFoundPeersUnderLock(); } } private List<DiscoveryNode> getFoundPeersUnderLock() { assert holdsLock() : "PeerFinder mutex not held"; return peersByAddress.values().stream() .map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList()); } private Peer createConnectingPeer(TransportAddress transportAddress) { Peer peer = new Peer(transportAddress); peer.establishConnection(); return peer; } /** * @return whether any peers were removed due to disconnection */ private boolean handleWakeUp() { assert holdsLock() : "PeerFinder mutex not held"; final boolean peersRemoved = peersByAddress.values().removeIf(Peer::handleWakeUp); if (active == false) { logger.trace("not active"); return peersRemoved; } logger.trace("probing master nodes from cluster state: {}", lastAcceptedNodes); for (ObjectCursor<DiscoveryNode> discoveryNodeObjectCursor : lastAcceptedNodes.getMasterNodes().values()) { startProbe(discoveryNodeObjectCursor.value.getAddress()); } configuredHostsResolver.resolveConfiguredHosts(providedAddresses -> { synchronized (mutex) { lastResolvedAddresses = providedAddresses; logger.trace("probing resolved transport addresses {}", providedAddresses); providedAddresses.forEach(this::startProbe); } }); transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.GENERIC, new AbstractRunnable() { @Override public boolean isForceExecution() { return true; } @Override public void onFailure(Exception e) { assert false : e; logger.debug("unexpected exception in wakeup", e); } @Override protected void doRun() { synchronized (mutex) { if (handleWakeUp() == false) { return; } } onFoundPeersUpdated(); } @Override public String toString() { return "PeerFinder handling wakeup"; } }); return peersRemoved; } protected void startProbe(TransportAddress transportAddress) { assert holdsLock() : "PeerFinder mutex not held"; if (active == false) { logger.trace("startProbe({}) not running", transportAddress); return; } if (transportAddress.equals(getLocalNode().getAddress())) { logger.trace("startProbe({}) not probing local node", transportAddress); return; } peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer); } private class Zen1UnicastPingRequestHandler implements TransportRequestHandler<UnicastZenPing.UnicastPingRequest> { @Override public void messageReceived(UnicastZenPing.UnicastPingRequest request, TransportChannel channel, Task task) throws Exception { final PeersRequest peersRequest = new PeersRequest(request.pingResponse.node(), Optional.ofNullable(request.pingResponse.master()).map(Collections::singletonList).orElse(emptyList())); final PeersResponse peersResponse = handlePeersRequest(peersRequest); final List<ZenPing.PingResponse> pingResponses = new ArrayList<>(); final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); pingResponses.add(new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(transportService.getLocalNode()), peersResponse.getMasterNode().orElse(null), clusterName, ClusterState.UNKNOWN_VERSION)); peersResponse.getKnownPeers().forEach(dn -> pingResponses.add( new ZenPing.PingResponse(ZenPing.PingResponse.FAKE_PING_ID, isZen1Node(dn) ? dn : createDiscoveryNodeWithImpossiblyHighId(dn), null, clusterName, ClusterState.UNKNOWN_VERSION))); channel.sendResponse(new UnicastZenPing.UnicastPingResponse(request.id, pingResponses.toArray(new ZenPing.PingResponse[0]))); } } //...... }
- PeerFinder的构造器注册了两个handler,一个是针对REQUEST_PEERS_ACTION_NAME,执行handlePeersRequest;一个是针对UnicastZenPing.ACTION_NAME,其handler为Zen1UnicastPingRequestHandler,该handler里头也调用了handlePeersRequest方法
- handlePeersRequest主要是针对masterNode及peersRequest.getKnownPeers()挨个执行startProbe,然后通过getFoundPeersUnderLock设置knownPeers,最后通过leader、knownPeers及currentTerm构造PeersResponse返回
- startProbe方法主要是执行createConnectingPeer,并将结果放入key为transportAddress的名为peersByAddress的map中;createConnectingPeer方法主要是创建Peer,然后调用Peer的establishConnection方法
Peer
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
private class Peer { private final TransportAddress transportAddress; private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>(); private volatile boolean peersRequestInFlight; Peer(TransportAddress transportAddress) { this.transportAddress = transportAddress; } @Nullable DiscoveryNode getDiscoveryNode() { return discoveryNode.get(); } boolean handleWakeUp() { assert holdsLock() : "PeerFinder mutex not held"; if (active == false) { return true; } final DiscoveryNode discoveryNode = getDiscoveryNode(); // may be null if connection not yet established if (discoveryNode != null) { if (transportService.nodeConnected(discoveryNode)) { if (peersRequestInFlight == false) { requestPeers(); } } else { logger.trace("{} no longer connected", this); return true; } } return false; } void establishConnection() { assert holdsLock() : "PeerFinder mutex not held"; assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode(); assert active; logger.trace("{} attempting connection", this); transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() { @Override public void onResponse(DiscoveryNode remoteNode) { assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible"; assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node"; synchronized (mutex) { if (active == false) { return; } assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get(); discoveryNode.set(remoteNode); requestPeers(); } assert holdsLock() == false : "PeerFinder mutex is held in error"; onFoundPeersUpdated(); } @Override public void onFailure(Exception e) { logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e); synchronized (mutex) { peersByAddress.remove(transportAddress); } } }); } private void requestPeers() { assert holdsLock() : "PeerFinder mutex not held"; assert peersRequestInFlight == false : "PeersRequest already in flight"; assert active; final DiscoveryNode discoveryNode = getDiscoveryNode(); assert discoveryNode != null : "cannot request peers without first connecting"; if (discoveryNode.equals(getLocalNode())) { logger.trace("{} not requesting peers from local node", this); return; } logger.trace("{} requesting peers", this); peersRequestInFlight = true; final List<DiscoveryNode> knownNodes = getFoundPeersUnderLock(); final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<PeersResponse>() { @Override public PeersResponse read(StreamInput in) throws IOException { return new PeersResponse(in); } @Override public void handleResponse(PeersResponse response) { logger.trace("{} received {}", Peer.this, response); synchronized (mutex) { if (active == false) { return; } peersRequestInFlight = false; response.getMasterNode().map(DiscoveryNode::getAddress).ifPresent(PeerFinder.this::startProbe); response.getKnownPeers().stream().map(DiscoveryNode::getAddress).forEach(PeerFinder.this::startProbe); } if (response.getMasterNode().equals(Optional.of(discoveryNode))) { // Must not hold lock here to avoid deadlock assert holdsLock() == false : "PeerFinder mutex is held in error"; onActiveMasterFound(discoveryNode, response.getTerm()); } } @Override public void handleException(TransportException exp) { peersRequestInFlight = false; logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp); } @Override public String executor() { return Names.GENERIC; } }; final String actionName; final TransportRequest transportRequest; final TransportResponseHandler<?> transportResponseHandler; if (isZen1Node(discoveryNode)) { actionName = UnicastZenPing.ACTION_NAME; transportRequest = new UnicastZenPing.UnicastPingRequest(1, ZenDiscovery.PING_TIMEOUT_SETTING.get(settings), new ZenPing.PingResponse(createDiscoveryNodeWithImpossiblyHighId(getLocalNode()), null, ClusterName.CLUSTER_NAME_SETTING.get(settings), ClusterState.UNKNOWN_VERSION)); transportResponseHandler = peersResponseHandler.wrap(ucResponse -> { Optional<DiscoveryNode> optionalMasterNode = Arrays.stream(ucResponse.pingResponses) .filter(pr -> discoveryNode.equals(pr.node()) && discoveryNode.equals(pr.master())) .map(ZenPing.PingResponse::node) .findFirst(); List<DiscoveryNode> discoveredNodes = new ArrayList<>(); if (optionalMasterNode.isPresent() == false) { Arrays.stream(ucResponse.pingResponses).map(PingResponse::master).filter(Objects::nonNull) .forEach(discoveredNodes::add); Arrays.stream(ucResponse.pingResponses).map(PingResponse::node).forEach(discoveredNodes::add); } return new PeersResponse(optionalMasterNode, discoveredNodes, 0L); }, UnicastZenPing.UnicastPingResponse::new); } else { actionName = REQUEST_PEERS_ACTION_NAME; transportRequest = new PeersRequest(getLocalNode(), knownNodes); transportResponseHandler = peersResponseHandler; } transportService.sendRequest(discoveryNode, actionName, transportRequest, TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(), transportResponseHandler); } @Override public String toString() { return "Peer{" + "transportAddress=" + transportAddress + ", discoveryNode=" + discoveryNode.get() + ", peersRequestInFlight=" + peersRequestInFlight + '}'; } }
- Peer的establishConnection方法主要是通过transportAddressConnector.connectToRemoteMasterNode请求masterNode,在结果成功返回时执行requestPeers及onFoundPeersUpdated方法;如果出现异常则从peersByAddress移除该transportAddress对应的Peer
- requestPeers方法通过getFoundPeersUnderLock方法获取knownNodes构造PeersRequest,然后通过transportService.sendRequest向discoveryNode发送请求,请求成功返回则执行masterNode的startProbe以及knownPeers的startProbe方法,如果当前discoveryNode是masterNode,则触发onActiveMasterFound方法
- 7.0版本的代码针对之前的版本做了不同的处理,requestPeers方法发送的request为UnicastZenPing.UnicastPingRequest
小结
- PeerFinder的构造器注册了两个handler,一个是针对REQUEST_PEERS_ACTION_NAME,执行handlePeersRequest;一个是针对UnicastZenPing.ACTION_NAME,其handler为Zen1UnicastPingRequestHandler,该handler里头也调用了handlePeersRequest方法
- handlePeersRequest主要是针对masterNode及peersRequest.getKnownPeers()挨个执行startProbe,然后通过getFoundPeersUnderLock设置knownPeers,最后通过leader、knownPeers及currentTerm构造PeersResponse返回;startProbe方法主要是执行createConnectingPeer,并将结果放入key为transportAddress的名为peersByAddress的map中;createConnectingPeer方法主要是创建Peer,然后调用Peer的establishConnection方法
- Peer的establishConnection方法主要是通过transportAddressConnector.connectToRemoteMasterNode请求masterNode,在结果成功返回时执行requestPeers及onFoundPeersUpdated方法;如果出现异常则从peersByAddress移除该transportAddress对应的Peer;requestPeers方法通过getFoundPeersUnderLock方法获取knownNodes构造PeersRequest,然后通过transportService.sendRequest向discoveryNode发送请求,请求成功返回则执行masterNode的startProbe以及knownPeers的startProbe方法,如果当前discoveryNode是masterNode,则触发onActiveMasterFound方法
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。