内容简介:之前在参与上面就是接下来的方法就和之前的博文
之前在参与 nacos
的开发过程中,有不少同学都在问,为什么我在 nacos console
中将服务进行下线了,但是这个被下线的服务还是可以被调用到,这不太符合官方宣称的秒级上下线特点呀。经过进一步询问发现,那些存在说实例下线后依旧可以对外提供服务的问题,有一个共同的特点——都有 rabbion
这个负载均衡的组件。因此本文将从两个方面探讨这个问题: nacos
的秒级上下线的实现方式以及 rabbion
的实例更新机制导致实例上下线感知延迟
Nacos 秒级上下线
@CanDistro @RequestMapping(value = "", method = RequestMethod.PUT) public String update(HttpServletRequest request) throws Exception { String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String agent = request.getHeader("Client-Version"); if (StringUtils.isBlank(agent)) { agent = request.getHeader("User-Agent"); } ClientInfo clientInfo = new ClientInfo(agent); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { serviceManager.updateInstance(namespaceId, serviceName, parseInstance(request)); } else { serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); } return "ok"; } 复制代码
上面就是 nacos console
端实例上下线的接口, parseInstance(request)
方法就是从 request
中提取 instance
实例信息。而背后的 updateInstance
方法如下
public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } if (!service.allIPs().contains(instance)) { throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); } public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } 复制代码
接下来的方法就和之前的博文 Nacos Server端注册一个服务实例流程
一样了。因此在 nacos console
中一旦点击实例下线,是立马更新 nacos naming server
中的实例信息数据的。
Rabbion的实例更新机制
首先看 nacos
实现的 rabbion
的实例拉取代码
public class NacosServerList extends AbstractServerList<NacosServer> { private NacosDiscoveryProperties discoveryProperties; private String serviceId; public NacosServerList(NacosDiscoveryProperties discoveryProperties) { this.discoveryProperties = discoveryProperties; } @Override public List<NacosServer> getInitialListOfServers() { return getServers(); } @Override public List<NacosServer> getUpdatedListOfServers() { return getServers(); } private List<NacosServer> getServers() { try { List<Instance> instances = discoveryProperties.namingServiceInstance() .selectInstances(serviceId, true); return instancesToServerList(instances); } catch (Exception e) { throw new IllegalStateException( "Can not get service instances from nacos, serviceId=" + serviceId, e); } } private List<NacosServer> instancesToServerList(List<Instance> instances) { List<NacosServer> result = new ArrayList<>(); if (null == instances) { return result; } for (Instance instance : instances) { result.add(new NacosServer(instance)); } return result; } public String getServiceId() { return serviceId; } @Override public void initWithNiwsConfig(IClientConfig iClientConfig) { this.serviceId = iClientConfig.getClientName(); } } 复制代码
可以看到 NacosServerList
继承了 AbstractServerList
,那么这个 AbstractServerList
最终在哪里被收集呢?通过代码跟踪可以看到,最终是在 DynamicServerListLoadBalancer
这个类中被收集
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } }; public DynamicServerListLoadBalancer(IClientConfig clientConfig) { initWithNiwsConfig(clientConfig); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { try { super.initWithNiwsConfig(clientConfig); String niwsServerListClassName = clientConfig.getPropertyAsString( CommonClientConfigKey.NIWSServerListClassName, DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS); ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig); // 获取所有ServerList接口的实现类 this.serverListImpl = niwsServerListImpl; // 获取Filter(对拉取的servers列表实行过滤操作) if (niwsServerListImpl instanceof AbstractServerList) { AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl) .getFilterImpl(clientConfig); niwsFilter.setLoadBalancerStats(getLoadBalancerStats()); this.filter = niwsFilter; } // 获取获取ServerListUpdater对象实现类类名 String serverListUpdaterClassName = clientConfig.getPropertyAsString( CommonClientConfigKey.ServerListUpdaterClassName, DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS); // 获取ServerListUpdater对象(实际对象为PollingServerListUpdater) this.serverListUpdater = (ServerListUpdater) ClientFactory.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig); // 初始化或者重置 restOfInit(clientConfig); } catch (Exception e) { throw new RuntimeException( "Exception while initializing NIWSDiscoveryLoadBalancer:" + clientConfig.getClientName() + ", niwsClientConfig:" + clientConfig, e); } } void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); // 开启定时任务,这个任务就是定时刷新实例信息缓存 enableAndInitLearnNewServersFeature(); // 开启前进行一次实例拉取操作 updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } // 这里就是进行实例信息缓存更新的操作 @VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { // 调用拉取新实例信息的方法 servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); // 用Filter对拉取的servers列表进行更新 if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } // 更新实例列表 updateAllServerList(servers); } 复制代码
来看看 enableAndInitLearnNewServersFeature();
的最终调用是什么
@Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { // 这里的UpdateAction对象就是在DynamicServerListLoadBalancer中封装的updateListOfServers实现 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; // 默认任务执行时间间隔为30s scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS); } else { logger.info("Already active, no-op"); } } 复制代码
因此不难看出,虽然 nacos
实现了秒级的实例上下线,但是由于在 Spring Cloud
中,负载组件 rabbion
的实例信息更新是采用了定时任务的形式,有可能这个任务上一秒刚刚执行完,下一秒你就执行实例上下线操作,那么 rabbion
要感知这个变化,就必须要等待 refreshIntervalMs
秒后才可以感知到。
以上所述就是小编给大家介绍的《Nacos疑问之为什么我服务明明下线了却还是可以调用到?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 面试官:你知道 Dubbo 怎么做优雅上下线的吗?你:优雅上下线是啥?
- SpringCloud服务的平滑上下线
- SpringCloud服务的平滑上下线
- Eureka 源码剖析(五):服务下线
- Elasticsearch 平滑下线节点实践指南
- Elasticsearch 平滑下线节点实践指南
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Kafka技术内幕
郑奇煌 / 人民邮电出版社 / 2017-11 / 119.00元
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者和控制器如何确保Kafka集群的分布式和容错特性,两种同步集群工具MirrorMaker和uReplicator,流处理的两种API以及Kafka的一些高级特性等。一起来看看 《Kafka技术内幕》 这本书的介绍吧!