Sentinel 原理

Sentinel 是阿里巴巴开源的限流降级组件,通过简单的 API 即可实现限流功能,并可无缝嵌入 Spring、Dubbo 等应用。
常见的限流算法有漏桶、令牌桶和时间窗口,Sentinel 采用的是(滑动)时间窗口算法来实现限流。

demo 示例代码如下:
/**
 * Demo entry point: registers flow rules, then repeatedly tries to enter the
 * protected resource, printing a message on success and the stack trace when blocked.
 */
public static void main(String[] args) {
    // Register the flow rules before any entry is attempted.
    initFlowRules();
    for (;;) {
        Entry guard = null;
        try {
            // Ask Sentinel for permission to access the resource; a rejection surfaces as BlockException.
            guard = SphU.entry(resource);
            System.out.println("Hello Word");
        } catch (BlockException blocked) {
            // The request was rate-limited.
            blocked.printStackTrace();
        } finally {
            // Only an entry that was actually acquired must be released.
            if (guard != null) {
                guard.exit();
            }
        }
    }
}

下面从 SphU.entry(resource) 开始分析,调用最终会进入 entryWithPriority 方法:
/**
 * Creates an entry for the given resource and runs it through the resource's slot chain.
 *
 * <p>An entry without a slot chain (so no slots run) is returned when the current context is a
 * NullContext, when Constants.ON is false, or when no chain can be obtained for the resource.
 * If there is no context at all, the default context "sentinel_default_context" is entered first.
 *
 * @param resourceWrapper the resource being accessed
 * @param count           number of tokens requested
 * @param prioritized     whether the request is prioritized
 * @param args            extra arguments passed through to the slots
 * @return the created entry
 * @throws BlockException if a slot rejects the request; the entry is exited before rethrowing
 */
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    Context ctx = ContextUtil.getContext();
    if (ctx instanceof NullContext) {
        return new CtEntry(resourceWrapper, (ProcessorSlot) null, ctx);
    }
    if (ctx == null) {
        // No context bound to this thread: fall back to the default one.
        ctx = CtSph.InternalContextUtil.internalEnter("sentinel_default_context");
    }
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, (ProcessorSlot) null, ctx);
    }

    // Key step: resolve (or lazily build) the processing chain for this resource.
    ProcessorSlot<Object> slotChain = this.lookProcessChain(resourceWrapper);
    if (slotChain == null) {
        return new CtEntry(resourceWrapper, (ProcessorSlot) null, ctx);
    }

    CtEntry entry = new CtEntry(resourceWrapper, slotChain, ctx);
    try {
        slotChain.entry(ctx, resourceWrapper, (Object) null, count, prioritized, args);
    } catch (BlockException blocked) {
        // Rejected: release the entry, then propagate the rejection to the caller.
        entry.exit(count, args);
        throw blocked;
    } catch (Throwable unexpected) {
        // Any other failure inside the chain is only logged; the entry is still returned.
        RecordLog.info("Sentinel unexpected exception", unexpected);
    }
    return entry;
}

/**
 * Returns the slot chain cached for the resource, building and caching one on first use.
 *
 * <p>Lookups are lock-free; creation happens under LOCK with a second lookup so that two threads
 * racing on the same resource do not build two chains. The cache is never mutated in place: a
 * copy with the new entry is built and then swapped into chainMap (presumably chainMap is
 * volatile so the unlocked read sees the swap — confirm).
 *
 * @param resourceWrapper the resource whose chain is requested
 * @return the chain, or null once 6000 chains (i.e. distinct resources) are already cached
 */
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain cached = (ProcessorSlotChain) chainMap.get(resourceWrapper);
    if (cached != null) {
        return cached;
    }
    synchronized (LOCK) {
        // Re-check: another thread may have created the chain while we waited for the lock.
        cached = (ProcessorSlotChain) chainMap.get(resourceWrapper);
        if (cached != null) {
            return cached;
        }
        // Cap on the number of cached chains (one per resource).
        if (chainMap.size() >= 6000) {
            return null;
        }
        ProcessorSlotChain created = SlotChainProvider.newSlotChain();
        Map<ResourceWrapper, ProcessorSlotChain> copy = new HashMap<>(chainMap.size() + 1);
        copy.putAll(chainMap);
        copy.put(resourceWrapper, created);
        chainMap = copy;
        return created;
    }
}
/**
 * Builds a new slot chain using the globally resolved SlotChainBuilder.
 *
 * <p>On first use the builder is resolved via SPI (SpiLoader.loadFirstInstanceOrDefault, with
 * DefaultSlotChainBuilder as the fallback class). If resolution still yields null, a
 * DefaultSlotChainBuilder is created directly and a warning is logged. No synchronization is
 * visible here; lookProcessChain calls this while holding LOCK — confirm for any other callers.
 */
public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder == null) {
        slotChainBuilder = (SlotChainBuilder) SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
        if (slotChainBuilder != null) {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: " + slotChainBuilder.getClass().getCanonicalName(), new Object[0]);
        } else {
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default", new Object[0]);
            slotChainBuilder = new DefaultSlotChainBuilder();
        }
    }
    // Each call produces a fresh chain from the (now cached) builder.
    return slotChainBuilder.build();
}
/**
 * Assembles a DefaultProcessorSlotChain from all ProcessorSlot implementations found via SPI,
 * in the order returned by SpiLoader.loadPrototypeInstanceListSorted.
 *
 * <p>Slots that are not AbstractLinkedProcessorSlot instances cannot be linked into the chain;
 * they are skipped with a warning.
 *
 * @return the assembled chain
 */
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    List<ProcessorSlot> slots = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : slots) {
        if (slot instanceof AbstractLinkedProcessorSlot) {
            // Append to the tail of the chain of responsibility.
            chain.addLast((AbstractLinkedProcessorSlot) slot);
            continue;
        }
        RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);
    }
    return chain;
}
/**
 * Loads every SPI implementation of {@code clazz} and returns them sorted by their resolved order.
 *
 * <p>Each implementation's order comes from SpiLoader.SpiOrderResolver.resolveOrder and is
 * inserted into an ordered wrapper list via insertSorted. Any failure is logged (and printed) and
 * an empty mutable list is returned instead of propagating.
 *
 * @param clazz the SPI interface to load
 * @param <T>   the SPI type
 * @return the implementations in sorted order, or an empty list on failure
 */
public static <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz) {
    try {
        ServiceLoader<T> serviceLoader = ServiceLoaderUtil.getServiceLoader(clazz);
        List<SpiLoader.SpiOrderWrapper<T>> orderWrappers = new ArrayList<>();
        // Typed iteration: the previous raw Iterator made `T spi = it.next()` an Object-to-T
        // assignment, which does not compile.
        for (T spi : serviceLoader) {
            int order = SpiLoader.SpiOrderResolver.resolveOrder(spi);
            // Keep the wrapper list sorted by order as each implementation is discovered.
            SpiLoader.SpiOrderResolver.insertSorted(orderWrappers, spi, order);
            RecordLog.debug("[SpiLoader] Found {} SPI: {} with order {}", new Object[]{clazz.getSimpleName(), spi.getClass().getCanonicalName(), order});
        }

        List<T> list = new ArrayList<>(orderWrappers.size());
        for (SpiLoader.SpiOrderWrapper<T> wrapper : orderWrappers) {
            list.add(wrapper.spi);
        }
        return list;
    } catch (Throwable t) {
        // Best-effort loading: report the failure and fall back to "no implementations".
        RecordLog.error("[SpiLoader] ERROR: loadPrototypeInstanceListSorted failed", t);
        t.printStackTrace();
        return new ArrayList<>();
    }
}

Sentinel 默认提供的处理器(Slot)如下:
NodeSelectorSlot:收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级
ClusterBuilderSlot:用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据
LogSlot:发生限流时记录日志用的
StatisticSlot:用于记录、统计不同维度的 runtime 指标监控信息
SystemSlot:通过系统的状态,例如 load1 等,来控制总的入口流量
AuthoritySlot:根据配置的黑白名单和调用来源信息,来做黑白名单控制
FlowSlot:用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制
DegradeSlot:通过统计信息以及预设的规则,来做熔断降级

Sentinel大致原理

从上面的代码可以看出 Sentinel 的基本原理:拦截调用后,找到资源对应的处理链(创建过的处理链会缓存在 chainMap 中,之后直接从缓存中取)。处理链中的每个节点都实现了 ProcessorSlot 接口,它们按顺序组成责任链,依次执行各自的 entry 方法。其中最核心的是 StatisticSlot(统计插槽)和 FlowSlot(限流插槽)。

StatisticSlot

StatisticSlot 的作用是统计当前时间窗口内的 QPS 与线程数等信息,作为 FlowSlot 进行限流判断的依据。其 entry 方法如下:
/**
 * StatisticSlot entry: lets the rest of the chain run first, then records the outcome.
 *
 * <ul>
 *   <li>Passed: increments thread count and pass count on {@code node}, on the current entry's
 *       origin node (if any) and, for inbound ({@code EntryType.IN}) resources, on
 *       Constants.ENTRY_NODE; then notifies entry callbacks via onPass.</li>
 *   <li>PriorityWaitException: increments only the thread counts (no pass count is added here —
 *       presumably the pass was accounted for when the wait was granted; confirm), then onPass.</li>
 *   <li>BlockException: stores the error on the current entry, increments block QPS on the same
 *       nodes, notifies callbacks via onBlocked, and rethrows.</li>
 *   <li>Any other Throwable: stores the error, increments exception QPS, and rethrows.</li>
 * </ul>
 *
 * <p>Fix: the previous version declared {@code handler} at method scope and then redeclared it
 * inside the try block, which is a duplicate-local-variable compile error in Java. Callbacks
 * are now iterated with loop-scoped, typed variables.
 */
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        // Run the downstream slots (e.g. rule checks) first; they throw if the request is rejected.
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Reaching here means the request was not blocked: count the thread and the pass.
        node.increaseThreadNum();
        node.addPassRequest(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException waited) {
        // Prioritized request that waited for a future window: only thread counts are updated.
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException blocked) {
        // The request was rejected by a downstream slot: record the block statistics.
        context.getCurEntry().setError(blocked);
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(blocked, context, resourceWrapper, node, count, args);
        }

        throw blocked;
    } catch (Throwable failure) {
        // Unexpected error: record it as an exception and propagate.
        context.getCurEntry().setError(failure);
        node.increaseExceptionQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseExceptionQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }

        throw failure;
    }
}
/**
 * Records {@code count} passed requests on this node and mirrors them into its cluster node.
 */
public void addPassRequest(int count) {
    // Per-node sliding-window statistics are kept by the StatisticNode superclass.
    super.addPassRequest(count);
    // Aggregate the same count on the cluster-level node.
    clusterNode.addPassRequest(count);
}

/**
 * Adds {@code count} passed requests to both the second-level and the minute-level rolling counters.
 */
public void addPassRequest(int count) {
    // Both counters are initialized in the StatisticNode constructor.
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}
// Sliding-window counter over IntervalProperty.INTERVAL ms split into SampleCountProperty.SAMPLE_COUNT buckets.
private transient volatile Metric rollingCounterInSecond;
// Sliding-window counter over 60 000 ms split into 60 buckets.
private transient Metric rollingCounterInMinute;
private LongAdder curThreadNum;
private long lastFetchTime;

/**
 * Creates the node's counters. The second-level window is sized from SampleCountProperty.SAMPLE_COUNT
 * and IntervalProperty.INTERVAL (the article notes these default to 2 samples over 1000 ms);
 * the minute-level window uses 60 samples over 60 000 ms.
 */
public StatisticNode() {
    rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    curThreadNum = new LongAdder();
    // -1 marks "never fetched".
    lastFetchTime = -1L;
}
再看下 ArrayMetric 的构造方法:
/**
 * Creates a metric whose statistics are stored in an OccupiableBucketLeapArray with
 * {@code sampleCount} buckets spanning {@code intervalInMs} milliseconds.
 */
public ArrayMetric(int sampleCount, int intervalInMs) {
    data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
/**
 * Initializes the underlying LeapArray and a companion FutureBucketLeapArray of the same shape,
 * stored in borrowArray (presumably tracking counts borrowed from future windows — confirm).
 */
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
    // LeapArray validates the arguments and allocates the bucket array.
    super(sampleCount, intervalInMs);
    borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
/**
 * Creates a sliding window of {@code intervalInMs} milliseconds split into {@code sampleCount}
 * equal buckets.
 *
 * @param sampleCount  number of buckets; must be positive
 * @param intervalInMs total window length; must be positive and divisible by sampleCount
 */
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.intervalInMs = intervalInMs;
    this.sampleCount = sampleCount;
    // Length of a single bucket; exact thanks to the divisibility check above.
    this.windowLengthInMs = intervalInMs / sampleCount;
    // One slot per bucket; slots are filled lazily by currentWindow.
    this.array = new AtomicReferenceArray<>(sampleCount);
}
// Back to this.rollingCounterInSecond.addPass(count) above.
// NOTE(review): the caller invokes addPass, but the method shown here is addSuccess; addPass
// presumably follows the same shape (current window, then MetricBucket#addPass) — confirm.
/**
 * Adds {@code count} successes to the bucket of the current time window, which is what the
 * flow-control checks later read.
 */
public void addSuccess(int count) {
    MetricBucket bucket = (MetricBucket) data.currentWindow().value();
    bucket.addSuccess(count);
}
/**
 * Returns the window bucket that covers the current time (TimeUtil.currentTimeMillis()).
 */
public WindowWrap<T> currentWindow() {
    long now = TimeUtil.currentTimeMillis();
    return currentWindow(now);
}
/**
 * Returns the window bucket covering {@code timeMillis}, creating or resetting the slot as needed.
 *
 * <p>The slot index is (timeMillis / windowLengthInMs) % array.length(), and the expected window
 * start is timeMillis rounded down to a multiple of windowLengthInMs. Outcomes:
 * <ul>
 *   <li>empty slot: a new window is installed with compareAndSet; on losing the race, yield and retry;</li>
 *   <li>same start: the existing window is returned;</li>
 *   <li>stored window is older: it is reset to the new start under updateLock (tryLock); if the lock
 *       is busy, yield and retry;</li>
 *   <li>stored window is newer than timeMillis: a fresh window is returned WITHOUT being stored.</li>
 * </ul>
 *
 * @param timeMillis timestamp in milliseconds
 * @return the window for that time, or null if timeMillis is negative
 */
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0L) {
return null;
} else {
// Index of the array slot this timestamp maps to.
int idx = this.calculateTimeIdx(timeMillis);
// Start of the window containing timeMillis: the largest multiple of windowLengthInMs that is <= timeMillis,
// computed as timeMillis - timeMillis % windowLengthInMs.
long windowStart = this.calculateWindowStart(timeMillis);

// The inner loop only exits via return, so the outer loop never iterates again (likely a decompiler artifact).
while(true) {
while(true) {
WindowWrap<T> old = (WindowWrap)this.array.get(idx);
WindowWrap window;
// Empty slot: this is the first request for the slot, so install a new window atomically.
if (old == null) {
window = new WindowWrap((long)this.windowLengthInMs, windowStart, this.newEmptyBucket(timeMillis));
if (this.array.compareAndSet(idx, (Object)null, window)) {
return window;
}

// Another thread installed a window first; yield and re-read the slot.
Thread.yield();
} else {
// The stored window already covers timeMillis: reuse it.
if (windowStart == old.windowStart()) {
return old;
}
// The stored window is stale (the window has slid forward): reset it to the new start.
// Only the thread that wins tryLock resets; the others yield and retry.
// NOTE(review): resetWindowTo presumably clears the bucket's counters and updates its start — confirm.
if (windowStart > old.windowStart()) {
if (this.updateLock.tryLock()) {
try {
window = this.resetWindowTo(old, windowStart);
} finally {
this.updateLock.unlock();
}

return window;
}

Thread.yield();
} else if (windowStart < old.windowStart()) {
// Reachable when timeMillis is older than the window currently stored in this slot
// (e.g. a stale timestamp, or the clock moved backwards). A detached window is returned
// and is NOT stored in the array, so the live bucket is left untouched.
return new WindowWrap((long)this.windowLengthInMs, windowStart, this.newEmptyBucket(timeMillis));
}
}
}
}
}
}
/**
 * Maps a timestamp to its slot in the window array.
 *
 * <p>The sequence number grows by one every windowLengthInMs, so the mapped slot advances by one
 * (wrapping around the array) as time moves forward; this is what makes the window "slide".
 */
private int calculateTimeIdx(long timeMillis) {
    long bucketSeq = timeMillis / (long) this.windowLengthInMs;
    long bucketCount = this.array.length();
    return (int) (bucketSeq % bucketCount);
}
/**
 * Returns the start of the window containing {@code timeMillis}: the largest multiple of
 * windowLengthInMs that is less than or equal to timeMillis (for non-negative input).
 */
protected long calculateWindowStart(long timeMillis) {
    long offsetInWindow = timeMillis % (long) this.windowLengthInMs;
    return timeMillis - offsetInWindow;
}
通过当前时间计算出其所属时间窗口的开始时间,据此判断是复用旧窗口还是开启(重置)一个新窗口,然后在当前窗口上累加请求数,作为 FlowSlot 限流判断的依据。

Sentinel为什么采用两个时间窗口

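StatisticNode 中同时维护了两个滑动窗口。秒级窗口 rollingCounterInSecond 默认有 2 个样本、总长 1000ms,即每个窗口 500ms,用于实时的限流判断。分钟级窗口 rollingCounterInMinute 有 60 个样本、总长 60000ms,即每个窗口 1s,用于分钟级的统计。之所以不用一个细粒度的大窗口同时满足两种需求,是因为时间窗口中保存着很多统计数据。窗口数量过多,一方面会占用过多内存;另一方面,在总时长固定时,窗口数量越多,单个窗口的长度就越短,窗口就会过于频繁地滑动(重置)。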

Sentinel限流的总结

根据当前资源拿到对应的责任链,依次执行相应链条的entry方法,StatisticSlot用于统计时间窗口数据, FlowSlot根据StatisticSlot的统计数据与配置的规则来判断是否执行限流操作

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×