内容简介:本文主要研究一下 elasticsearch 的 SeedHostsResolver,涉及的源码文件为 elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java 和 elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java。
序
本文主要研究一下elasticsearch的SeedHostsResolver
ConfiguredHostsResolver
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
/**
 * Callback-style contract for turning the configured unicast (seed) hosts into transport addresses.
 * The result is handed to a caller-supplied consumer instead of being returned.
 */
public interface ConfiguredHostsResolver {
/**
 * Attempts to resolve the configured unicast hosts list to a list of transport addresses.
 *
 * @param consumer receives the resolved list. It may never be invoked: an error during resolution, or another
 *                 resolution attempt already being in progress, both result in no callback.
 */
void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer);
}
- ConfiguredHostsResolver 接口定义了 resolveConfiguredHosts 方法,用于把配置的 unicast hosts 列表解析为 transport address 列表,并通过传入的 consumer 回调交付结果;如果解析出错或已有解析正在进行,consumer 可能不会被调用
SeedHostsResolver
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java
public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
/**
 * Deprecated legacy key for the resolver concurrency. getMaxConcurrentResolvers prefers it when present and
 * rejects configurations that also set discovery.seed_resolver.max_concurrent_resolvers.
 * NOTE(review): the literals 10 and 0 are presumably the default and the minimum value; confirm against Setting.intSetting.
 */
public static final Setting<Integer> LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope,
Setting.Property.Deprecated);
/**
 * Deprecated legacy key for the resolve timeout. getResolveTimeout prefers it when present and rejects
 * configurations that also set discovery.seed_resolver.timeout.
 * NOTE(review): the 5 second value is presumably the default; confirm against Setting.positiveTimeSetting.
 */
public static final Setting<TimeValue> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope, Setting.Property.Deprecated);
/** Current key for the resolver concurrency; read by getMaxConcurrentResolvers when the legacy key is not set. */
public static final Setting<Integer> DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING =
Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Setting.Property.NodeScope);
/** Current key for the resolve timeout; read by getResolveTimeout when the legacy key is not set. */
public static final Setting<TimeValue> DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING =
Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);
private static final Logger logger = LogManager.getLogger(SeedHostsResolver.class);
// Node settings; kept so that doStart can pass them to EsExecutors.daemonThreadFactory.
private final Settings settings;
// Flipped to true by resolveConfiguredHosts when it starts a resolution and reset to false in the task's
// onAfter hook, so a second call made while one is running does nothing.
private final AtomicBoolean resolveInProgress = new AtomicBoolean();
// Used for addressesFromString lookups, for the local bound/publish addresses, and for access to the thread pool.
private final TransportService transportService;
// Source of the seed addresses; it is handed a resolver callback backed by resolveHostsLists.
private final SeedHostsProvider hostsProvider;
// Executor that runs the hostname lookups; assigned in doStart and terminated in doStop.
private final SetOnce<ExecutorService> executorService = new SetOnce<>();
// Upper bound on how long resolveHostsLists waits for lookups; computed by getResolveTimeout.
private final TimeValue resolveTimeout;
// Prefix of the lookup executor's name (see doStart).
private final String nodeName;
// Value computed by getMaxConcurrentResolvers and passed to EsExecutors.newScaling in doStart.
private final int concurrentConnects;
/**
 * Creates a resolver bound to the given node name, settings, transport service and seed-hosts provider.
 * The resolve timeout and the lookup concurrency are read from {@code settings} here, so a configuration that
 * sets both a legacy key and its replacement fails at construction time with an {@link IllegalArgumentException}.
 *
 * @param nodeName         name of the node, used as a prefix for the lookup executor's name
 * @param settings         settings from which the timeout and concurrency are read
 * @param transportService transport service used for address parsing and thread-pool access
 * @param seedProvider     provider that supplies the seed addresses
 */
public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService,
                         SeedHostsProvider seedProvider) {
    this.nodeName = nodeName;
    this.settings = settings;
    this.hostsProvider = seedProvider;
    this.transportService = transportService;
    // Both lookups can throw; the timeout is read first, then the concurrency.
    this.resolveTimeout = getResolveTimeout(settings);
    this.concurrentConnects = getMaxConcurrentResolvers(settings);
}
/**
 * Determines the lookup concurrency from the settings.
 * The current key {@code discovery.seed_resolver.max_concurrent_resolvers} is used unless the deprecated
 * {@code discovery.zen.ping.unicast.concurrent_connects} key is explicitly set, in which case the legacy value wins.
 *
 * @param settings the settings to read
 * @return the configured concurrency value
 * @throws IllegalArgumentException if both the legacy and the current key are set
 */
public static int getMaxConcurrentResolvers(Settings settings) {
    final boolean legacyConfigured = LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings);
    if (legacyConfigured == false) {
        return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings);
    }
    // The legacy key is present: the two keys are mutually exclusive.
    if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) {
        final String conflict = "it is forbidden to set both ["
            + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and ["
            + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]";
        throw new IllegalArgumentException(conflict);
    }
    return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
}
/**
 * Determines the hostname resolve timeout from the settings.
 * The current key {@code discovery.seed_resolver.timeout} is used unless the deprecated
 * {@code discovery.zen.ping.unicast.hosts.resolve_timeout} key is explicitly set, in which case the legacy value wins.
 *
 * @param settings the settings to read
 * @return the configured resolve timeout
 * @throws IllegalArgumentException if both the legacy and the current key are set
 */
public static TimeValue getResolveTimeout(Settings settings) {
    final boolean legacyConfigured = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings);
    if (legacyConfigured == false) {
        return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings);
    }
    // The legacy key is present: the two keys are mutually exclusive.
    if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) {
        final String conflict = "it is forbidden to set both ["
            + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and ["
            + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]";
        throw new IllegalArgumentException(conflict);
    }
    return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
}
/**
 * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a
 * collection of addresses if the number of ports is greater than one). Host lookups are submitted together to the
 * given executor service and awaited for at most the resolve timeout. A lookup whose future comes back cancelled is
 * reported as timed out, a lookup that fails is logged at warn level, and in both cases the host is skipped.
 * Addresses equal to this node's own publish or bound addresses are left out of the result.
 *
 * @param executorService  the executor service used to parallelize hostname lookups
 * @param logger           logger used for logging messages regarding hostname lookups
 * @param hosts            the hosts to resolve
 * @param limitPortCounts  the number of ports to resolve (should be 1 for non-local transport)
 * @param transportService the transport service
 * @param resolveTimeout   the timeout before returning from hostname lookups
 * @return an unmodifiable list of resolved transport addresses, in host order; empty if the calling thread is
 *         interrupted while waiting for the lookups
 * @throws NullPointerException     if any reference argument is null (the message names the argument)
 * @throws IllegalArgumentException if the resolve timeout is negative
 */
public static List<TransportAddress> resolveHostsLists(
    final ExecutorService executorService,
    final Logger logger,
    final List<String> hosts,
    final int limitPortCounts,
    final TransportService transportService,
    final TimeValue resolveTimeout) {
    // Name the offending argument so that a null is diagnosable from the exception alone.
    Objects.requireNonNull(executorService, "executorService");
    Objects.requireNonNull(logger, "logger");
    Objects.requireNonNull(hosts, "hosts");
    Objects.requireNonNull(transportService, "transportService");
    Objects.requireNonNull(resolveTimeout, "resolveTimeout");
    if (resolveTimeout.nanos() < 0) {
        throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
    }

    // One lookup task per host; we will wait up to resolveTimeout for these tasks to complete.
    final List<Callable<TransportAddress[]>> lookups = new ArrayList<>(hosts.size());
    for (final String host : hosts) {
        lookups.add(() -> transportService.addressesFromString(host, limitPortCounts));
    }

    final List<Future<TransportAddress[]>> futures;
    try {
        futures = executorService.invokeAll(lookups, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and give up on this round.
        Thread.currentThread().interrupt();
        return Collections.emptyList();
    }

    // Our own addresses: there is no point in pinging ourselves.
    final Set<TransportAddress> localAddresses = new HashSet<>();
    localAddresses.add(transportService.boundAddress().publishAddress());
    localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));

    // ExecutorService#invokeAll returns the futures in the iteration order of the tasks, so walking the hosts in
    // lock-step associates each future with the hostname it was created for.
    final List<TransportAddress> resolved = new ArrayList<>();
    final Iterator<String> hostnames = hosts.iterator();
    for (final Future<TransportAddress[]> future : futures) {
        final String hostname = hostnames.next();
        if (future.isCancelled()) {
            logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
            continue;
        }
        assert future.isDone();
        try {
            final TransportAddress[] addresses = future.get();
            logger.trace("resolved host [{}] to {}", hostname, addresses);
            for (final TransportAddress address : addresses) {
                if (localAddresses.contains(address) == false) {
                    resolved.add(address);
                }
            }
        } catch (final ExecutionException e) {
            assert e.getCause() != null;
            final String message = "failed to resolve host [" + hostname + "]";
            logger.warn(message, e.getCause());
        } catch (InterruptedException e) {
            // Restore the flag and move on to the next future.
            Thread.currentThread().interrupt();
        }
    }
    return Collections.unmodifiableList(resolved);
}
/**
 * Lifecycle start hook: logs the effective concurrency and timeout, then creates the executor used for hostname
 * lookups and stores it in {@code executorService}.
 */
@Override
protected void doStart() {
    logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]", concurrentConnects, resolveTimeout);
    final String executorName = nodeName + "/" + "unicast_configured_hosts_resolver";
    final ThreadFactory resolverThreadFactory =
        EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]");
    // NOTE(review): the numeric arguments are presumably min size 0, max size concurrentConnects and a 60s
    // keep-alive; confirm against EsExecutors.newScaling.
    final ExecutorService resolverExecutor = EsExecutors.newScaling(
        executorName,
        0,
        concurrentConnects,
        60,
        TimeUnit.SECONDS,
        resolverThreadFactory,
        transportService.getThreadPool().getThreadContext());
    executorService.set(resolverExecutor);
}
/**
 * Lifecycle stop hook: asks {@code ThreadPool.terminate} to shut down the lookup executor, passing a bound of
 * ten seconds.
 */
@Override
protected void doStop() {
    final ExecutorService resolverExecutor = executorService.get();
    ThreadPool.terminate(resolverExecutor, 10, TimeUnit.SECONDS);
}
/**
 * Lifecycle close hook. Intentionally empty: this method performs no work (the lookup executor is terminated
 * in doStop).
 */
@Override
protected void doClose() {
}
/**
 * Starts an asynchronous resolution of the configured seed hosts and passes the result to {@code consumer}.
 * Nothing happens, and the consumer is never called, when this component is not started or when a previous
 * resolution is still in progress. The work is submitted to the generic thread pool; failures are logged at
 * debug level and the in-progress flag is cleared in the task's {@code onAfter} hook.
 *
 * @param consumer receives the addresses returned by the seed-hosts provider
 */
@Override
public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
    if (lifecycle.started() == false) {
        logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle);
        return;
    }
    // Only the caller that wins the false -> true transition schedules a resolution.
    if (resolveInProgress.compareAndSet(false, true) == false) {
        return;
    }

    final AbstractRunnable resolutionTask = new AbstractRunnable() {
        @Override
        protected void doRun() {
            // The component may have been stopped between scheduling and execution.
            if (lifecycle.started() == false) {
                logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle);
                return;
            }
            consumer.accept(hostsProvider.getSeedAddresses(
                (seedHosts, portLimit) -> resolveHostsLists(
                    executorService.get(), logger, seedHosts, portLimit, transportService, resolveTimeout)));
        }

        @Override
        public void onFailure(Exception e) {
            logger.debug("failure when resolving unicast hosts list", e);
        }

        @Override
        public void onAfter() {
            // Allow the next resolution attempt.
            resolveInProgress.set(false);
        }

        @Override
        public String toString() {
            return "SeedHostsResolver resolving unicast hosts list";
        }
    };
    transportService.getThreadPool().generic().execute(resolutionTask);
}
}
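- SeedHostsResolver 继承了 AbstractLifecycleComponent,同时实现了 ConfiguredHostsResolver 接口;它提供了 getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists 这几个静态方法;resolveConfiguredHosts 方法首先通过 compareAndSet 将 resolveInProgress 从 false 设置为 true,然后在 generic 线程池中通过 hostsProvider.getSeedAddresses 获取 providedAddresses(其回调使用 resolveHostsLists 解析主机名),最后执行 consumer.accept(providedAddresses),并在 onAfter 中把 resolveInProgress 重置为 false;resolveHostsLists 方法使用线程池并发执行 transportService.addressesFromString,超过 resolveTimeout 仍未完成的解析会记录 warn 日志并被跳过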
小结
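- ConfiguredHostsResolver 接口定义了 resolveConfiguredHosts 方法,用于解析配置的 transport address 列表;SeedHostsResolver 实现了该接口,其 resolveConfiguredHosts 方法借助 resolveInProgress 保证同一时刻只有一次解析在进行,而 resolveHostsLists 方法使用线程池并发执行 transportService.addressesFromString,并过滤掉本节点自身的地址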
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Mathematica Cookbook
Sal Mangano / O'Reilly Media / 2009 / GBP 51.99
As the leading software application for symbolic mathematics, Mathematica is standard in many environments that rely on math, such as science, engineering, financial analysis, software development, an......一起来看看 《Mathematica Cookbook》 这本书的介绍吧!