戳蓝字「TopCoder 」关注我们哦!
sentinel 处理流程是基于slot链(ProcessorSlotChain)来完成的,比如限流、熔断等,其中重要的一个slot就是 StatisticSlot
sentinel 的slot链( ProcessorSlotChain
)是责任链模式的体现,那SlotChain是在哪创建的呢?是在 CtSph.lookProcessChain()
// DefaultSlotChainBuilder public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; }
public enum MetricEvent { PASS, // Normal pass. BLOCK, // Normal block. EXCEPTION, // 异常统计 SUCCESS, RT, // rt统计 OCCUPIED_PASS }
处理流程走到StatisticSlot时,首先触发后续slot.entry方法,然后统计各项指标,后续slot中数据判断来源就是这里统计的各项指标。StatisticSlot.entry 逻辑如下:
@Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { try { // 触发下一个Slot的entry方法 fireEntry(context, resourceWrapper, node, count, args); // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级 // 统计信息 node.increaseThreadNum(); node.addPassRequest(); // 省略部分代码 } catch (BlockException e) { context.getCurEntry().setError(e); // Add block count. node.increaseBlockedQps(); // 省略部分代码 throw e; } catch (Throwable e) { context.getCurEntry().setError(e); // Should not happen node.increaseExceptionQps(); // 省略部分代码 throw e; } }
注意:由于后续的 fireEntry
操作和更新本次统计信息是两个操作,不是原子的,会造成限流不准的小问题,比如设置的FlowRule count为20,并发情况下可能稍大于20,不过针对大部分场景来说,这点偏差是可以容忍的,毕竟我们要的是限流效果,而不是必须精确的限流操作。
我们可以看到 node.addPassRequest() 这段代码是在fireEntry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest() 这行代码,具体的代码如下所示:
// DefaultNode public void addPassRequest() { super.addPassRequest(); this.clusterNode.addPassRequest(); }
这里的node是一个 DefaultNode 实例,这里特别补充一个 DefaultNode 和 ClusterNode 的区别:
上面代码不管是 DefaultNode 还是 ClusterNode ,走的都是StatisticNode 对象的 addPassRequest 方法:
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000); private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000); public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); // 对每秒指标统计 rollingCounterInMinute.addPass(count); // 每分钟指标统计 }
每一个通过的指标(pass)都是调用Metric 的接口进行操作的,并且是通过 ArrayMetric 这种实现类,代码如下:
public ArrayMetric(int windowLength, int interval) { this.data = new WindowLeapArray(windowLength, interval); } public void addPass(int count) { // 获取当前时间窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); }
首先通过 currentWindow() 获取当前时间窗口,然后更新当前时间窗口对应的统计指标,以下代码重点关注几个判断逻辑:
// LeapArray public WindowWrap<T> currentWindow() { return currentWindow(TimeUtil.currentTimeMillis()); } // TimeUtil public static long currentTimeMillis() { // currentTimeMillis是由一个tick线程每个1ms更新一次,具体逻辑在TimeUtil类中 return currentTimeMillis; } // LeapArray public WindowWrap<T> currentWindow(long timeMillis) { // 计算当前时间点落在滑动窗口的下标 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis); // 获取当前时间点对应的windowWrap,array为AtomicReferenceArray while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { // 1.为空表示当前时间窗口为初始化过,创建WindowWrap并cas设置到array中 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket()); if (array.compareAndSet(idx, null, window)) { return window; } else { Thread.yield(); } } else if (windowStart == old.windowStart()) { // 2.获取的时间窗口正好对应当前时间,直接返回 return old; } else if (windowStart > old.windowStart()) { // 3.获取的时间窗口为老的,进行reset操作复用 if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { // 4.时间回拨了,正常情况下不会走到这里 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket()); } } }
// wrap.value().addPass(count); public void addPass(int n) { add(MetricEvent.PASS, n); } // MetricBucket public MetricBucket add(MetricEvent event, long n) { // 对应MetricEvent枚举中值 counters[event.ordinal()].add(n); return this; }
private final long windowLengthInMs; // 时间窗口的长度 private long windowStart; // 时间窗口开始时间 private T value; // MetricBucket对象,保存各个指标数据
currentTimeMillis = System.currentTimeMillis(); Thread daemon = new Thread(new Runnable() { @Override public void run() { while (true) { currentTimeMillis = System.currentTimeMillis(); try { TimeUnit.MILLISECONDS.sleep(1); } catch (Throwable e) { } } } }); daemon.setDaemon(true); daemon.setName("sentinel-time-tick-thread"); daemon.start();
protected int windowLengthInMs; // 单个滑动窗口时间值 protected int sampleCount; // 滑动窗口个数 protected int intervalInMs; // 周期值(相当于所有滑动窗口时间值之和) public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount); }
针对每秒滑动窗口, windowLengthInMs=500,sampleCount=2,intervalInMs=1000
,针对每分钟滑动窗口, windowLengthInMs=1000,sampleCount=60,intervalInMs=60000
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000); private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。最后以一张图结束吧:
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- View 体系详解:坐标系、滑动、手势和事件分发机制
- 滑动验证码的原理并利用 Vue 实现滑动验证码
- Flink 滑动窗口优化
- Flink 滑动窗口优化
- Flink 滑动窗口优化
- 撸一款”灵动“的滑动按钮
Hacker's Delight
Henry S. Warren Jr. / Addison-Wesley / 2002-7-27 / USD 59.99
A collection useful programming advice the author has collected over the years; small algorithms that make the programmer's task easier. * At long last, proven short-cuts to mastering difficult aspec......一起来看看 《Hacker's Delight》 这本书的介绍吧!