博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊elasticsearch的SeedHostsResolver
阅读量:6583 次
发布时间:2019-06-24

本文共 10192 字,大约阅读时间需要 33 分钟。

本文主要研究一下elasticsearch的SeedHostsResolver

ConfiguredHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

public interface ConfiguredHostsResolver {        /**         * Attempt to resolve the configured unicast hosts list to a list of transport addresses.         *         * @param consumer Consumer for the resolved list. May not be called if an error occurs or if another resolution attempt is in         *                 progress.         */        void resolveConfiguredHosts(Consumer
> consumer); }复制代码
  • ConfiguredHostsResolver接口定义了resolveConfiguredHosts方法用于解析配置的transport address列表

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {    public static final Setting
LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope, Setting.Property.Deprecated); public static final Setting
LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope, Setting.Property.Deprecated); public static final Setting
DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING = Setting.intSetting("discovery.seed_resolver.max_concurrent_resolvers", 10, 0, Setting.Property.NodeScope); public static final Setting
DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING = Setting.positiveTimeSetting("discovery.seed_resolver.timeout", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); private static final Logger logger = LogManager.getLogger(SeedHostsResolver.class); private final Settings settings; private final AtomicBoolean resolveInProgress = new AtomicBoolean(); private final TransportService transportService; private final SeedHostsProvider hostsProvider; private final SetOnce
executorService = new SetOnce<>(); private final TimeValue resolveTimeout; private final String nodeName; private final int concurrentConnects; public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService, SeedHostsProvider seedProvider) { this.settings = settings; this.nodeName = nodeName; this.transportService = transportService; this.hostsProvider = seedProvider; resolveTimeout = getResolveTimeout(settings); concurrentConnects = getMaxConcurrentResolvers(settings); } public static int getMaxConcurrentResolvers(Settings settings) { if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.exists(settings)) { if (DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.exists(settings)) { throw new IllegalArgumentException("it is forbidden to set both [" + DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.getKey() + "] and [" + LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.getKey() + "]"); } return LEGACY_DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); } return DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING.get(settings); } public static TimeValue getResolveTimeout(Settings settings) { if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.exists(settings)) { if (DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.exists(settings)) { throw new IllegalArgumentException("it is forbidden to set both [" + DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey() + "] and [" + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.getKey() + "]"); } return LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); } return DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.get(settings); } /** * Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of * addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up * to the specified resolve timeout. * * @param executorService the executor service used to parallelize hostname lookups * @param logger logger used for logging messages regarding hostname lookups * @param hosts the hosts to resolve * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) * @param transportService the transport service * @param resolveTimeout the timeout before returning from hostname lookups * @return a list of resolved transport addresses */ public static List
resolveHostsLists( final ExecutorService executorService, final Logger logger, final List
hosts, final int limitPortCounts, final TransportService transportService, final TimeValue resolveTimeout) { Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); Objects.requireNonNull(transportService); Objects.requireNonNull(resolveTimeout); if (resolveTimeout.nanos() < 0) { throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); } // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete final List
> callables = hosts .stream() .map(hn -> (Callable
) () -> transportService.addressesFromString(hn, limitPortCounts)) .collect(Collectors.toList()); final List
> futures; try { futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return Collections.emptyList(); } final List
transportAddresses = new ArrayList<>(); final Set
localAddresses = new HashSet<>(); localAddresses.add(transportService.boundAddress().publishAddress()); localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses())); // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the // hostname with the corresponding task by iterating together final Iterator
it = hosts.iterator(); for (final Future
future : futures) { final String hostname = it.next(); if (!future.isCancelled()) { assert future.isDone(); try { final TransportAddress[] addresses = future.get(); logger.trace("resolved host [{}] to {}", hostname, addresses); for (int addressId = 0; addressId < addresses.length; addressId++) { final TransportAddress address = addresses[addressId]; // no point in pinging ourselves if (localAddresses.contains(address) == false) { transportAddresses.add(address); } } } catch (final ExecutionException e) { assert e.getCause() != null; final String message = "failed to resolve host [" + hostname + "]"; logger.warn(message, e.getCause()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // ignore } } else { logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); } } return Collections.unmodifiableList(transportAddresses); } @Override protected void doStart() { logger.debug("using max_concurrent_resolvers [{}], resolver timeout [{}]", concurrentConnects, resolveTimeout); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]"); executorService.set(EsExecutors.newScaling(nodeName + "/" + "unicast_configured_hosts_resolver", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext())); } @Override protected void doStop() { ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS); } @Override protected void doClose() { } @Override public void resolveConfiguredHosts(Consumer
> consumer) { if (lifecycle.started() == false) { logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle); return; } if (resolveInProgress.compareAndSet(false, true)) { transportService.getThreadPool().generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { logger.debug("failure when resolving unicast hosts list", e); } @Override protected void doRun() { if (lifecycle.started() == false) { logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle); return; } List
providedAddresses = hostsProvider.getSeedAddresses((hosts, limitPortCounts) -> resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts, transportService, resolveTimeout)); consumer.accept(providedAddresses); } @Override public void onAfter() { resolveInProgress.set(false); } @Override public String toString() { return "SeedHostsResolver resolving unicast hosts list"; } }); } }}复制代码
  • SeedHostsResolver继承了AbstractLifecycleComponent,同时实现了ConfiguredHostsResolver接口;它提供了getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists(使用线程池并发执行transportService.addressesFromString)这几个静态方法
  • doStart方法使用EsExecutors.newScaling创建了EsThreadPoolExecutor;doStop方法则使用ThreadPool.terminate来终止线程池
  • resolveConfiguredHosts方法首先将resolveInProgress从false设置为true,之后通过transportService.getThreadPool()执行hostsProvider.getSeedAddresses,执行完成则设置resolveInProgress为false

小结

  • ConfiguredHostsResolver接口定义了resolveConfiguredHosts方法用于解析配置的transport address列表
  • SeedHostsResolver继承了AbstractLifecycleComponent,同时实现了ConfiguredHostsResolver接口;它提供了getMaxConcurrentResolvers、getResolveTimeout、resolveHostsLists(使用线程池并发执行transportService.addressesFromString)这几个静态方法
  • doStart方法使用EsExecutors.newScaling创建了EsThreadPoolExecutor;doStop方法则使用ThreadPool.terminate来终止线程池;resolveConfiguredHosts方法首先将resolveInProgress从false设置为true,之后通过transportService.getThreadPool()执行hostsProvider.getSeedAddresses,执行完成则设置resolveInProgress为false

doc

转载于:https://juejin.im/post/5cc31154e51d456e3b7018eb

你可能感兴趣的文章
一个有趣的命令
查看>>
我的友情链接
查看>>
已发布13集网站开发技术视频:http://blog.sina.com.cn/s/blog_67d27f340102vf7l.html
查看>>
Mysql ibdata 丢失或损坏如何通过frm&ibd 恢复数据
查看>>
MySQL数据库的优化(二)
查看>>
Deepin OS和WIN7双启动 花屏原因一例
查看>>
UIMenuController—为UITextField禁用UIMenuController功能
查看>>
Protobuf使用不当导致的程序内存上涨问题
查看>>
【原创】扯淡的Centos systemd与Docker冲突问题
查看>>
Spring+Mybatis多数据库的配置
查看>>
给大家推荐一个免费下载名称读写ntfs软件的地方
查看>>
在MySQL数据库建立多对多的数据表关系
查看>>
突然停电或死机导致没保存的文件怎么找回
查看>>
kudu
查看>>
jquery.validate.min.js表单验证使用
查看>>
在JS中捕获console.log的输出
查看>>
Python扫描IP段指定端口是否开放(一次扫描20个B网段没问题)
查看>>
一些常用的WebServices
查看>>
CentOS7使用firewalld打开关闭防火墙与端口
查看>>
maven 添加阿里云maven镜像
查看>>