Sentinel 之 核心源码

结构图:Sentinel 之 核心源码

 

 

源码分析:(温馨提示:以下源码部分请使用 IDEA 打开源码,结合本文对照阅读,否则容易晕)

入口:

@SentinelResource 注解的切面 :

/**
 * AspectJ aspect that guards every method annotated with {@code @SentinelResource}
 * by acquiring a Sentinel {@code Entry} around the method invocation.
 */
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    /** Pointcut matching methods that carry the {@code @SentinelResource} annotation. */
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    /**
     * Around advice: acquires an entry for the resource, invokes the target method,
     * and always exits the entry afterwards.
     *
     * @param pjp the join point of the intercepted method
     * @return the target method's result, or the result of the block/fallback handler
     * @throws Throwable the original exception when it is ignored or no fallback applies
     */
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        final Method targetMethod = resolveMethod(pjp);
        final SentinelResource meta = targetMethod.getAnnotation(SentinelResource.class);
        if (meta == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }

        final String resourceName = getResourceName(meta.value(), targetMethod);
        final EntryType trafficType = meta.entryType();
        final int resourceType = meta.resourceType();

        Entry entry = null;
        try {
            // The woven-in enhancement: acquire the entry before running the target.
            entry = SphU.entry(resourceName, resourceType, trafficType, pjp.getArgs());
            // Invoke the target method.
            return pjp.proceed();
        } catch (BlockException blocked) {
            return handleBlockException(pjp, meta, blocked);
        } catch (Throwable failure) {
            // The ignore list is checked first; an ignored exception never reaches the fallback.
            final Class<? extends Throwable>[] ignoreList = meta.exceptionsToIgnore();
            final boolean ignored = ignoreList.length > 0 && exceptionBelongsTo(failure, ignoreList);
            if (!ignored && exceptionBelongsTo(failure, meta.exceptionsToTrace())) {
                traceException(failure);
                return handleFallback(pjp, meta, failure);
            }
            // Ignored, or no fallback function can handle the exception: propagate it.
            throw failure;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

.

/**
 * Acquires an entry for the named resource by delegating to {@code Env.sph.entryWithType}.
 *
 * @param name         resource name
 * @param resourceType resource type code, forwarded unchanged
 * @param trafficType  entry (traffic) type, forwarded unchanged
 * @param args         arguments forwarded unchanged
 * @return the entry returned by {@code Env.sph.entryWithType}
 * @throws BlockException propagated from {@code Env.sph.entryWithType}
 */
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
    throws BlockException {
    // Note: the 4th argument (the count) is fixed to 1.
    return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}

.

/**
 * Convenience overload that forwards to the six-argument {@code entryWithType}
 * with {@code prioritized} set to {@code false}.
 */
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
    throws BlockException {
    // count: per the original notes, how many units this request adds to the count (default 1).
    // Note: the 5th argument passed below is false (not prioritized).
    return entryWithType(name, resourceType, entryType, count, false, args);
}
 
/**
 * Wraps the resource description in a {@code StringResourceWrapper} and delegates to
 * {@code entryWithPriority}, which returns the entry used to operate on the resource.
 *
 * @param prioritized forwarded unchanged; per the original notes, {@code true} means the
 *                    request must wait for a priority-derived time before passing and
 *                    {@code false} means no waiting (not verifiable from this snippet)
 */
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                           Object[] args) throws BlockException {
    return entryWithPriority(
        new StringResourceWrapper(name, entryType, resourceType),
        count,
        prioritized,
        args);
}

.

/**
 * Builds the entry for a resource and, when rule checking is enabled and a slot chain
 * is available, runs the request through that chain.
 *
 * @param resourceWrapper the resource being entered
 * @param count           forwarded to the slot chain (the callers shown pass 1)
 * @param prioritized     forwarded to the slot chain (the callers shown pass false)
 * @param args            forwarded to the slot chain
 * @return a {@code CtEntry}; it carries no chain when rule checking is skipped
 * @throws BlockException rethrown from the slot chain after the entry has been exited
 */
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    Context ctx = ContextUtil.getContext();

    // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
    // so here init the entry only. No rule checking will be done.
    if (ctx instanceof NullContext) {
        return new CtEntry(resourceWrapper, null, ctx);
    }

    // No context available yet: enter the default one
    // (named by Constants.CONTEXT_DEFAULT_NAME; the original notes give "sentinel_default_context").
    if (ctx == null) {
        ctx = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Rule checking is skipped in two cases, both yielding a chain-less entry:
    //  - the global switch is off (the chain is not even looked up), or
    //  - lookProcessChain returns null, meaning the amount of slot chains
    //    exceeds Constants.MAX_SLOT_CHAIN_SIZE.
    final ProcessorSlot<Object> slotChain = Constants.ON ? lookProcessChain(resourceWrapper) : null;
    if (slotChain == null) {
        return new CtEntry(resourceWrapper, null, ctx);
    }

    // The entry itself is a CtEntry that holds the slot chain.
    final Entry guarded = new CtEntry(resourceWrapper, slotChain, ctx);
    try {
        // Run the request through the slot chain.
        slotChain.entry(ctx, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException blocked) {
        guarded.exit(count, args);
        throw blocked;
    } catch (Throwable unexpected) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        // Deliberately logged and not rethrown: the entry is still returned.
        RecordLog.info("Sentinel unexpected exception", unexpected);
    }
    return guarded;
}

.

privatestaticfinalReentrantLockLOCK=newReentrantLock();
tranceOne", "appA");
 *
 * @param name
 * @param origin
 * @return
 */
protected static Context trueEnter(String name, String origin) {
    // 尝试着从ThreadLocal中获取Context
    Context context = contextHolder.get();
    // 若ThreadLocal中没有context,则尝试着从缓存map中获取
    if (context == null) {
        // 缓存map的key为context名称,value为EntranceNode
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        // 获取EntranceNode——双重检测锁DCL——为了防止并发创建
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            // 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                LOCK.lock();
                try {
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            // 创建一个EntranceNode
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // Add entrance node.将新建的node添加到ROOT
                            Constants.ROOT.addChild(node);
 
                            // 将新建node写入到缓存map
                            // 为了防止“迭代稳定性问题”——iterate stable——对于共享集合的写操作
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        // 将context的name与entranceNode封装为context
        context = new Context(node, name);
        // 初始化context的来源
        context.setOrigin(origin);
        // 将context写入到ThreadLocal
        contextHolder.set(context);
    }
 
    return context;
}

.

private static final Object LOCK = new Object();

/**
 * Looks up the slot chain cached for a resource, creating and caching one on first use.
 *
 * @param resourceWrapper the resource whose chain is wanted (key of the cache map)
 * @return the cached or newly created chain, or {@code null} when the cache already holds
 *         {@code Constants.MAX_SLOT_CHAIN_SIZE} or more chains and this resource has none
 */
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // Fast path: no locking when the chain is already cached.
    ProcessorSlotChain cached = chainMap.get(resourceWrapper);
    if (cached != null) {
        return cached;
    }

    synchronized (LOCK) {
        // Re-check under the lock: another thread may have created it meanwhile.
        cached = chainMap.get(resourceWrapper);
        if (cached != null) {
            return cached;
        }

        // Entry size limit: refuse to create further chains once the cap is reached.
        if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
            return null;
        }

        final ProcessorSlotChain created = SlotChainProvider.newSlotChain();

        // Build a fresh map and swap the reference rather than mutating the shared map.
        final Map<ResourceWrapper, ProcessorSlotChain> replacement = new HashMap<>(chainMap.size() + 1);
        replacement.putAll(chainMap);
        replacement.put(resourceWrapper, created);
        chainMap = replacement;
        return created;
    }
}

.

/**
 * Builds a new slot chain with the shared builder, resolving the builder first if needed.
 *
 * @return the chain produced by {@code slotChainBuilder.build()}
 */
public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder == null) {
        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder != null) {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        } else {
            // Should not go through here: SPI gave nothing, so fall back to the default builder.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        }
    }
    return slotChainBuilder.build();
}

对资源进行操作

/**
 * Entry point of the chain: hands the request to the {@code first} slot via
 * {@code transformEntry}, forwarding every argument unchanged.
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
    throws Throwable {
    // Move on to the next node.
    first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
/**
 * Casts the untyped parameter to this slot's type {@code T} and invokes {@code entry}.
 * Per the article, the first slot reached this way is NodeSelectorSlot
 * (not verifiable from this snippet).
 */
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
    throws Throwable {
    entry(context, resourceWrapper, (T) o, count, prioritized, args);
}

NodeSelectorSlot:负责收集资源的调用路径,并将这些调用路径以树状结构存储起来,用于根据调用路径来限流降级。

/**
 * Selects (or creates) the {@code DefaultNode} cached under the current context name,
 * makes it the context's current node, and fires the next slot with that node.
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    // Look the DefaultNode up in the cache, keyed by context name.
    DefaultNode node = map.get(context.getName());
    // Check, lock, check again, so the node is created only once per context name.
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                // Create the DefaultNode and publish it through a fresh map plus reference swap.
                node = new DefaultNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
                // Build invocation tree
                // Attach the new node as a child of the context's last node.
                ((DefaultNode) context.getLastNode()).addChild(node);
            }
 
        }
    }
 
    context.setCurNode(node);
    // Fire the next slot (ClusterBuilderSlot, per the article).
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

StatisticSlot:用于统计的 Slot,负责记录通过的请求数、线程数以及被阻塞的请求数。

/**
 * Runs the remaining slots first, then records the outcome on the resource node, on the
 * entry's origin node (when present) and, for {@code EntryType.IN} resources, on
 * {@code Constants.ENTRY_NODE}.
 *
 * <ul>
 *   <li>Passed: thread count and pass count are increased; {@code onPass} callbacks run.</li>
 *   <li>{@code PriorityWaitException}: only the thread count is increased; {@code onPass}
 *       callbacks run and the exception is not rethrown.</li>
 *   <li>{@code BlockException}: stored on the current entry, block count increased,
 *       {@code onBlocked} callbacks run, then rethrown.</li>
 *   <li>Any other {@code Throwable}: stored on the current entry, then rethrown.</li>
 * </ul>
 */
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        // Invoke all subsequent slots in the chain so that every rule check runs.
        // This may throw, e.g. a BlockException when a rule check does not pass.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        // Reaching this point means no slot threw, so the request can be counted.
        // Increase the thread count.
        node.increaseThreadNum();
        // Increase the number of passed requests.
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // Only thread counts are increased in this branch; addPassRequest is not called here.
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // Blocked, set block exception to current entry.
        context.getCurEntry().setBlockError(e);

        // Add block count.
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // Handle block event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);

        throw e;
    }
}

 

>>>>>>>>>>  接下一篇学习: <<<<<<<<<<<

 

上一篇:Sentinel -- sentinel控制台的简单应用,java核心技术电子版下载


下一篇:Redis集群模式