flume基本思想:
source负责收集数据,channel负责缓存数据,sink负责消费channel中的数据,具体使用方式这里不赘述
生命周期管理:
生命周期相关代码在flume-ng-core文件夹下的lifecycle子文件夹内
flume的所有组件(除了monitor service)都有生命周期的概念,主要作用是用来标记组件目前所属的状态。flume组件的生命周期有四个状态,分别是IDLE,START,STOP,ERROR,意义如下:
IDLE | 组件已经构造完成 |
START | 组件已经启动 |
STOP | 组件已经停止 |
ERROR | 组件发生了错误 |
其中对象本身需要实现LifecycleAware接口【见代码】,flume对组件初始化完成后会调用LifecycleSupervisor.supervise()将该组件加入监控,LifecycleSupervisor类内部对每一个组件会启动一个定时线程MonitorRunnable,在定时线程中调用组件的start/stop函数对组件进行控制
这里注意对组件stop函数的调用是在unsupervise里面进行的,而不是通过定时线程MonitorRunnable控制的
abstract public class AbstractSink implements Sink, LifecycleAware { private LifecycleState lifecycleState; public AbstractSink() {
lifecycleState = LifecycleState.IDLE;
} @Override
public synchronized void start() {
......
lifecycleState = LifecycleState.START;
......
} @Override
public synchronized void stop() {
......
lifecycleState = LifecycleState.STOP;
......
} @Override
public synchronized LifecycleState getLifecycleState() {
return lifecycleState;
} ...... }
flume 启动流程:
flume的main函数在flume-ng-node文件夹下Application类中
实际上启动流程的主要目的有两个:
1.通过配置文件构造组件,连接有关联的组件【hook】,将组件填入到MaterializedConfiguration类中
2.从MaterializedConfiguration类中获取组件,将组件注册到LifecycleSupervisor中,通过LifecycleSupervisor启动
可以看出MaterializedConfiguration的重要性,下面是其定义,可见其意义就是存储所有初始化好的组件并提供给程序调用:
public interface MaterializedConfiguration { public void addSourceRunner(String name, SourceRunner sourceRunner); public void addSinkRunner(String name, SinkRunner sinkRunner); public void addChannel(String name, Channel channel); public ImmutableMap<String, SourceRunner> getSourceRunners(); public ImmutableMap<String, SinkRunner> getSinkRunners(); public ImmutableMap<String, Channel> getChannels(); }
main函数依次做了以下几件事
1.解析命令行
2.寻找命令行里的配置文件路径,解析配置文件,并放在一个File对象里
3.调用AbstractConfigurationProvider实例,传入File配置文件构造对象,返回一个MaterializedConfiguration对象,该对象里存放所有的sink,sinkgroup,source,channel组件
4.调用application.start(),该函数将所有组件加入生命周期管理并注册启动monitor服务
5.向JVM注册一个函数,当进程结束时停止所有组件
这里要注意第三步其实根据命令行中的no-reload-conf参数【是否关闭重载配置文件】选择调用AbstractConfigurationProvider的派生类PollingPropertiesFileConfigurationProvider或者PropertiesFileConfigurationProvider。
其他几步都很直接,比较复杂的流程在调用AbstractConfigurationProvider,解析配置文件并初始化组件这一步。
调用AbstractConfigurationProvider的目的是初始化所有组件,挂好相互关联的钩子并将组件封装到MaterializedConfiguration对象中返回给上层。
AbstractConfigurationProvider类的功能比较好理解,关键在于flume支持配置重载,配置重载则channel组件涉及到是否复用的问题【重新生成channel可能会导致之前数据的丢失问题】。AbstractConfigurationProvider类的getConfiguration()函数主要处理了这一套逻辑
getConfiguration()函数的执行基本流程如下:
1.加载配置文件
2.load channels
3.load sources,这一块要传入2中load的channel,因为要挂channel和source之间的钩子【这一块实际上是将channel封装到ChannelProcessor中,将ChannelProcessor传入Source,关于ChannelProcessor的作用见下文Channel结构介绍】
4.load sinks,这一块要传入2中load的channel,因为要挂channel和sink之间的钩子
5.移除没有sink和source连接的channel【如果只有source/sink这里不会检测】
6.将sink,source,channel封装在MaterializedConfiguration对象中并返回,注意source是封装在SourceRunner类中,sink是封装在SinkRunner中。这里的定义是Runner负责与操作系统打交道,而source/sink专注于维护自身逻辑。SinkRunner中还要注意一点是一个Sink只能在一个SinkGroup里面。
注意:
1.加载配置文件时调用了FlumeConfiguration类,该类内部有对配置文件进行合法性校验,校验的错误列表可以通过getConfigurationErrors()接口获取
2.每个组件都有对应的Factor类(如ChannelFactor),通过调用Factor类的create()函数创建对象
3.load_channel函数中channel可以通过加上@Disposable声明表示不复用,重新加载配置时生成一个新的channel
4.细读load_channels,load_sources,load_sinks代码,可以发现加载组件时分了两种情况,has ComponentConfiguration object和do not hava a ComponentConfiguration object。这一块可以不用理会,细读配置文件逻辑好像全部都是后一种do not hava a ComponentConfiguration object
以上步骤执行完,flume就可以提供服务了。
flume各组件的内部结构:
组件的结构代码在flume-ng-core文件夹中
channel:
channel模块的作用是给数据提供中转的储藏地,由flume的架构图可以看出channel负责给source模块提供写数据接口,给sink模块提供读数据接口。实际实现中每一个source模块中都有一个ChannelProcessor,通过ChannelProcessor向Channel写入数据,而sink模块是直接通过Channel模块提供的接口读取。
为什么Source写入要通过封装的Processor进行?这里有两个原因:1.source模块写入的数据可能在写入Channel前需要进行过滤处理;2.source模块可能要将不同的数据写到不同的Channel上实现数据分发的效果。ChannelProcessor主要就是为了解决这两个问题存在的。以下是ChannelProcessor的结构图:
上图中:InterceptorChain模块主要解决问题1,source写入的数据可以经过InterceptorChain模块中一系列的过滤器依次处理,最终过滤出来的数据进入ChannelSelector模块,ChannelSelector解决了上面提到的问题2,提供了将不同数据分发到不同Channel的机制。
Channel中基本的数据单元是Event,每个Event包含有一个map<String, String>类型的header,每一个Interceptor可以在header里面添加key,value形式的tag让后续流程处理,或者按照某些规则将过滤event。
ChannelSelector目前有两种类型。一种是ReplicatingChannelSelector,消息发往所有的channel,另一种是MultiplexingChannelSelector,可以指定发往某些Channel
其中Channel也有两种类型:一种是required,一种是optional.这两者的区别是required类型如果写入失败会throw Exception。optional类型的写入失败不处理。
Channel中处理数据的最小单元是Transaction,一个Transaction由多个Event组成。Channel内部实现要保证一个Transaction要么全部成功,要么全部不成功。具体实现方法下面细说。
下面是这几个类之间的关系图。图中Channel只画了MemoryChannel,Interceptor的具体实现类没有画。
其中要注意的是:
1.ChannelProcessor的初始化是在初始化Source模块的时候。
2.每一个线程有自己的Transaction,这个具体实现是在BasicChannelSemantics类中利用TreadLocal机制。这么做的好处是维护transaction相对方便。
3. Transaction的维护方法:MemoryChannel的机制是每一个Transaction有自己的存储空间,分为putlist和takelist,MemoryChannel有一个公共的queue。
每次take时从queue中取出一个event并放置在本地的takelist内并返回该event,commit的时候清空takelist,rollback时将takelist中的event重新放回queue。
每次put时将event放置在putlist中,commit时将putlist中内容写入queue中,rollback时将putlist清空。
4.BasicChannelSemantics类主要负责的是维护(thread,transaction)这样的关系。而由于Transaction是接口,不能直接给BasicChannelSemantics进行操作。所以这里又封装了一个BasicTransactionSemantics类。这两个类实现相对简单,这里不细述。
上面介绍初始化的时候曾经说过source和sink封装在SourceRunner和SinkRunner中。Source和Sink这两部分主要各种对接外部系统的模块比较多,实际结构还是比较简单的
source:
source的主要功能是收集外部数据,并通过ChannelProcessor写入Channel。Source分为两种类型,PollableSource和EventDrivenSource,分别对应不同的SourceRunner。其中PollableSource是指那些需要定时轮询的Source,比如定时扫描文件,读取新增内容。其他Source都是EventDrivenSource。
1.注意BasicSourceSemantics类型维护了Source的生命周期。
2.PollableSource类型的source具体的功能函数封装在process()中,PollableSourceRunner负责定期调source.process()将数据写入Channel.
3.EventDrivenSource类型全部逻辑基本都在Source实现里面,通过start接口启动【start接口是LifecycleSupervisor负责调用的,上文有描述】。每一种source根据不同的外部系统有不同的处理逻辑,这里不赘述。
sink&sinkgroup:
sink/sinkgroup的主要作用是消费Channel中的数据,其基本思想是定期轮询Sink,从Channel中拉取一定量数据做处理。
一个Sink只能对应一个下游,但是一个下游不可靠。所以flume里面有一个机制将多个sink封装为一个SinkGroup,每次消费时通过SinkSelector选择不同Sink实现容错/负载均衡方案
如果SinkSelector需要对Sink做筛选,那么它需要知道每一个Sink的状态。每一个sink都要自己维护一个status变量,有两个值【READY, BACKOFF】,定义在interface Sink中,READY的意思是该sink可以从Channel中取数据,BACKOFF的意义是该sink目前不能从Channel中取数据。这个status就是给SinkSelector做决策使用的。
Sink的结构图如下:
SinkRunner的逻辑跟PollableSource一致。每隔一段时间调用SinkProcessor的process()函数
SinkProcessor里面封装了Sink和SinkSelector。如果是单独sink,直接调用这个sink的process(),如果是一组sink,通过SinkSelector选择出一个sink,再调用这个sink的process().
目前flume里面实现的SinkProcessor有三种,DefaultSinkProcessor/FailoverSinkProcessor/LoadBalancingSinkProcessor
1.DefaultSinkProcessor:提供给单独Sink使用。直接调用sink的process()函数
2.FailoverSinkProcessor:提供给SinkGroup使用。这个Processor给每个sink都设置了一个priority【通过配置文件】,其思想是每次都会【尽量】调用priority最大的sink。
这个Processor将sinks分为两组,一组是Queue类型的failedSinks队列,保存上一次sink.process()调用出错的sink,并且每一个调用出错的sink都有一个冷却时间,冷却时间之内的sink不会调用,另一组是SortedMap<Integer, Sink> liveSinks,保存正常的sink,其中Integer指该sink的priority
每次选择sink的流程如下:a.从failedSinks里面取出冷却最久的sink,调用process,如果成功则将该sink重新加入liveSinks,如果失败则刷新这个sink的冷却时间并将其重新push回failedSinks。然后试图在failedSinks寻找下一个sink
b.如果failedSinks为空或者failedSinks内的sink全部不可用,则调用liveSinks里面权重最大的sink,如果该sink调用失败则塞入failedSinks
3.LoadBalancingSinkProcessor:基本思想是将Channel中的数据平均分配到该Processor的所有sink里。
这个Processor下面有两个Selector,分别是RandomOrderSinkSelector和RoundRobinSinkSelector。前者提供的功能是返回一个随机排序的sink数组,后者则是返回一个轮询调度形式的数组,比如这一次返回【1,2,3】,下一次就是【2,3,1】,下下一次就是【3,1,2】。
LoadBalancingSinkProcessor调用用户配置的Seletor返回一个数组,然后依次调用这个数组中的sink.process(),直到某一次调用成功,则返回,如果全部失败会返回错误。
这个Processor有一个比较重要的配置【backoff】。这个配置决定了某个sink调用失败后的行为。如果backoff为true,则该sink失败后就会冷却一段时间。这段时间调用Selector返回的数组不会包含该sink。
Sink主要是实现从Channel众消费数据的逻辑,这里不一一描述。要注意的是SinkGroup中一个Sink如果卡住这个SinkGroup会卡住。
监控模块:
在启动流程章节里提到,在启动所有组件后会启动minotor监控进程。监控主要分为两部分,收集数据和上报数据。
收集数据的逻辑是夹杂在具体Channel,Sink,Source等的实现逻辑里面的。其中sink,source,channel等结构监控数据的数据结构在flume-ng-core文件夹下面的instrumententation里面。分别是SinkCounter,SourceCounter和ChannelCounter等。如果要自己实现这些组件,需要调用这几个类的接口上报监控信息。flume-ng-core下面还有一个CounterGroup,这个统计了一些Runner的事件。这个接口应该是老接口。
每一个Count都会在MBean里面注册。上报的数据存储在这个MBean对象里。
flume内置三种上报监控的方式,分别是zabbix,ganglia和http。其中http方式是在flume侧起一个server,由外部主动来拉取监控信息。
没看到flume有一段时间内的统计信息。所有统计信息都是统计整个组件生命周期的。
以上就是Flume源码剖析的全部内容。Flume本身提供了非常多的组件之外还有非常好的可扩展性。就整体架构而言还是非常清晰的。除了配置文件结构比较复杂,其他代码的可读性还是相当高的