Sentinel是阿里提供的一个限流降级组件,通过简单的API即可实现限流功能,可无缝嵌入Spring、Dubbo等应用
常见限流算法有漏铜、令牌桶、时间窗口,Sentinel用的是时间窗口算法来实现的限流
demo
1 | public static void main(String[] args) { |
这里从SphU.entry(resource)进行分析
1 | private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { |
Sentinel提供的处理器
1 | NodeSelectorSlot:收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级 |
Sentinel大致原理
从上面可以看出Sentinel的基本原理了,通过调用拦截,找到对应的处理链(加载过的话就从缓存里拿了),这里处理链都是实现ProcessorSlot接口的,然后形成调用链依次执行相应的entry方法,核心在于StatisticSlot统计插槽和FlowSlot限流插槽
StatisticSlot
StatisticSlot作用在于统计当前时间窗口的QPS与线程信息,用于FlowSlot进行限流的依据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
Iterator var8;
ProcessorSlotEntryCallback handler;
try {
this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
//走到这里说明没被限流,进行统计线程数和请求数
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
Iterator var13 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();
while(var13.hasNext()) {
ProcessorSlotEntryCallback<DefaultNode> handler = (ProcessorSlotEntryCallback)var13.next();
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException var10) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();
while(var8.hasNext()) {
handler = (ProcessorSlotEntryCallback)var8.next();
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException var11) {
//这里就是被限流了,统计被限流的请求信息
BlockException e = var11;
context.getCurEntry().setError(var11);
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();
while(var8.hasNext()) {
handler = (ProcessorSlotEntryCallback)var8.next();
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable var12) {
context.getCurEntry().setError(var12);
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw var12;
}
}
//记录被通过的请求
public void addPassRequest(int count) {
//这里走到了父类StatisticNode
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
public void addPassRequest(int count) {
//看下StatisticNode的这个属性怎么初始化的
this.rollingCounterInSecond.addPass(count);
this.rollingCounterInMinute.addPass(count);
}
private transient volatile Metric rollingCounterInSecond;
private transient Metric rollingCounterInMinute;
private LongAdder curThreadNum;
private long lastFetchTime;
public StatisticNode() {
//默认初始化一个sampleCount为2,intervalInMs为1000的ArrayMetric对象
this.rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
this.rollingCounterInMinute = new ArrayMetric(60, 60000, false);
this.curThreadNum = new LongAdder();
this.lastFetchTime = -1L;
}
看下ArrayMetric
public ArrayMetric(int sampleCount, int intervalInMs) {
//上面的构造方法走到了这里
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
//看下父类LeapArray的构造方法
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
//单位时间窗口的时间长度
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
//时间窗口个数
this.sampleCount = sampleCount;
//与时间窗口数量相等的数组
this.array = new AtomicReferenceArray(sampleCount);
}
//回到上面的this.rollingCounterInSecond.addPass(count);
public void addSuccess(int count) {
//拿到当前时间的时间窗口,看下时间窗口怎么拿的
WindowWrap<MetricBucket> wrap = this.data.currentWindow();
//给当前时间窗口增加请求数,这样FlowSlot就能用这里值去做限流了
((MetricBucket)wrap.value()).addSuccess(count);
}
public WindowWrap<T> currentWindow() {
//获取当前时间戳对应的时间窗口
return this.currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0L) {
return null;
} else {
//计算当前时间窗口下标
int idx = this.calculateTimeIdx(timeMillis);
//计算当前时间戳所在的时间窗口的开始时间,即要计算出 WindowWrap 中 windowStart 的值,其实就是要算出小于当前时间戳,并且是 windowLengthInMs 的整数倍最大的数字,Sentinel 给出是算法为 ( timeMillis - timeMillis % windowLengthInMs )
long windowStart = this.calculateWindowStart(timeMillis);
while(true) {
while(true) {
WindowWrap<T> old = (WindowWrap)this.array.get(idx);
WindowWrap window;
//旧时间窗口不存在,说明当前是对应时间窗口的第一个请求,直接初始化一个时间窗口就好了
if (old == null) {
window = new WindowWrap((long)this.windowLengthInMs, windowStart, this.newEmptyBucket(timeMillis));
if (this.array.compareAndSet(idx, (Object)null, window)) {
return window;
}
Thread.yield();
} else {
//当前时间对应的窗口开始时间等于旧窗口的开始时间,直接返回旧窗口
if (windowStart == old.windowStart()) {
return old;
}
//如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口,并将time设置为新的时间窗口的开始时间,此时窗口向前滑动,arrays数组中的窗口将会有一个失效,会有另一个新的窗口进行替换
if (windowStart > old.windowStart()) {
if (this.updateLock.tryLock()) {
try {
window = this.resetWindowTo(old, windowStart);
} finally {
this.updateLock.unlock();
}
return window;
}
Thread.yield();
} else if (windowStart < old.windowStart()) {
//这里走不到
return new WindowWrap((long)this.windowLengthInMs, windowStart, this.newEmptyBucket(timeMillis));
}
}
}
}
}
}
private int calculateTimeIdx(long timeMillis) {
//重要:time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
long timeId = timeMillis / (long)this.windowLengthInMs;
//获取对应时间窗口的数组下标
return (int)(timeId % (long)this.array.length());
}
protected long calculateWindowStart(long timeMillis) {
//用当前时间减去一个值,结果肯定是小于当前时间的,减去的是对单位时间窗口的余数,所以结果肯定是windowLengthInMs的倍数,所以就是小于当前时间并且是windowLengthInMs最大整数倍的一个值
return timeMillis - timeMillis % (long)this.windowLengthInMs;
}
通过对当前时间获取到当前时间对应的时间窗口的开始时间,来判断是要开始一个新的窗口还是使用旧窗口,然后统计当前窗口的请求,用于FlowSlot的限流判断依据
Sentinel为什么采用两个时间窗口
时间窗口中保存着很多统计数据,如果时间窗口过多的话,一方面会占用过多内存,另一方面时间窗口过多就意味着时间窗口的长度会变小,如果时间窗口长度变小,就会导致时间窗口过于频繁的滑动
Sentinel限流的总结
根据当前资源拿到对应的责任链,依次执行相应链条的entry方法,StatisticSlot用于统计时间窗口数据, FlowSlot根据StatisticSlot的统计数据与配置的规则来判断是否执行限流操作