skywalking源码改造
1.需求描述:
当 SkyWalking 收集到业务请求日志时,需要能够将其中特定的数据转发到我们自己的服务中。
2.思路:
使用动态代理,在 SkyWalking 处理数据时调用 Kafka 组件,将数据转发到 Kafka 中;自己的服务再通过 Kafka 监听器(消费者)监听并消费这些数据。
3.实现:
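SkyWalking 在启动时,会将所有的实体 module 进行初始化。在初始化时,将 Kafka 组件注入,并且为每一个 module 添加一个动态代理(原文以红色标出修改的代码;在下面的代码中,对应 loadConfig 开头初始化 kafka plugin 的三行、新增的 argInterceptorAspect 字段,以及 create 方法中 Proxy.newProxyInstance 的部分)。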
/**
 * Reads {@code application.yml} and registers every module and provider it declares on
 * {@code configuration}.
 *
 * <p>Before the file is read, an {@code ArgInterceptorAspect} (the "kafka plugin" named in the
 * log messages) is installed on the {@code RecordStreamProcessor} singleton.
 *
 * <p>Each provider's settings are copied into a {@link Properties} object; a setting whose value
 * is itself a map becomes a nested {@link Properties}. Every copied entry is additionally passed
 * through {@code replacePropertyAndLog}.
 *
 * <p>Any failure other than a missing file is logged and not rethrown, so the caller may end up
 * with a partially populated (or empty) configuration.
 *
 * @param configuration receives one module configuration per YAML module that declares providers
 * @throws ConfigFileNotFoundException if {@code application.yml} cannot be found
 */
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException {
    try {
        log.info("begin init kafka plugin--------------!");
        RecordStreamProcessor.getInstance()
                             .setArgInterceptorAspect(new ArgInterceptorAspect(ApplicationConfigLoader.class));
        log.info("end init kafka plugin--------------!");

        // try-with-resources so the reader is closed once the YAML has been parsed.
        try (Reader applicationReader = ResourceUtils.read("application.yml")) {
            Map<String, Map<String, Object>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
            if (CollectionUtils.isNotEmpty(moduleConfig)) {
                selectConfig(moduleConfig);
                moduleConfig.forEach((moduleName, providerConfig) -> {
                    // A module key with no value is parsed as null; treat it like an empty module
                    // instead of failing with a NullPointerException and skipping the remaining modules.
                    if (providerConfig != null && !providerConfig.isEmpty()) {
                        log.info("Get a module define from application.yml, module name: {}", moduleName);
                        ApplicationConfiguration.ModuleConfiguration moduleConfiguration =
                            configuration.addModule(moduleName);
                        providerConfig.forEach((providerName, config) -> {
                            log.info(
                                "Get a provider define belong to {} module, provider name: {}", moduleName,
                                providerName
                            );
                            final Map<String, ?> propertiesConfig = (Map<String, ?>) config;
                            final Properties properties = new Properties();
                            if (propertiesConfig != null) {
                                propertiesConfig.forEach((propertyName, propertyValue) -> {
                                    if (propertyValue instanceof Map) {
                                        // Nested map -> nested Properties, one level deep.
                                        Properties subProperties = new Properties();
                                        ((Map<?, ?>) propertyValue).forEach((key, value) -> {
                                            subProperties.put(key, value);
                                            replacePropertyAndLog(key, value, subProperties, providerName);
                                        });
                                        properties.put(propertyName, subProperties);
                                    } else {
                                        properties.put(propertyName, propertyValue);
                                        replacePropertyAndLog(propertyName, propertyValue, properties, providerName);
                                    }
                                });
                            }
                            moduleConfiguration.addProviderConfiguration(providerName, properties);
                        });
                    } else {
                        log.warn(
                            "Get a module define from application.yml, but no provider define, use default, module name: {}",
                            moduleName
                        );
                    }
                });
            }
        }
    } catch (FileNotFoundException e) {
        throw new ConfigFileNotFoundException(e.getMessage(), e);
    } catch (Exception e) {
        // Kept best-effort (no rethrow), but reported through the logger rather than stderr.
        log.error("Failed to initialize the kafka plugin or to load application.yml", e);
    }
}
// Aspect invoked by the StorageBuilder proxy built in create(); it is handed the JSON form of
// the proxied call's first argument.
// NOTE(review): presumably assigned through setArgInterceptorAspect(...) during config loading;
// the setter is not shown here — confirm.
private ArgInterceptorAspect argInterceptorAspect;
/**
 * Creates the persistent worker for one record stream and registers it under
 * {@code recordClass}. Does nothing when the stream name is listed in {@code DisableRegister}.
 *
 * <p>The stream's {@code StorageBuilder} is wrapped in a JDK dynamic proxy. For every call on
 * the {@code StorageBuilder} interface that carries at least one argument, the proxy first hands
 * the JSON form of the first argument to {@code argInterceptorAspect} (when one is set) and then
 * delegates to the real builder.
 *
 * @param moduleDefineHolder used to look up the storage DAO and the model creator
 * @param stream             stream definition supplying the name, scope id and builder class
 * @param recordClass        record type the created worker is registered for
 * @throws StorageException declared by the overridden method
 */
@SuppressWarnings("unchecked")
@Override
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
    if (DisableRegister.INSTANCE.include(stream.name())) {
        return;
    }

    StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
    IRecordDAO recordDAO;
    try {
        StorageBuilder storageBuilder = stream.builder().newInstance();
        InvocationHandler forwardingHandler = (proxy, method, args) -> {
            // Read the field on every call so an aspect installed after create() is still used.
            ArgInterceptorAspect aspect = argInterceptorAspect;
            // args is null for no-arg methods, and hashCode/equals/toString are also routed here;
            // only forward genuine StorageBuilder calls that actually carry an argument.
            // NOTE(review): this fires for every such StorageBuilder method, not only one
            // direction of the conversion — confirm that is intended.
            boolean carriesArgument = args != null && args.length > 0;
            if (aspect != null && carriesArgument && method.getDeclaringClass() != Object.class) {
                // Hand the argument to the aspect before delegating.
                aspect.handle(JSONObject.toJSONString(args[0]));
            }
            try {
                return method.invoke(storageBuilder, args);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Surface the delegate's own exception instead of the reflection wrapper.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        };
        StorageBuilder proxyStorageBuilder = (StorageBuilder) Proxy.newProxyInstance(
            RecordStreamProcessor.class.getClassLoader(),
            new Class<?>[] {StorageBuilder.class},
            forwardingHandler
        );
        recordDAO = storageDAO.newRecordDao(proxyStorageBuilder);
    } catch (InstantiationException | IllegalAccessException e) {
        throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
    }

    ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
    Model model = modelSetter.add(
        recordClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
    RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
    workers.put(recordClass, persistentWorker);
}