结构图:
源码分析:(温馨提示,以下源码部分请使用 IDEA 打开源码结合查看阅读,否则容易晕)
入口:
@SentinelResource 注解的切面 :
@Aspect // AspectJ切面 public class SentinelResourceAspect extends AbstractSentinelAspectSupport { // 指定切入点为@SentinelResource注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } // 指定此为环绕通知 around advice @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try { // 要织入的、增强的功能 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); // 调用目标方法 Object result = pjp.proceed(); return result; } catch (BlockException ex) { return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) { entry.exit(1, pjp.getArgs()); } } } }
.
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException { // 注意第4个参数值为1 return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); }
.
@Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException { // count参数:表示当前请求可以增加多少个计数,默认值 1 // 注意第5个参数为false return entryWithType(name, resourceType, entryType, count, false, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException { // 将信息封装为一个资源对象 StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType); // 返回一个资源操作对象entry // prioritized若为true,则表示当前访问必须等待“根据其优先级计算出的时间”后才可通过 // prioritized若为false,则当前请求无需等待 return entryWithPriority(resource, count, prioritized, args); }
.
/** * * @param resourceWrapper 资源实例 * @param count 默认值为1 * @param prioritized 默认值为false * @param args * @return * @throws BlockException */ private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 从ThreadLocal中获取context // 即一个请求会占用一个线程,一个线程会绑定一个context Context context = ContextUtil.getContext(); // 若context是NullContext类型,则表示当前系统中的context数量已经超出的阈值 // 即访问请求的数量已经超出了阈值。此时直接返回一个无需做规则检测的资源操作对象 if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } // 若当前线程中没有绑定context,则创建一个context并将其放入到ThreadLocal if (context == null) { // Using default context. //CONTEXT_DEFAULT_NAME=sentinel_default_context,如果没有显示的创建上下文对象,则创建默认的 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // 若全局开关是关闭的,则直接返回一个无需做规则检测的资源操作对象 // Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // 查找SlotChain,返回 DefaultProcessorSlotChain 的实例对象 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ // 若没有找到chain,则意味着chain数量超出了阈值,则直接返回一个无需做规则检测的资源操作对象 if (chain == null) { return new CtEntry(resourceWrapper, null, context); } // 创建一个资源操作对象 实现类为:DefaultProcessorSlotChain Entry e = new CtEntry(resourceWrapper, chain, context); try { // 对资源进行操作 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
.
privatestaticfinalReentrantLockLOCK=newReentrantLock();
tranceOne", "appA"); * * @param name * @param origin * @return */ protected static Context trueEnter(String name, String origin) { // 尝试着从ThreadLocal中获取Context Context context = contextHolder.get(); // 若ThreadLocal中没有context,则尝试着从缓存map中获取 if (context == null) { // 缓存map的key为context名称,value为EntranceNode Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; // 获取EntranceNode——双重检测锁DCL——为了防止并发创建 DefaultNode node = localCacheNameMap.get(name); if (node == null) { // 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 创建一个EntranceNode node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node.将新建的node添加到ROOT Constants.ROOT.addChild(node); // 将新建node写入到缓存map // 为了防止“迭代稳定性问题”——iterate stable——对于共享集合的写操作 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } // 将context的name与entranceNode封装为context context = new Context(node, name); // 初始化context的来源 context.setOrigin(origin); // 将context写入到ThreadLocal contextHolder.set(context); } return context; }
.
// Monitor object; lookProcessChain synchronizes on it while creating and publishing a new slot chain.
private static final Object LOCK = new Object();
// 查找 SlotChain ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { // 从缓存map中获取当前资源的SlotChain // 缓存map的key为资源,value为其相关的SlotChain ProcessorSlotChain chain = chainMap.get(resourceWrapper); // DCL 双重检测锁 // 若缓存中没有相关的SlotChain,则创建一个并放入到缓存 if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. // 缓存map的size >= chain数量最大阈值,则直接返回null,不再创建新的chain if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 创建新的chain chain = SlotChainProvider.newSlotChain(); // 防止迭代稳定性问题 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
.
// 创建新的 SlotChain public static ProcessorSlotChain newSlotChain() { // 若builder不为null,则直接使用builder构建一个chain,否则先创建一个builder if (slotChainBuilder != null) { return slotChainBuilder.build(); } // Resolve the slot chain builder SPI. // 通过SPI方式创建一个builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); // 若通过SPI方式未能创建builder,则手工new一个DefaultSlotChainBuilder if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } // 构建一个chain return slotChainBuilder.build(); }
对资源进行操作
/**
 * Entry point of the slot chain: passes the call on to the chain's {@code first} slot.
 *
 * @param context         current invocation context
 * @param resourceWrapper the resource being entered
 * @param t               untyped parameter handed to the first slot
 * @param count           amount to acquire
 * @param prioritized     priority flag
 * @param args            arguments forwarded unchanged
 * @throws Throwable whatever the slots throw
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized,
                  Object... args) throws Throwable {
    // Move on to the next node: the call is delegated to first.transformEntry.
    first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; // 进入下个节点,调用 NodeSelectorSlot 节点执行,官方示意图中第一个节点 entry(context, resourceWrapper, t, count, prioritized, args); }
NodeSelectorSlot,负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级
@Override public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { // 从缓存中获取DefaultNode DefaultNode node = map.get(context.getName()); // DCL 双重检测锁 if (node == null) { synchronized (this) { node = map.get(context.getName()); if (node == null) { // 创建一个DefaultNode,并放入缓存map node = new DefaultNode(resourceWrapper, null); HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; // Build invocation tree // 将新建node添加到调用树中 ((DefaultNode) context.getLastNode()).addChild(node); } } } context.setCurNode(node); // 触发下一个节点 ClusterBuilderSlot fireEntry(context, resourceWrapper, node, count, prioritized, args); }
StatisticSlot:用于统计的 Slot(记录线程数、通过的请求数和被阻塞的请求数)
/**
 * Runs the remaining slots first and then records the outcome in the statistics nodes.
 *
 * <p>On success the thread count and pass count are increased; on
 * {@code PriorityWaitException} only the thread count is increased; on
 * {@code BlockException} the block count is increased and the exception is rethrown; any
 * other throwable is stored on the current entry and rethrown. Each update is applied to
 * {@code node}, to the current entry's origin node when present, and to
 * {@code Constants.ENTRY_NODE} when the resource's entry type is {@code EntryType.IN}.
 *
 * @param context         current invocation context
 * @param resourceWrapper the resource being entered
 * @param node            statistics node of the resource
 * @param count           amount added to the pass / block counters
 * @param prioritized     priority flag, forwarded unchanged
 * @param args            arguments forwarded unchanged
 * @throws Throwable the {@code BlockException} or unexpected error raised by later slots
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        // Invoke all following slots in the chain so that every rule check runs.
        // They may throw, e.g. a BlockException when a rule check does not pass.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        // Reaching this point means all preceding rule checks passed, so the request
        // can now be counted in the corresponding statistics.
        // Increase the thread count.
        node.increaseThreadNum();
        // Increase the number of passed requests.
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // Only the thread count is increased on this path; the pass count is not touched.
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // Blocked, set block exception to current entry.
        context.getCurEntry().setBlockError(e);

        // Add block count.
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // Handle block event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);

        throw e;
    }
}
>>>>>>>>>> 接下一篇学习: <<<<<<<<<<<