【Java】【Flume】Flume-NG源代码分析的启动过程(两)

本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration())。分析getConfiguration()方法。

此方法在AbstractConfigurationProvider类中实现了,而且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.sinkFactory
= new DefaultSinkFactory();this.channelFactory = new DefaultChannelFactory()。

  getConfiguration()的详细代码例如以下:  

【Java】【Flume】Flume-NG源代码分析的启动过程(两)
public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();//三大组件
//载入配置文件,PropertiesFileConfigurationProvider中,解析配置文件,得出代理名字,sources...,各个配置属性和值
FlumeConfiguration fconfig = getFlumeConfiguration();
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());//配置文件
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
loadChannels(agentConf, channelComponentMap);
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames =
new HashSet<String>(channelComponentMap.keySet());
for(String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.
get(channelName);
if(channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap = channelCache.
get(channelComponent.channel.getClass());
if(nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}
【Java】【Flume】Flume-NG源代码分析的启动过程(两)

  1、SimpleMaterializedConfiguration对象构造了三大组件

 public SimpleMaterializedConfiguration() {
channels = new HashMap<String, Channel>();
sourceRunners = new HashMap<String, SourceRunner>();
sinkRunners = new HashMap<String, SinkRunner>();
}

  2、getFlumeConfiguration()方法是在AbstractConfigurationProvider的子类PropertiesFileConfigurationProvider中实现了。这种方法读取配置文件,然后解析成name(输姓名全称,即等号左側的所有)、value(等号的右側)对,存入一个Map其中。返回一个封装了这个Map的FlumeConfiguration对象。

FlumeConfiguration类的构造函数会遍历这个Map的所有<name,value>对,调用addRawProperty(String
name, String value)处理<name,value>对。假设为false就忽略了。遍历完后调用validateConfiguration()来验证和删除配置不当组件。

  一、addRawProperty方法会先做一些合法性检查。首次启动Flume会构造一个AgentConfiguration对象aconf。然后agentConfigMap.put(agentName, aconf),以后动态载入配置文件时仅仅须要AgentConfiguration aconf = agentConfigMap.get(agentName)就能够得到,然后调用aconf.addProperty(configKey, value)处理,configKey是配置文件里等号左側去掉agent名字和点之后的内容。agentConfigMap是封装了该agent和其全部组件的配置信息的一个Map。    

  (1)addProperty(configKey, value)方法首先会依次推断是否是sources、sinks、channels、sinkgroups四大组件的总配,不同意反复,将相应的value赋值给这四个String类型的对象。

比如:

    caiji-agent.sources = log-source

    caiji-agent.sinks = avro-sink16 avro-sink15

    caiji-agent.channels = mem-channel

    caiji-agent.sinkgroups = sg

  ps:以上是举例,到这一步时事实上已经没了"caiji-agent."这个字段了。举一个详细的匹配样例。其它3个和这个同样仅仅是匹配内容不同而已,代码例如以下:  

【Java】【Flume】Flume-NG源代码分析的启动过程(两)
  if (key.equals(BasicConfigurationConstants.CONFIG_SOURCES)) {  //等于sources,在此是配置组件名称时
if (sources == null) {
sources = value;
return true;
} else { //反复指定source
logger
.warn("Duplicate source list specified for agent: " + agentName);
errorList.add(new FlumeConfigurationError(agentName,
BasicConfigurationConstants.CONFIG_SOURCES,
FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
ErrorOrWarning.ERROR));
return false;
}
}
【Java】【Flume】Flume-NG源代码分析的启动过程(两)

  BasicConfigurationConstants.CONFIG_SOURCES的值是sources。

  (2)、addProperty(configKey, value)中假设configKey參数均不是上述四大组件总配,则是详细的单个组件的详细參数配置。

则调用parseConfigKey(String key, String prefix)方法来推断解析sources、sinks、channels、sinkgroups的详细组件,还是举一个代码样例。其它3个和此同样:

【Java】【Flume】Flume-NG源代码分析的启动过程(两)
  ComponentNameAndConfigKey cnck = parseConfigKey(key,
BasicConfigurationConstants.CONFIG_SOURCES_PREFIX); if (cnck != null) {
// it is a source
String name = cnck.getComponentName();
Context srcConf = sourceContextMap.get(name); //这个map,key是组件名字,value是其相应的配置属性context if (srcConf == null) {
srcConf = new Context();
sourceContextMap.put(name, srcConf);//j将组件放入map
} srcConf.put(cnck.getConfigKey(), value); //将属性名和相应的值放入context
return true;
}
【Java】【Flume】Flume-NG源代码分析的启动过程(两)

  BasicConfigurationConstants.CONFIG_SOURCES_PREFIX的值是"sources.",注意这个带点。另外,这里还会构造相应四大组件的四个存储配置信息的Map<StringContext>
 :sourceContextMap、channelContextMap、sinkContextMap和sinkGroupContextMap,这四个Map分别存储相应的总配信息中指定个数的组件的相应配置信息,比方上述总配信息中,sourceContextMap、channelContextMap和sinkGroupContextMap依次存储着<log-source。log-source的配置信息>、<mem-channel,mem-channel的配置信息>、<sg,sg的配置信息>以及sinkContextMap的<avro-sink16。avro-sink16的配置信息>、<avro-sink15,avro-sink15的配置信息>。

  parseConfigKey(String key, String prefix)中的prefix用来鉴别是什么类型的组件。

这种方法返回一个封装了解析后的(name是组件名称, configKey相应的组件属性名称)的对象。

  二、validateConfiguration()来验证和删除配置不当组件。

此方法会遍历agentConfigMap中的每一个agent,推断agent相应的配置文件是否合法aconf.isValid(),不合法就从agentConfigMap中删除这个agent。

aconf.isValid()是AgentConfiguration.isValid()方法。

  (1)、会先推断channels是否为空,不为空的话,就推断channels组件集channelSet是否合法channelSet = validateChannels(channelSet)。validateChannels就是遍历全部的channel:先推断是否有相应的配置信息context,没有的话就删除组件;有相应配置信息的话,再推断是否是flume内置的type类型,还是自己定义的,假设是自己定义的还要推断是否有外部的config配置类,假设没有有配置config參数指定外部配置类,则自己定义的type会自己主动设置为OTHER。否则config设置为指定的外部配置类。

  构造ChannelConfiguration对象conf(这个类继承自ComponentConfiguration)=ComponentConfigurationFactory.create(channelName, config, ComponentType.CHANNEL)。当中config指的是配置类,假设配置了就会依据配置类进行初始化返回一个配置类对象。此时不会设置conf.isNotFoundConfigClass();假设没有配置config參数。默认的类型及自己定义的类型都会爆异常。在异常处理时,则返回的都是一个instance
= ChannelConfiguration(name)对象且内置类型会instance.setNotFoundConfigClass()。由于内置的channel也没实现配置类,自己定义的类型不会设置conf.isNotFoundConfigClass()。

  然后conf.configure(channelContext)运行配置,在这说明内置的channel通过封装使得Context配置信息变成ComponentConfiguration配置信息;假设是自己定义的类型且有外部配置类即有config參数时,会将这个channel对象及配置信息放入channelConfigMap中。否则,没有config參数的channel对象,包含内置的以及自己定义没有外围配置类config的,存放在newContextMap中。然后是重置channelContextMap =
newContextMap,重置的目的是为了以后分类对内置和自己定义的channel分别做处理,这在后面要讲的loadChannels时会用到。

然后返回的内容是channelContextMap 和channelConfigMap中的key值的综合与channelSet的交集部分。

  validateChannels(channelSet)方法会终于返回一个两个set(一个是从总配中解析出来的。另外一个是从channelContextMap中解析出来的,后者相应配置文件的实际配置信息。由于存在:1、在总配中声明组件但在详细配置时没有配置。2、没有在总配中声明的组件,但在详细配置时有详细配置信息)的交集。

  (2)、validateSources(channelSet)和(1)的大体思路是一样的。仅仅只是须要获取在总配中的sources和每一个source的channels(在这可能指定多个channel)的交集,并将交集又一次配置:srcContext.put(BasicConfigurationConstants.CONFIG_CHANNELS,this.getSpaceDelimitedList(channels));//将set转化为String

  另外在srcConf.configure(srcContext)中除了获取该source的channels之外,还会对selector(默认是REPLICATING)进行配置。须要时再讲。channelSet(通过检查合格的channel。活动的channel)作为參数传进来的目的是要配置文件里source相应的channels取交集。除去在配置文件里无效的channel。

  方法的返回值的思路參考(1)。  值得注意的是,从代码能够看出每一个agent能够配置多个source。证据在此:Set<String> sourceSet =new HashSet<String>(Arrays.asList(sources.split("\\s+")))。sourceSet的组成也分两部分:一部分是有指定config外部配置的sourceConfigMap仅仅可能是自己定义的组件非内置的,还有一部分是没有指定參数config的source,后者可能包含自己定义的以及内置的。

  (3)、validateSinks(channelSet)和(2)基本类似,仅仅只是每一个sink仅仅能配一个channel,不像source能够有多个”爱妾“,全部在这仅仅须要推断sink的channel是否包括在channelSet之中:channelSet.contains(sinkConf.getChannel())。不包括就异常退出。

其它可參考(1)的思路。

返回的sinkSet也分为两部分可參考(2)中返回的sourceSet的两部分组成。

  (4)、validateGroups(sinkSet)遍历全部sinkgroups。获取合法的sink,并将正确的sinkgroupConfigMap.put(sinkgroupName, conf)。

conf =(SinkGroupConfiguration) ComponentConfigurationFactory.create(sinkgroupName, "sinkgroup", ComponentType.SINKGROUP)已不像上述三种组件有外围的配置类。sinkgroup没有自己定义的功能。也就没有指定外围配置类的功能。所以是固定的"sinkgroup",该方法返回的是SinkGroupConfiguration(name)。conf.configure(context)会获取配置文件里的sinkgroups,并对processor的配置类进行配置。

validGroupSinks(sinkSet, usedSinks, conf)方法删除不符合条件的:1、和已知的其它sinkgroup有同样的sink;2、使用了非活动的sink。满足这俩种不论什么一个相应的sink将会被删除。解析出来的sinkgroup(有可能为null。可能少一个或者多个。又或者都满足)都同一存入sinkgroupConfigMap,和其它三个组件有所不同。其它三个组件的ConfigMap都是存放自己定义且有外围实现配置类的。

  以上4个组件均能够在总配中配置多项,且上述4个方法的返回值均是符合要求的组件,去除了声明可是没配置的以及配置但没声明等组件。

  (5)、再推断返回的sink和source是否为空。

  (6)、将合法的四个组件均转换为String。getSpaceDelimitedList(Set<String> entries)就是将set转换为String。

     this.sources = getSpaceDelimitedList(sourceSet);
this.channels = getSpaceDelimitedList(channelSet);
this.sinks = getSpaceDelimitedList(sinkSet);
this.sinkgroups = getSpaceDelimitedList(sinkgroupSet);

  (7)、终于返回true。validateConfiguration()中若返回的是false则删除此agent。

  这样就完毕了FlumeConfiguration对象的构造,本文開始的2步骤中getFlumeConfiguration()也得到了。

  3、loadChannels(agentConf, channelComponentMap)方法。

该方法首先是会先保存旧的channels。从channelCache复制到channelsNotReused暂存。

然后是获取channelNames=agentConf.getChannelSet()和compMap=agentConf.getChannelConfigMap()。前者是全部的channel的名字。后者是全部chennel对象中有在配置文件里配置config參数,即外部配置类的channel配置信息,所以Configurables.configure(channel,
comp)会对有config外围配置类的进行配置。           agentConf.getChannelContext().get(chName)则是获取没有外围配置类(没有configcan參数。包含自己定义的及内置的)的channel。自己定义的必须实现Configurable接口。所以Configurables.configure(channel, context)会起作用对自己定义的channel进行配置。我们再自己定义或者在看源代码时看到的configure(context)方法会在此时调用。

在上述两次Configurables.configure分别会调用一次getOrCreateChannel方法,该方法除了返回指定的channel之外,还会将和channelsNotReused内同名的channel删除,这样保证channelsNotReused中是没有又一次使用的channel,使得最后从channelCache
删除。但channelCache中可能还存在已失效的channel,因此须要依据channelsNotReused剩余的从channelCache中所有删除,可使channelCache中缓存的就是正在使用的channel。

由于channelSet可能会包括两种:一种就是内置的;一种就是自己定义的。所以就分两种类型初始化并配置。  

  channelComponentMap则是全部(包含内置和自己定义(假设有的话))channel的配置信息。一般来说,自己定义channel非常少,内置的channel类型能满足绝大说的情况。

  由此,我们也能够看出外部的配置类至少须要具备下面几个条件:一、必须有configure(ComponentConfiguration conf)方法;二、实现ConfigurableComponent接口。三、必须有String类型做參数的构造方法。

  ChannelComponent类用来channel(对象)及其相应的sink和source(名字)。

是channelComponentMap中value。key是channel的名字。

  channelsNotReused存在的意义就是当动态载入的时候可以清除channelCache中不在新的配置文件里的channel。

  4、loadSources(agentConf, channelComponentMap, sourceRunnerMap)方法。

loadSources和loadChannels有一些相似。

  首先会获取getSourceSet()source集合。以及getSourceConfigMap()有外部配置类的channel,然后遍历soureSet对有外部配置类的source创建相应的source对象,并运行source的configure(context)方法进行配置。然后对每一个source获取相应的channels,兵构造ChannelProcessor,运行ChannelProcessor.configure方法。依据source的类型构造SourceRunner。

【Java】【Flume】Flume-NG源代码分析的启动过程(两)
public static SourceRunner forSource(Source source) {
SourceRunner runner = null; if (source instanceof PollableSource) {
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {
throw new IllegalArgumentException("No known runner type for source "
+ source);
} return runner;
}
【Java】【Flume】Flume-NG源代码分析的启动过程(两)

  从上述代码能够看到source有两种。一种是实现PollableSource的就构造PollableSourceRunner;第二种是实现EventDrivenSource接口的,就构造EventDrivenSourceRunner。两种的差别。在这里中有说明。然后将source相应的ChannelComponent增加:channelComponent.components.add(sourceName)。

  其次是getSourceContext(),对没有外部配置类的source进行载入。

Configurables.configure(source, context)将会调用source.configure(context)对source自身进行配置。其他和上面基本同样。

  參数sourceRunnerMap则是保存了全部soure运行的方式。

  5、loadSinks(agentConf, channelComponentMap, sinkRunnerMap)方法。载入sink的过程中也是分两部分:

  首先是对有外部配置类的sink,先构造Sink对象,然后对其调用sink.configure方法进行參数配置。然后检查此sink是否有channel相连,接着将sinkName与sink一同添加sinks,并对对应的channel添加组件信息channelComponent.components.add(sinkName)。

  其次就是对没有指定外部配置类的sink进行和上述相同的操作。仅仅只是Configurables.configure(sink, context)是调用的source的configure运行參数配置。

  最后调用loadSinkGroups(agentConf, sinks, sinkRunnerMap)

  6、loadSinkGroups(agentConf, sinks, sinkRunnerMap)方法,用于载入sinkGroups。sinkgroup与sink、source、channel不同,没有外部配置类,故仅仅有getSinkGroupConfigMap()。载入sinkGroup方法首先会遍历全部的sinkgroup,获取每一个sinkGroup相应的sink,然后构造SinkGroup对象,并对其进行參数配置Configurables.configure(group, groupConf),sinkRunnerMap.put(comp.getComponentName(),new
SinkRunner(group.getProcessor()))这句代码则是将sinkgroup放入sinkRunnerMap,group.getProcessor()是获取processor的类型(null、org.apache.flume.sink.FailoverSinkProcessor容错、org.apache.flume.sink.DefaultSinkProcessor默认、org.apache.flume.sink.LoadBalancingSinkProcessor负载均衡。这四种之中的一个)。

  然后对全部的sink遍历,假设sink没有參与sinkgroup则使用默认DefaultSinkProcessor,构造SinkProcessor对象。对SinkProcessor进行參数配置

Configurables.configure(pr, new Context())然后增加sinkRunnerMap.put(entry.getKey(),new SinkRunner(pr))。

  7、检查每一个channel的channelComponent.components是否为空,为空则表明没有和这个channel相连接的组件应该删除。否则将全部的channel、SourceRunner、SinkRunner增加1中的MaterializedConfiguration对象。  

【Java】【Flume】Flume-NG源代码分析的启动过程(两)
    ...
    conf.addChannel(channelName, channelComponent.channel);
    ...
    for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
【Java】【Flume】Flume-NG源代码分析的启动过程(两)

  由代码可知,将解析出来的都存储进MaterializedConfiguration的三大组件,即1中的三大组件。

source与channel的相应关系存储在SourceRunner中的source中的channelProcessor中的selector中。sink与channel的关系在sink.setChannel(channelComponent.channel)设定。

  8、清空channelComponentMap、sourceRunnerMap、sinkRunnerMap。

  9、返回MaterializedConfiguration对象conf。

  

  接下来就返回到PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run()方法中的eventBus.post(getConfiguration())。会通知Application.handleConfigurationEvent(MaterializedConfiguration conf)方法。下一篇再讲这个。

总结:1、上面有指定外部配置类的必然是自己定义的组件;2、每一个配置文件能够配置多个agent。用命令选择使用哪个,每一个agent能够配置多个source、多个sink、多个channel、多个sinkgroups,可是每一个sink仅仅能相应一个sink、每一个source能够相应多个channel、每一个channel能够相应多个sink也能够相应多个source。

问题:1、为什么仅仅有channel有channelCache。source没有sourceCache。sink没有sinkCache??

   答:可是loadChannels方法的最后会将不再反复用的channel从channelCache中删除,每次调用loadChannels方法都会尝试去删除不再重用的channel,我觉得是channel相对于其它组件比較少定制。变化也少,缓存后当又一次载入配置文件时能够马上从缓存中获取channel(假设有的话)这样能够节省一些时间。source和sink都是易变的组件因此每次都又一次载入。(这样是不是有点牵强,还是我理解错了?)

版权声明:本文博主原创文章。博客,未经同意不得转载。

上一篇:ajax.js


下一篇:ABAP-FI-Coding block激活问题