skywalking源码改造

1.需求描述: 在skywalking收集到业务请求日志时,可以将特定的数据转发到自己的服务中   2.思路: 使用动态代理在skywalking处理数据时调用kafka组件,将数据转发到kafka中,在自己的服务中使用监听器进行监听   3.实现: skywalking在启动时,会将所有的实体module进行初始化。在初始化时,将kafka组件注入并且为每一个module添加一个动态代理(红色为修改的代码)   @SuppressWarnings("unchecked") private void loadConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException { try { log.info("begin init kafka plugin--------------!"); RecordStreamProcessor.getInstance().setArgInterceptorAspect(new ArgInterceptorAspect(ApplicationConfigLoader.class)); log.info("end init kafka plugin--------------!"); Reader applicationReader = ResourceUtils.read("application.yml"); Map<String, Map<String, Object>> moduleConfig = yaml.loadAs(applicationReader, Map.class); if (CollectionUtils.isNotEmpty(moduleConfig)) { selectConfig(moduleConfig); moduleConfig.forEach((moduleName, providerConfig) -> { if (providerConfig.size() > 0) { log.info("Get a module define from application.yml, module name: {}", moduleName); ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule( moduleName); providerConfig.forEach((providerName, config) -> { log.info( "Get a provider define belong to {} module, provider name: {}", moduleName, providerName ); final Map<String, ?> propertiesConfig = (Map<String, ?>) config; final Properties properties = new Properties(); if (propertiesConfig != null) { propertiesConfig.forEach((propertyName, propertyValue) -> { if (propertyValue instanceof Map) { Properties subProperties = new Properties(); ((Map) propertyValue).forEach((key, value) -> { subProperties.put(key, value); replacePropertyAndLog(key, value, subProperties, providerName); }); properties.put(propertyName, subProperties); } else { properties.put(propertyName, propertyValue); replacePropertyAndLog(propertyName, propertyValue, properties, providerName); } }); } moduleConfiguration.addProviderConfiguration(providerName, properties); }); } else { log.warn( "Get a module define from application.yml, but no provider define, use default, module name: {}", moduleName ); } }); } } catch (FileNotFoundException e) { throw new ConfigFileNotFoundException(e.getMessage(), e); }catch (Exception e){ e.printStackTrace(); } }     private ArgInterceptorAspect argInterceptorAspect;   @SuppressWarnings("unchecked") @Override public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException { if (DisableRegister.INSTANCE.include(stream.name())) { return; }   StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRecordDAO recordDAO;   try { StorageBuilder storageBuilder = stream.builder().newInstance(); StorageBuilder proxyStorageBuilder = (StorageBuilder) Proxy.newProxyInstance(RecordStreamProcessor.class.getClassLoader(), new Class<?>[]{StorageBuilder.class}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //切面类需要执行的方法 argInterceptorAspect.handle( JSONObject.toJSONString(args[0])); Object object = method.invoke(storageBuilder, args); return object; } }); recordDAO = storageDAO.newRecordDao(proxyStorageBuilder);   } catch ( InstantiationException | IllegalAccessException e ) { throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e); }   ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class); Model model = modelSetter.add( recordClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);   workers.put(recordClass, persistentWorker); }  

上一篇:Skywalking内置Tags


下一篇:软件安装 -> STM32CubeMX