内容简介:长按下方的二维码可以快速关注我们
背景
在分布式系统中为什么要使用开关?例如双十一电商平台需要做促销活动,此时订单量暴增,在下单环节,可能需要调用A、B、C三个接口来完成,但是其实A和B是必须的,C只是附加的功能(例如在下单的时候获取用户常用地址,或者发个推送消息之类的),可有可无,在平时系统没有压力,在容量充足的情况下,调用下没问题,但是在特殊节日的大促环节,系统已经满负荷了,这时候其实完全可以不去调用C接口,怎么实现这个呢?改代码重新发布?no,这样不太敏捷,于是开关诞生了,开发人员只要简单执行一下命令或者点一下页面,就可以关掉对于C接口的调用,在请求高峰过去之后,再把开关恢复回去即可。类似的使用场景还有A/B Test、灰度发布和数据的不停服切换等。
整体设计
高可用分布式开关设计?
一、 需求分析
开关服务的核心需求主要有以下几点:
1. 支持开关的分布式化管理
• 开关统一管理,发布、更新等操作只需在中心服务器上进行,一次操作,处处可用。
• 开关更新自动化,当开关发生变更,订阅该开关的客户端会自动发现变更,进而同步新值。
2. 具有容灾机制,保证服务的高可用
• 服务集群的高可用,当集群中的一台server不可用了,client发现后可以自动切换到其他server上进行访问。
• 客户端具备容灾机制,当开关中心完全不可用,可以在客户端对开关进行操作。
二、方案设计
针对此需求分析,我们抽象成了两大块分别是配置中心和SDK,整体架构如下:
各个系统模块介绍:
-
配置中心:
配置中心在此处提供开关的统一管理
-
zookeeper:
分布式开关统一注册中心,主要提供变更通知服务,客户端通过订阅开关节点,实时获取开关变更信息,从而同步更新到本地缓存
-
SDK:
client端获取开关,以及监听开关的变化从而更新本地缓存。
配置中心设计
配置中心在此处的作用有如下几点:
-
提供开关统一管理,包含开关发布、更新、查询等基本服务
-
操作开关的日志,比如谁在某一时刻将开关从关闭状态修改为打开状态。
-
系统权限控制,只有拥有相关权限的人才能操作开关。
配置中心整体设计如下:
其中db主要保存了开关信息,以及日志信息。
zk主要用来创建开关和监听开关的变化。
配置中心的设计看似简单,其实也需要注意以下几点:
1. 开关的命名重复问题
在设计系统的时候,开关是否要共享给所有系统,还是其中某一个系统,如果共享给所有系统,那么有权限的人对开关命名的时候难免会重复,针对此,我们设计了appid的概念,一个系统对应一个appid,在一个appid内开关名称不允许有重复,只有该appid的owner才有权限对该appid的下的开关做操作。
2. 开关的分类
我们可以将开关分为三大类,分别是功能开关、降级开关、灰度开关:
-
功能开关
针对某一个功能是否打开,例如在订单下单的时候需要获取下单用户的历史换绑手机号信息,但是由于B系统只是提供了接口定义,实际业务还未开发完成,A系统可以先提前开发并上线,待B系统上线之后,A系统将该功能开关打开。
-
降级开关
典型的应用场景是电商做促销的时候,比如双十一电商做促销,用户下单的时候获取用户历史常用地址,因为双十一系统已经达到负荷,为了系统性能,将该业务逻辑降级。或者A系统调用B系统,由于B系统整体宕机,为了不影响A系统继续运行,可以手动将B系统降级等。
-
灰度开关
针对某一功能做灰度,例如我们需要针对刷单用户在下单过程中做拦截,为此我们在下单阶段做了一套黑白名单处理,但是我们也无法知晓该套黑白名单的正确率多少,为了避免造成误拦,我们需要对该功能做灰度采样,以便及时调整我们的黑白名单逻辑。通常的灰度策略为 1% 灰度,10%灰度,30%灰度,50%。。。
3. zk中开关设计
zk中的设计结构为路径格式,我们将/appid 设置为根路径,例如appid为order的根路径为/order,则在该appid下设置的user_open开关的路径则为:
/order/user_open ,所以我们设计的路径公式如下:
/appid/switch
部分页面效果如下:
SDK设计
SDK主要是以jar的形式嵌入在client端的,它的作用主要是在client端获取开关,以及监听开关的变化从而更新本地缓存。SDK整体设计如下所示:
我们使用Curator来操作zk,因为它相比原生的zk 客户端确实好用不少,这里不做过多展开,为了提高系统性能我们将开关信息缓存在本地内存,这样做的目的是提升系统的性能,所以获取开关的流程图如下:
1. 监听开关变化
如果开关发生改变,我们需要将开关变化的信息载入到本地,监听代码如下:
1 private void nodeListener(final String key) { 2 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key); 3 try { 4 nodeCache.start(); 5 nodeCache.getListenable().addListener(new NodeCacheListener() { 6 7 public void nodeChanged() throws Exception { 8 String msg = new String(nodeCache.getCurrentData().getData()); 9 System.out.println("监听事件触发"); 10 System.out.println("重新获得节点内容为:" + msg); 11 //加入到本地缓存 12 dataMap.put(key, msg); 13 } 14 }); 15 } catch (Exception e) { 16 e.printStackTrace(); 17 } 18 19 20 }
2. 降级开关
降级开关和功能开关在底层实现上是一样的,就是从zk获取value为true的时候,是打开状态的,代码如下:
1 /** 2 * 获取开关,默认是打开的 3 * 4 * @param switchKey 5 * @return 6 */ 7 public boolean getSwitch(String switchKey) { 8 try { 9 String dataMsg = getDataMsg(switchKey); 10 if (isEmpty(dataMsg)) { 11 return true; 12 } 13 14 return Boolean.parseBoolean(dataMsg); 15 } catch (Exception e) { 16 e.printStackTrace(); 17 } 18 19 return true; 20 }
其中getDataMsg方法封装了本地缓存的调用,具体代码如下:
1 private String getDataMsg(String key) { 2 byte[] data; 3 try { 4 //先从本地缓存中找 5 String msg = dataMap.get(key); 6 if (!isEmpty(msg)) { 7 return msg; 8 } 9 10 //本地缓存没有,则从zk中去查找 11 data = dataBuilder.forPath(basePath + "/" + key); 12 if (data != null) { 13 String dataMsg = new String(data); 14 //重新塞入缓存 15 dataMap.put(key, dataMsg); 16 nodeListener(key); 17 return dataMsg; 18 } 19 } catch (Exception e) { 20 e.printStackTrace(); 21 } 22 23 return null; 24 }
降级开关和功能开关的代码完成后,接下来是一段测试demo,测试开关是否可以正常使用,代码如下:
1public class SwitchDemo { 2 3 public static void main(String[] args)throws Exception { 4 5 //user_open 开关 打开 6 if(SwitchHandler.config().getSwitch("user_open")){ 7 System.out.println("exe user open switch 1"); 8 } 9 10 Thread.sleep(10000); 11 12 13 //user_open 开关 关闭 14 if (SwitchHandler.config().getSwitch("user_open")){ 15 System.out.println("exe user open switch 2"); 16 }else{ 17 18 System.out.println("exe user not open"); 19 } 20 21 22 Thread.sleep(1000000000); 23 } 24}
接下来在本地启动一个zk单机服务,进入到zk的安装目录 ,启动命令如下:
1./zkServer.sh start
启动一个客户端,创建一个开关user_open value为true,假设我这个服务的appid叫sky,那么我应该先创建/sky 这个路径,接着创建,/sky/user_open这个路径,命令如下:
1create /sky 1 2create /sky/user_open true
接下来我们启动SwitchDemo测试类,在代码走到第一次sleep阶段,我们立马将user_open 这个值修改为false,修改zk的命令为:
1set /sky/user_open false
最终打印结果如下:
从结果可以看出,第一次执行的时候,由于user_open的value为true,所以
日志 exe user open switch 1 打印出来了,其次监听的日志也打印出来了,当代码执行到第十行的时候,我们将user_open的value修改为false,此时监听的日志监听到开关发生了变化,并将本地内存的开关地址修改了false,最后执行第14行代码的时候,由于开关是关闭状态,所以走到了第18行的逻辑。
3. 灰度开关设计
灰度开关主要针对某一个功能来进行灰度,那么就需要有一个灰度策略的概念,比如设置的是灰度10%,此时有1000个请求进来,应该只有100个左右的请求是命中这段逻辑,在微服务架构中,服务与服务之间的调用都会透传一个requestId(请求id),因此将requestId 当做灰度的主体是最适合不过了,简单的灰度算法可以将requestId 进行hash 取模100 然后跟设置的灰度值进行比较即可。代码如下:
1 /** 2 * 灰度开关 3 * 4 * @param switchKey 5 * @param strategyId 灰度策略,可以传入requestId,手机号来进行灰度 6 * @return 7 */ 8 public boolean getGrayscaleSwitch(String switchKey, String strategyId) { 9 10 int value = getInt(switchKey, 100); 11 12 int hash = strategyId.hashCode(); 13 14 return Math.abs(hash) % 100 <= value; 15 }
灰度不是一个精确值,请求量越大灰度的越精确,因此接下来我们的测试demo,会模拟10000条请求,如果命中了大约1000条左右,那么说明我们的灰度算法没啥问题,我们将当前时间戳当做requestId(当然实际不要这么做,应该用微服务之间透传的requestId,这里只是为了测试)
首先设置一个灰度开关user_gary,value为10(代表灰度10%,最大为100)
1create /sky/user_gary 10
测试代码如下:
1 //灰度10%开关 2 int grayCount=0;// 3 for (int i=0;i<10000;i++){ 4 String requestId = System.currentTimeMillis()+"-"+i; 5 if (SwitchHandler.config().getGrayscaleSwitch("user_gary",requestId)){ 6 grayCount++; 7 } 8 } 9 10 System.out.println("进入灰度开关的次数为:"+grayCount);
运行结果:
我们看到10000次请求,命中了1176次,大约灰度10%,说明灰度起作用了。
SDK完整代码
pom依赖:
1 <dependency> 2 <groupId>org.apache.curator</groupId> 3 <artifactId>curator-recipes</artifactId> 4 <version>4.0.1</version> 5 </dependency>
SDK完整代码:
1package com.wuzy.myswitch; 2 3import org.apache.curator.RetryPolicy; 4import org.apache.curator.framework.CuratorFramework; 5import org.apache.curator.framework.CuratorFrameworkFactory; 6import org.apache.curator.framework.api.GetDataBuilder; 7import org.apache.curator.framework.recipes.cache.NodeCache; 8import org.apache.curator.framework.recipes.cache.NodeCacheListener; 9import org.apache.curator.retry.ExponentialBackoffRetry; 10 11import java.util.Map; 12import java.util.concurrent.ConcurrentHashMap; 13 14public class SwitchHandler { 15 16 private String basePath = "/sky"; 17 private GetDataBuilder dataBuilder; 18 19 private Map<String, String> dataMap = new ConcurrentHashMap<String, String>(); 20 21 private SwitchHandler() { 22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 23 client = CuratorFrameworkFactory.builder() 24 .connectString("127.0.0.1:2181") 25 .retryPolicy(retryPolicy) 26 .sessionTimeoutMs(6000) 27 .connectionTimeoutMs(3000) 28 .build(); 29 client.start(); 30 31 dataBuilder = client.getData(); 32 33 //TODO 后期这里最好将数据库中的开关加载出来,从zk中找出放入本地缓存中,加快查询速度 34 } 35 36 private static class SwitchHandlerHolder { 37 private static final SwitchHandler switchHandler = new SwitchHandler(); 38 } 39 40 public static SwitchHandler config() { 41 return SwitchHandlerHolder.switchHandler; 42 } 43 44 private CuratorFramework client; 45 46 47 /** 48 * 获取开关,默认是打开的 49 * 50 * @param switchKey 51 * @return 52 */ 53 public boolean getSwitch(String switchKey) { 54 try { 55 String dataMsg = getDataMsg(switchKey); 56 if (isEmpty(dataMsg)) { 57 return true; 58 } 59 60 return Boolean.parseBoolean(dataMsg); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 65 return true; 66 } 67 68 /** 69 * 灰度开关 70 * 71 * @param switchKey 72 * @param strategyId 灰度策略,可以传入requestId,手机号来进行灰度 73 * @return 74 */ 75 public boolean getGrayscaleSwitch(String switchKey, String strategyId) { 76 77 int value = getInt(switchKey, 100); 78 79 int hash = strategyId.hashCode(); 80 81 return Math.abs(hash) % 100 <= value; 82 } 83 84 85 public int getInt(String key, int defaultValue) { 86 try { 87 String dataMsg = getDataMsg(key); 88 if (isEmpty(dataMsg)) { 89 return defaultValue; 90 } 91 return Integer.parseInt(dataMsg); 92 93 } catch (Exception e) { 94 e.printStackTrace(); 95 } 96 97 return defaultValue; 98 } 99 100 101 private boolean isEmpty(String msg) { 102 return msg == null || msg.trim().equals(""); 103 } 104 105 private String getDataMsg(String key) { 106 byte[] data; 107 try { 108 //先从本地缓存中找 109 String msg = dataMap.get(key); 110 if (!isEmpty(msg)) { 111 return msg; 112 } 113 114 //本地缓存没有,则从zk中去查找 115 data = dataBuilder.forPath(basePath + "/" + key); 116 if (data != null) { 117 String dataMsg = new String(data); 118 //重新塞入缓存 119 dataMap.put(key, dataMsg); 120 nodeListener(key); 121 return dataMsg; 122 } 123 } catch (Exception e) { 124 e.printStackTrace(); 125 } 126 127 return null; 128 } 129 130 131 private void nodeListener(final String key) { 132 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key); 133 try { 134 nodeCache.start(); 135 nodeCache.getListenable().addListener(new NodeCacheListener() { 136 137 public void nodeChanged() throws Exception { 138 String msg = new String(nodeCache.getCurrentData().getData()); 139 System.out.println("监听事件触发"); 140 System.out.println("重新获得节点内容为:" + msg); 141 //加入到本地缓存 142 dataMap.put(key, msg); 143 } 144 }); 145 } catch (Exception e) { 146 e.printStackTrace(); 147 } 148 149 150 } 151}
本公众号团队成员由饿了么、阿里、蚂蚁金服等同事组成,关注架构师之巅,可以了解最前沿的技术。
长按下方的二维码可以快速关注我们
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 用手机实现射频开关
- Eureka Server 开关流程
- CSS开关按钮三例
- Element源码分析系列9-Switch(开关)
- 树莓派 GPIO按钮开关 原理与实现
- 解--头条的算法面试题-圆环开关灯
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
疯狂Java讲义
李刚 / 电子工业出版社 / 2014-7-1 / 109.00元
《疯狂Java讲义(第3版)(含CD光盘1张)》是《疯狂Java讲义》的第3版,第3版保持了前两版系统、全面、讲解浅显、细致的特性,全面新增介绍了Java 8的新特性,《疯狂Java讲义(第3版)(含CD光盘1张)》大部分示例程序都采用Lambda表达式、流式API进行了改写,因此务必使用Java 8的JDK来编译、运行。 《疯狂Java讲义(第3版)(含CD光盘1张)》深入介绍了Java编......一起来看看 《疯狂Java讲义》 这本书的介绍吧!