sentinel-initFunc
Sentinel提供了一套完整的流控配置UI界面,用于配置流控的各个参数,阈值。其实现的大体逻辑是引入一个web服务,这个web服务可以接收界面上的请求参数,来初始化阈值,流控规则等。并且与控制台服务sentinel-dashboard保持心跳。
那么这个web服务又是如何启动监听,初始化业务handler,以及保存流控规则的呢?将会在下面的源码中讲解。
在sentinel-core的Env中 执行一个静态方法快
public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }
还是通过SPI的方式获取所有的InitFunc的实现,执行其init方法,接下来看一下这个实现类 com.alibaba.csp.sentinel.metric.extension.MetricCallbackInit
public class MetricCallbackInit implements InitFunc { @Override public void init() throws Exception { StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(), new MetricEntryCallback()); StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(), new MetricExitCallback()); } }
注册了一个entry的回调和exit的回调,内部还是通过SPI的方式获取到MetricExtension的实现,来实现一些扩展计量器的addPass方法等
public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> { @Override public void onPass(Context context, ResourceWrapper rw, DefaultNode param, int count, Object... args) throws Exception { for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { if (m instanceof AdvancedMetricExtension) { ((AdvancedMetricExtension) m).onPass(rw, count, args); } else { m.increaseThreadNum(rw.getName(), args); m.addPass(rw.getName(), count, args); } } } @Override public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args) { for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) { if (m instanceof AdvancedMetricExtension) { ((AdvancedMetricExtension) m).onBlocked(resourceWrapper, count, context.getOrigin(), ex, args); } else { m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args); } } } }
当然,在sentinel-core中没有注册默认的MetricExtension。
控制台server的实现
依赖这部分的初始化方法,在自己的项目中引入sentinel-transport。在sentinel-transport-common的resources中InitFunc定义了两个初始化实现
com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc
com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc
CommandCenterInitFunc
@InitOrder(-1) public class CommandCenterInitFunc implements InitFunc { @Override public void init() throws Exception { CommandCenter commandCenter = CommandCenterProvider.getCommandCenter(); if (commandCenter == null) { RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter"); return; } // 添加handler处理器 commandCenter.beforeStart(); // 启动服务端 commandCenter.start(); RecordLog.info("[CommandCenterInit] Starting command center: " + commandCenter.getClass().getCanonicalName()); } }
在上面的SPI的处理中获CommandCenter的实现,假如引入的是sentinel-transport-simple-http,在其resource中com.alibaba.csp.sentinel.transport.command.SimpleHttpCommandCenter
然后处理beforeStart和start方法,做的事初始化请求处理的各个handler以及启动server监听,很简单这边不做过多阐述,读者可以自行查阅代码。
HeartbeatSenderInitFunc
还是假如引入的是sentinel-transport-simple-http,在其resource中com.alibaba.csp.sentinel.transport.heartbeat.SimpleHttpHeartbeatSender
每隔10会发送一次心跳
@Override public boolean sendHeartbeat() throws Exception { if (TransportConfig.getRuntimePort() <= 0) { RecordLog.info("[SimpleHttpHeartbeatSender] Command server port not initialized, won't send heartbeat"); return false; } Endpoint addrInfo = getAvailableAddress(); if (addrInfo == null) { return false; } SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath()); request.setParams(heartBeat.generateCurrentMessage()); try { SimpleHttpResponse response = httpClient.post(request); if (response.getStatusCode() == OK_STATUS) { return true; } else if (clientErrorCode(response.getStatusCode()) || serverErrorCode(response.getStatusCode())) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo + ", http status code: " + response.getStatusCode()); } } catch (Exception e) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addrInfo, e); } return false; }