Overview
source采集的日志首先会传入ChannelProcessor, 在其内首先会通过Interceptors进行过滤加工,然后通过ChannelSelector选择channel。
Source和Sink之间是异步的,sink只需要监听自己关系的Channel的变化即可。
-
sink存在写失败的情况,flume提供了如下策略:
默认是一个sink,若写入失败,则该事务失败,稍后重试。
-
故障转移策略:给多个sink定义优先级,失败时会路由到下一个优先级的sink。sink只要抛出一次异常就会被认为是失败了,则从存活Sink中移除,然后指数级时间等待重试,默认是等待1s开始重试,最大等待重试时间是30s.
flume还提供了负载均衡策略:默认提供轮训和随机两种算法。通过抽象一个类似ChannelSelector的SinkSelector进行选择。
以上,对于Source和sink如何异步、channel如何实现事务机制,详见后面的具体源码分析。
The whole process
首先是flume的启动, 提供了两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程。 一般使用Application起一个进程比较多,我们这里也主要分析这种方式。
程序入口:org.apache.flume.node.Application的main方法。
注:因为暂时还没有了解到Zookeeper原理,所以这里关于ZK的部分就跳过了。
-
flume启动流程大致如下:
-
设置默认值启动参数,参数是否是必须的
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
...... -
解析命令行参数
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf"); // 是否reload配置文件
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
isZkConfigured = true;
} Zookepper相关:暂时略
-
打开配置文件
if (isZkConfigured) {
... // 若配置了zk,则使用zk参数启动
} else {
// 打开配置文件,如果不存在则快速失败
File configurationFile = new File(commandLine.getOptionValue('f'));
// 确保没有配置文件的时候agent会启动失败
if (!configurationFile.exists()) {
...// If command line invocation, then need to fail fast
}
List<LifecycleAware> components = Lists.newArrayList();
// 若需要定期reload配置文件
if (reload) {
// 使用EventBus事件总线, to allow publish-subscribe-style communication
EventBus eventBus = new EventBus(agentName + "-event-bus");
// 读取配置文件,使用定期轮训拉起策略,默认30s拉取一次
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
// 向Application注册组件
application = new Application(components);
// 向EventBus注册本应用,EB会自动注册Application中使用@Subscribe声明的方法
// TODO: EventBus, and why reload configuration
eventBus.register(application);
} else {
// 若配置文件不支持定期reload
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
// 直接使用配置文件初始化Flume组件
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
} reload conf:若需要reload,则使用事件总线EventBus实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化。
-
handleConfigurationEvent:
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
// MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等。其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
stopAllComponents(); // 停止所有组件
startAllComponents(conf);// 使用配置文件初始化所有组件
} -
startAllComponents
要首先启动channels,等待所有channels启动才能继续。然后启动SinkRunner,准备好消费者。最后启动SourceRunner开始进行采集日志。
LifecycleSupervisor是组件守护哨兵,对这些组件进行守护,出问题时默认策略是自动重启。
这里的启动都是supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); 这是如何启动的,我们后面再介绍。
private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
logger.info("Starting new configuration:{}", materializedConfiguration);
this.materializedConfiguration = materializedConfiguration;
// 启动channels。
for (Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
try {
logger.info("Starting Channel " + entry.getKey());
// TODO: LifecycleSupervisor启动
// new SupervisorPolicy.AlwaysRestartPolicy():使用失败时总是重启的策略
// LifecycleState.START: 初始化组件默认状态为START
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
/*
* Wait for all channels to start.
*/
for (Channel ch : materializedConfiguration.getChannels().values()) {
while (ch.getLifecycleState() != LifecycleState.START
&& !supervisor.isComponentInErrorState(ch)) {
try {
logger.info("Waiting for channel: " + ch.getName() +
" to start. Sleeping for 500 ms");
Thread.sleep(500);
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for channel to start.", e);
Throwables.propagate(e);
}
}
}
// 启动sinkRunner
for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
try {
logger.info("Starting Sink " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
// 启动SourceRunner TODO: SourceRunner & SinkRunner
for (Entry<String, SourceRunner> entry :
materializedConfiguration.getSourceRunners().entrySet()) {
try {
logger.info("Starting Source " + entry.getKey());
supervisor.supervise(entry.getValue(),
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
} catch (Exception e) {
logger.error("Error while starting {}", entry.getValue(), e);
}
}
this.loadMonitoring();
} -
之后main函数调用了
application.start();
/**
其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启
**/
public synchronized void start() {
// private final List<LifecycleAware> components;
for (LifecycleAware component : components) {
// private final LifecycleSupervisor supervisor;
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
}相应的stop函数。首先是main函数中:
final Application appReference = application;
// Runtinme.getRuntime(): Returns the runtime object associated with the current Java application.
/**
addShutdownHook: 注册一个新的虚拟机关闭钩子。
虚拟机shutdown有两种情况:1)当最后一个非守护进行户外那个退出或调用system.exit时,程序正常退出;2)JVM通过ctrl-c等被用户中断。
**/
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
public synchronized void stop() {
// 关闭守护哨兵和监控服务。
supervisor.stop();
if (monitorServer != null) {
monitorServer.stop();
}
} 至此,Application整个流程就分析完了。
-
-
整体流程可以总结为:
首先初始化命令行配置;
接着读取配置文件;
根据是否需要reload初始化配置文件中的组件;如果需要reload会使用EventBus进行发布订阅变化;
接着创建Application,创建守护哨兵LifecycleSupervisor,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务MonitorService;停止顺序:SourceRunner、SinkRunner、Channel;
如果配置文件需要定期reload,则需要注册PollingPropertiesFileConfigurationProvider到守护哨兵;
最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。
LifecycleSupervisor
守护哨兵,负责监控和重启组件
-
My: 所有需要被监控和重启的组件都应implements LifecycleAware
public class LifecycleSupervisor implements LifecycleAware {
public LifecycleSupervisor() {
lifecycleState = LifecycleState.IDLE;
// 存放被监控的组件
supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
// 存放正在被监控的组件
monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
// 创建监控服务线程池
monitorService = new ScheduledThreadPoolExecutor(10,
new ThreadFactoryBuilder().setNameFormat(
"lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
.build());
monitorService.setMaximumPoolSize(20);
monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
// 定期清理被取消的组件
purger = new Purger();
// 默认不进行清理
needToPurge = false;
}
... // start() & stop()... // 进行组件守护
public synchronized void supervise(LifecycleAware lifecycleAware,
SupervisorPolicy policy, LifecycleState desiredState) {
if (this.monitorService.isShutdown()
|| this.monitorService.isTerminated()
|| this.monitorService.isTerminating()) {
...// 如果哨兵已停止则抛出异常
}
// 初始化守护组件
Supervisoree process = new Supervisoree();
process.status = new Status();
// 默认策略是失败重启
process.policy = policy;
process.status.desiredState = desiredState; // 初始化组件默认状态,一般为START
process.status.error = false;
// 组件监控器,用于定时获取组件的最新状态,或重启组件。后面会介绍MonitorRunnable具体做什么。
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
// 以固定时间间隔执行monitorRunnable线程
// scheduleWithFixedDelay: Creates and executes a periodic action. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
// 所以需要把所有异常捕获,才能保证定时任务继续执行。
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
} -
MonitorRunnable:负责进行组件状态迁移或组件故障恢复
public static class MonitorRunnable implements Runnable {
public ScheduledExecutorService monitorService;
public LifecycleAware lifecycleAware;
public Supervisoree supervisoree;
@Override
public void run() {
long now = System.currentTimeMillis();
try {
if (supervisoree.status.firstSeen == null) {
logger.debug("first time seeing {}", lifecycleAware);
supervisoree.status.firstSeen = now; // 记录第一次状态查看时间
}
supervisoree.status.lastSeen = now; // 记录最后一次状态查看时间
synchronized (lifecycleAware) {
// 如果守护组件被丢弃或出错了,则直接返回
if (supervisoree.status.discard) {
// 也就是此时已经调用了unsupervise
logger.info("Component has already been stopped {}", lifecycleAware);
return;
} else if (supervisoree.status.error) {
logger.info("Component {} is in error state, and Flume will not"
+ "attempt to change its state", lifecycleAware);
return;
}
// 更新最后一次查看到的状态
supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
// 如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {
switch (supervisoree.status.desiredState) {
// 如果是启动状态,则启动组件。# 最开始的时候组件应该就是这么启动的
case START:
try {
lifecycleAware.start();
} catch (Throwable e) {
logger.error("Unable to start " + lifecycleAware
+ " - Exception follows.", e);
if (e instanceof Error) {
// This component can never recover, shut it down.
supervisoree.status.desiredState = LifecycleState.STOP;
try {
lifecycleAware.stop();
logger.warn("Component {} stopped, since it could not be"
+ "successfully started due to missing dependencies",
lifecycleAware);
} catch (Throwable e1) {
logger.error("Unsuccessful attempt to "
+ "shutdown component: {} due to missing dependencies."
+ " Please shutdown the agent"
+ "or disable this component, or the agent will be"
+ "in an undefined state.", e1);
supervisoree.status.error = true;
if (e1 instanceof Error) {
throw (Error) e1;
}
// Set the state to stop, so that the conf poller can
// proceed.
}
}
supervisoree.status.failures++;
}
break;
case STOP:
try {
lifecycleAware.stop();
} catch (Throwable e) {
logger.error("Unable to stop " + lifecycleAware
+ " - Exception follows.", e);
if (e instanceof Error) {
throw (Error) e;
}
supervisoree.status.failures++;
}
break;
default:
logger.warn("I refuse to acknowledge {} as a desired state",
supervisoree.status.desiredState);
}
if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
logger.error(
"Policy {} of {} has been violated - supervisor should exit!",
supervisoree.policy, lifecycleAware);
}
}
}
} catch (Throwable t) {
logger.error("Unexpected error", t);
}
logger.debug("Status check complete");
}
}
Source
SourceRunner
-
首先是SourceRunner,它控制how a source is driven。
-
它是一个用来实例化derived classes(派生类)的抽象类。 根据指定的source,来通过其内的static factory method 来实例化runner。
// 根据指定source的类型来实例化一个source runner的静态工厂方法
// 输入是要运行的source,返回可以运行指定source的runner
public static SourceRunner forSource(Source source) {
SourceRunner runner = null;
if (source instanceof PollableSource) {
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {
throw new IllegalArgumentException("No known runner type for source "
+ source);
}
return runner;
}
EventDrivenSourceRunner
starts、stops and manages EventDrivenSource event-driven sources
-
其内有如下几个方法:
-
构造方法
public EventDrivenSourceRunner() {
lifecycleState = LifecycleState.IDLE;
} -
start()
@Override
public void start() {
Source source = getSource(); //获取Source
ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
cp.initialize(); //初始化Channel处理器
source.start(); //启动Source
lifecycleState = LifecycleState.START; //本组件状态改成启动状态
} stop()、toString()、getLifecycleState()
-
PollableSourceRunner
public class PollableSourceRunner extends SourceRunner {
@Override
public void start() {
PollableSource source = (PollableSource) getSource(); //获取Source
ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
cp.initialize(); //初始化channel处理器
source.start(); //启动source
runner = new PollingRunner(); //新建一个PollingRunner线程来拉取数据
runner.source = source;
runner.counterGroup = counterGroup;
runner.shouldStop = shouldStop;
runnerThread = new Thread(runner);
runnerThread.setName(getClass().getSimpleName() + "-" +
source.getClass().getSimpleName() + "-" + source.getName());
runnerThread.start();
lifecycleState = LifecycleState.START;
}
}
PollingRunner线程
@Override
public void run() {
while (!shouldStop.get()) { //如果没有停止,则一直在死循环运行
counterGroup.incrementAndGet("runner.polls"); //原子操作
try {
//调用PollableSource的process方法进行轮训拉取,然后判断是否遇到了失败补偿
if (source.process().equals(PollableSource.Status.BACKOFF)) {/
counterGroup.incrementAndGet("runner.backoffs");
//失败补偿时暂停线程处理,等待超时时间之后重试
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
}
}
}
}
}
TODO
Source
public interface Source extends LifecycleAware, NamedComponent {
public void setChannelProcessor(ChannelProcessor channelProcessor);
public ChannelProcessor getChannelProcessor();
}
-
继承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口。其中:
它的的所有逻辑的实现应该在LifecycleAware接口的start和stop中实现;
ChannelProcessor用来进行日志流的过滤和Channel的选择及调度。
由上述的Runner我们知道,Source 提供了两种机制: PollableSource (轮训拉取)和 EventDrivenSource (事件驱动)
Source作用就是监听日志,采集,然后交给ChannelProcessor处理。
EventDrivenSource
事件驱动型source不需要外部driver来获取event,EventDriven是一个implement Source的空接口。
从这里开始~~~‘
Channel
通过 Channel 实现了 Source 和 Sink 的解耦,可以实现多对多的关联,和 Source 、 Sink 的异步化
Channel exposes a transaction interface that can be used by its clients to ensure automic put(Event) and take() semantics.
ChannelProcesoor
-
前面我们了解到Source采集日志后会交给ChannelProcessor处理,so接下来我们从ChannelProcessor入手,其依赖如下组件:
private final ChannelSelector selector; //Channel选择器,.flume.ChannelSelector
private final InterceptorChain interceptorChain; //拦截器链,.flume.interceptor.InterceptorChain
private ExecutorService execService; //用于实现可选Channel的ExecutorService,默认是单线程实现 [注:这个我在某个博客上看到的,但这个组件我在ChannelProcessor中没有搜到] -
我们来看ChannelProcessor是如何处理Event的:
// Attempts to put the given event into each configured channel
public void processEvent(Event event) {
event = interceptorChain.intercept(event); //首先进行拦截器链过滤,TODO:intercep...
// InterceptorChain实现了Interceptor接口,调用a list of other Interceptors. 实现event的过滤和加工。具体见后面
if (event == null) {
return;
}
// Process required channels
//通过Channel选择器获取必须成功处理的Channel,然后事务中执行.
List<Channel> requiredChannels = selector.getRequiredChannels(event);
for (Channel reqChannel : requiredChannels) {
Transaction tx = reqChannel.getTransaction(); // 继承自Channel接口的类要实现getTransaction()方法,TODO:getTransaction
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin(); //开始事务
reqChannel.put(event); // 将event放到reqChannel
tx.commit(); //提交事务
} catch (Throwable t) {
tx.rollback(); // 如果捕捉到throwable(including Error & Exception),则回滚事务
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put event on required " +
"channel: " + reqChannel, t); //TODO: Channelexception可能会被handle,不然如何保证RequiredChannel的成功处理?
}
} finally {
if (tx != null) {
tx.close(); // 最后如果事务非空,还得关闭该事务
}
}
}
// Process optional channels
//通过Channel选择器获取可选的Channel,这些Channel失败是可以忽略,不影响其他Channel的处理
List<Channel> optionalChannels = selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
Transaction tx = null;
try {
tx = optChannel.getTransaction();
tx.begin();
optChannel.put(event);
tx.commit();
} catch (Throwable t) {
tx.rollback();
LOG.error("Unable to put event on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close();
}
}
}
} -
看下flume内实现的channel类
Channel接口
public interface Channel extends LifecycleAware, NamedComponent {
// put() and get() must be invoked within an active Transaction boundary
public void put(Event event) throws ChannelException;
public Event take() throws ChannelException;
// @return: the transaction instance associated with this channel
public Transaction getTransaction();
}
AbstractChannel
abstract class AbstractChannel implements Channel, LifecycleAware, Configurable
实现了lifecycleStatus的改变(在构造、start()和stop()方法中),实现了空configure()方法。没有做什么具体的channel相关的处理。
BasicChannelSemantics
基本Channel语义的实现,包括Transaction类的thread-local语义的实现。
public abstract class BasicChannelSemantics extends AbstractChannel {
// 1. 事务使用ThreadLocal存储,保证事务线程安全
private ThreadLocal<BasicTransactionSemantics> currentTransaction
= new ThreadLocal<BasicTransactionSemantics>();
private boolean initialized = false;
protected void initialize() {} // 2. 进行一些初始化工作
// 3.提供给实现类(子类)的创建事务的回调
// 用于new Transaction对象,该对象必须继承自BasicTransactionSemantics
// 比如MemoryChannel覆盖了该方法,方法体内new了一个实例,该实例为其内私有类MemoryTransaction,该私有类继承了BasicTransactionSemantics。
// MemoryTransaction内部用两条双向并发阻塞队列LinkedBlockingDeque实现putList和takeList。具体的稍后看,会介绍到MemoryChannel TODO
protected abstract BasicTransactionSemantics createTransaction();
// 4. 往Channel中放Event,其直接委托给事务的put方法
// 确保该thread存在一个事务,然后将put方法委托给该线程的BasicTransactionSemantics实例
@Override
public void put(Event event) throws ChannelException {
// ThreadLocal<BasicTransactionSemantics>的实例currentTransaction
// 即取得当前线程的事务实例
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
transaction.put(event);
}
// 5.从Channel获取Event,也是直接委托给事务的take方法实现
@Override
public Event take() throws ChannelException {
BasicTransactionSemantics transaction = currentTransaction.get();
Preconditions.checkState(transaction != null,
"No transaction exists for this thread");
return transaction.take();
}
@Override
public Transaction getTransaction() {
// 1. 如果channel is not ready, then 初始化该channel
if (!initialized) {
synchronized (this) {
if (!initialized) {
initialize();
initialized = true;
}
}
}
// 2. 如果当前线程没有open的事务(无事务或已关闭),则创建一个,并绑定到currentTransaction中
BasicTransactionSemantics transaction = currentTransaction.get();
if (transaction == null || transaction.getState().equals(
BasicTransactionSemantics.State.CLOSED)) {
transaction = createTransaction();
currentTransaction.set(transaction);
}
return transaction;
}
}
MemoryChannel
当写入硬盘不实际或不需要数据持久化时,推荐使用。或在单元测试时使用。
大部分channel会把put和take委托给事务去完成。
纯内存的Channel实现,整个事务操作都是在内存中完成的。
-
每个事务都有一个TakeList和PutList,分别用于存储事务相关的取数据和放数据,等事务提交时才完全同步到Channel Queue,或者失败把取数据回滚到Channel Queue。 TODO:整体理解何时commit、rollback。
public class MemoryChannel extends BasicChannelSemantics {
// TODO: about factory
private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
...//一些常量定义:缺省值defaultCapacity、defaultTransCapacity、byteCapacitySlotSize..
// 内部类,继承自BasicTransactionSemantics。TODO: About BasicTransactionSemantics
private class MemoryTransaction extends BasicTransactionSemantics {
// 每个事务都有两条双向并发阻塞队列,TODO: LinkedBlockingDeque
private LinkedBlockingDeque<Event> takeList;
private LinkedBlockingDeque<Event> putList;
private final ChannelCounter channelCounter;
...//
public MemoryTransaction(int transCapacity, ChannelCounter counter) {
putList = new LinkedBlockingDeque<Event>(transCapacity);
takeList = new LinkedBlockingDeque<Event>(transCapacity);
channelCounter = counter;
}
// 将event放到putList中
// 整个doPut操作相对来说比较简单,就是往事务putList队列放入Event,如果满了则直接抛异常回滚事务;否则放入putList暂存,等事务提交时转移到Channel Queue。另外需要增加放入队列的字节数计数器,以便之后做字节容量限制
@Override
protected void doPut(Event event) throws InterruptedException {
// channelCounter是一个计数器,记录当前队列放入Event数、取出event数、成功数等。
channelCounter.incrementEventPutAttemptCount(); // 增加放入event计数器
// estimateEventSize计算当前Event body大小,ceil():向上取整
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
// 往事务队列的putList中放入Event,如果满了,则抛异常回滚事务
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize; // 增加放入队列字节数计数器
}
// 从Channel Queue中取event放到takeList中
@Override
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
// 如果takeList队列没有剩余容量,即当前事务已经消费了最大容量的Event
if (takeList.remainingCapacity() == 0) {
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}
// queueStored试图获取一个信号量,超时直接返回null
if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
return null;
}
Event event;
// 从Channel Queue获取一个Event, 对Channel Queue的操作必须加queueLock
synchronized (queueLock) {
event = queue.poll();
}
// 因为信号量的保证,Channel Queue不应该返回null,出现了就不正常了
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
"signalling existence of entry");
// 暂存到事务的takeList队列
takeList.put(event);
// 计算当前Event body大小并增加取出队列字节数计数器
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
takeByteCounter += eventByteSize;
return event;
}
// 等事务提交时,才将当前事务的put list同步到Channel Queue
@Override
protected void doCommit() throws InterruptedException {
// /1、计算改变的Event数量,即取出数量-放入数
int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
// bytesRemaining是字节容量信号量,超出容量则回滚事务
if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Byte capacity " +
"allocated to store event body " + byteCapacity * byteCapacitySlotSize +
"reached. Please increase heap space/byte capacity allocated to " +
"the channel as the sinks may not be keeping up with the sources");
}
if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized (queueLock) {
if (puts > 0) {
while (!putList.isEmpty()) {
if (!queue.offer(putList.removeFirst())) { // offer:添加一个元素并返回true
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;
queueStored.release(puts);
if (remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}
channelCounter.setChannelSize(queue.size());
}
// 事务失败时,将take list数据回滚到Channel Queue
// 在回滚时,需要把takeList中暂存的事件回滚到Channel Queue,并回滚queueStored信号量。
@Override
protected void doRollback() {
int takes = takeList.size();
synchronized (queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
"Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while (!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
bytesRemaining.release(putByteCounter);
putByteCounter = 0;
takeByteCounter = 0;
queueStored.release(takes);
channelCounter.setChannelSize(queue.size());
}
}
private Object queueLock = new Object();
// 在操作Channel Queue时都需要锁定,因为Channel Queue可能动态扩容(会被重新new)。用法就是synchronized(queueLock){...操作queue}
@GuardedBy(value = "queueLock") // 用@GuardedBy注解告诉维护者这个变量被哪个锁保护着
private LinkedBlockingDeque<Event> queue; // 由一个Channel Queue存储整个Channel的Event数据
// Semaphore可控制某资源可被同时访问的个数,acquire()获取一个许可,若无等待,而release()释放一个许可
// queueRemaining表示可存储事件容量。在提交事务时增加或减少该信号量
// 1. 首先在configure()函数中初始化为一个capacity大小的信号量
// 2. 在resize的时候,如果要缩容则要看是否还能acquire到oldCapacity - capacity个许可,不能则不允许缩容(很合理啊,不然就丢失数据了)。若是扩容,则queueRemaining.release(capacity - oldCapacity)
// 3. 提交事务时,如果takeList.size() < putList.size(),则要检查是否有足够的queueRemaining
private Semaphore queueRemaining;
// 表示ChannelQueue已存储事件容量
// 2. 在configure()中初始化为一个大小为0的信号量
// 3. 在doTake()时tryAcquire是否有许可
// 4. 在commit()时release(puts)增加puts个许可
// 5. 在rollback()时release(takes)个许可
private Semaphore queueStored;
// maximum items in a transaction queue
private volatile Integer transCapacity;
private volatile int keepAlive;
private volatile int byteCapacity;
private volatile int lastByteCapacity;
private volatile int byteCapacityBufferPercentage;
private Semaphore bytesRemaining;
private ChannelCounter channelCounter;
public MemoryChannel() {
super();
}
@Override
public void configure(Context context) {
// Read parameters from context
// capacity、transactionCapacity、byteCapacity、byteCapacityBufferPercentage...
}
// 因为多个事务要操作ChannelQueue,还要考虑ChannelQueue的扩容问题,因此MemoryChannel使用了锁来实现;而容量问题则使用了信号量来实现。
// 改变queue的容量,是通过新建一个LinkedBlockingDeque来实现的,并将原queue的东西加进来。
private void resizeQueue(int capacity) throws InterruptedException {
int oldCapacity;
// 计算原queue的capacity,注意该方法需加锁
synchronized (queueLock) {
oldCapacity = queue.size() + queue.remainingCapacity();
}
if (oldCapacity == capacity) {
return;
} else if (oldCapacity > capacity) {
// tryAcquire():从该信号量中获取指定数量的许可
//首先要预占老容量-新容量的大小,以便缩容容量。如果获取失败,默认是记录日志,然后忽略
if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
} else {
//否则,直接缩容,然后复制老Queue的数据,缩容时需要锁定queueLock,这一系列操作要线程安全
synchronized (queueLock) {
LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
newQueue.addAll(queue);
queue = newQueue;
}
}
}
Interceptor
-
flume内部实现了很多自定义的Interceptor,如下图:
同时还实现了InterceptorChain用来链式处理event。
InterceptorChain
Implementation of Interceptor that calls a list of other Interceptors
Interptor接口: 用于过滤、加工Event,然后返回一个新的Event。
-
相比之下,InterceptorChain就是对event逐个(链式)调用其内的Interceptor(接口子类)实例的各个方法。
public class InterceptorChain implements Interceptor {
// list of interceptors that will be traversed, in order
private List<Interceptor> interceptors;
public InterceptorChain() {
interceptors = Lists.newLinkedList(); // 构造方法,type LinkedList
}
public void setInterceptors(List<Interceptor> interceptors) {
this.interceptors = interceptors; // set方法
}
// Interceptor接口的intercept方法: Interception of a single Event.事件拦截
// @return: Original or modified event, or null if the Event is to be dropped.
@Override
public Event intercept(Event event) {
for (Interceptor interceptor : interceptors) {
if (event == null) {
return null;
}
event = interceptor.intercept(event); // 注意:该类的实例会调用上面的set方法初始化intercptors,其中的intercptor是Interceptor接口子类的实例。所以这里的intercept()方法调用的是Interceptor的某个接口所覆盖的方法。[Interceptor有很多子类,下面有一个demo子类的分析,可以往下看HostInterceptor]
}
return event;
}
// Interceptor接口: Interception of a batch of events
// @return: Output list of events
@Override
public List<Event> intercept(List<Event> events) {
... // 基本同上面的方法,不过调用的是interceptor.intercept(events);
}
// Interceptor: Any initialization / startup needed by the Interceptor.
@Override
public void initialize() {
Iterator<Interceptor> iter = interceptors.iterator();
while (iter.hasNext()) {
Interceptor interceptor = iter.next();
interceptor.initialize(); // 挨个对linkedlist中的interceptor实例进行initialize
}
}
@Override
public void close() {
...// 挨个对linkedlist中的interceptor实例进行close
}
HostInterceptor
implements Interceptor
-
功能:在所有拦截的events的header中上加上本机的host name或IP
public class HostInterceptor implements Interceptor {
... // 一些private变量
/**
* Only {@link HostInterceptor.Builder} can build me
*/
// private的构造方法,so只能通过下面的静态方法Builder实例化
private HostInterceptor(boolean preserveExisting,
boolean useIP, String header) {
// 用xx.conf内的值初始化这些变量
this.preserveExisting = preserveExisting;
this.header = header;
InetAddress addr;
try {
addr = InetAddress.getLocalHost(); //Returns the address of the local host.
if (useIP) {
//Returns the IP address string in textual presentation
host = addr.getHostAddress();
} else {
// Gets the fully qualified domain name for this IP address.
host = addr.getCanonicalHostName();
}
} catch (UnknownHostException e) {
logger.warn("Could not get local host address. Exception follows.", e);
}
}
@Override
public void initialize() {
// no-op
}
/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
// 如果要要保存当前的'host‘值并且当前已有头部,那么就不处理直接返回。
if (preserveExisting && headers.containsKey(header)) {
return event;
}
if (host != null) {
headers.put(header, host); //将host添加到头部
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
... // 为events中的每一个event调用intercept(Event event)
}
@Override
public void close() {
// no-op
}
/**
* Builder which builds new instances of the HostInterceptor.
*/
public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
private boolean useIP = USE_IP_DFLT;
private String header = HOST;
@Override
public Interceptor build() {
return new HostInterceptor(preserveExisting, useIP, header);
}
@Override
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
header = context.getString(HOST_HEADER, HOST);
}
}
public static class Constants {
public static String HOST = "host";
... // 一些配置的缺省值
}
} -
demo Usage in xx.conf: more details see User Guide
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = host
# preserveExisting: 是否保存当前已存在的'host'值,缺省是不保存
agent.sources.r1.interceptors.i1.preserveExisting = true
agent.sources.r1.interceptors.i1.useIP = false
agent.sources.r1.interceptors.i1.hostHeader = hostname
Selector
-
先上一张所有selector的继承关系图
可见ChannelSelector默认提供了两种实现:复制和多路复用。默认实现是ReplicatingChannelSelector。
ChannelSelector
interface
-
基于不同实现政策,允许在channels的集合中选取channels子集。
// NamedComponent接口:用于给component附加一个名字,包括setName()和getName()方法
public interface ChannelSelector extends NamedComponent, Configurable {
// @param channels:all channels the selector could select from.
public void setChannels(List<Channel> channels);
/**
* Returns a list of required channels. 这些channels的写入失败会传达回接收事件的source.
* @param: event
* @return: the list of required channels that this selector has selected for
* the given event.
*/
public List<Channel> getRequiredChannels(Event event);
/**
* Returns a list of optional channels. 这些channels的写入失败会被忽略。
* @param: event
* @return: the list of optional channels that this selector has selected for
* the given event.
*/
public List<Channel> getOptionalChannels(Event event);
/**
* @return the list of all channels that this selector is configured to work
* with.
*/
public List<Channel> getAllChannels();
}
## AbstractChannelSelector
* abstract class
```java
public abstract class AbstractChannelSelector implements ChannelSelector {
private List<Channel> channels;
private String name; ...// override ChannelSelctor的getAllChannels()、setChannels(List<Channel> channels)、setName(String name)、getName()方法。 //@return: A map of name to channel instance.
protected Map<String, Channel> getChannelNameMap() {
Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
for (Channel ch : getAllChannels()) {
// 对每一个Channel, 将Channel和其名字放到HashMap中
channelNameMap.put(ch.getName(), ch);
}
return channelNameMap;
}
/**
* Given a list of channel names as space delimited string,
* returns list of channels.
* @return List of {@linkplain Channel}s represented by the names.
*/
// 根据(space分隔的channel名字的)字符串, 返回相应的channel,利用名字-channel的HashMap
protected List<Channel> getChannelListFromNames(String channels,
Map<String, Channel> channelNameMap) {
List<Channel> configuredChannels = new ArrayList<Channel>();
if (channels == null || channels.isEmpty()) { // 判空
return configuredChannels;
}
String[] chNames = channels.split(" ");
for (String name : chNames) {
Channel ch = channelNameMap.get(name);
if (ch != null) {
configuredChannels.add(ch);
} else {
throw new FlumeException("Selector channel not found: "
+ name);
}
}
return configuredChannels;
}
}
ReplicatingChannelSelector
ChannelSelector的一个具体实现,即把接收到的消息复制到每一个Channel。【与之对应的,MultiplexingChannelSelector会根据 Event Header 中的参数进行选择,以此来选择使用哪个 Channel】
Replicating channel selector. 允许event被放置到source所配置的所有channels中。
-
实际的实现方式是,默认将所有channel加入requiredChannels中,optionalChannels为空。然后根据配置的"optional"将该配置对应的channel加入optionalChannels,并从requiredChannels中移除(添加和移除是在configure方法中实现的)。 TODO:看一下这个配置如何实现
public class ReplicatingChannelSelector extends AbstractChannelSelector {
// Configuration to set a subset of the channels as optional.
public static final String CONFIG_OPTIONAL = "optional";
List<Channel> requiredChannels = null; // 在configure()中被设置为getAllChannels()的返回值,即所有配置的channels
List<Channel> optionalChannels = new ArrayList<Channel>();
@Override
public List<Channel> getRequiredChannels(Event event) {
/*
* Seems like there are lot of components within flume that do not call
* configure method. It is conceiveable that custom component tests too
* do that. So in that case, revert to old behavior.
*/
// 如果component没有调用configure(),那么requiredChannels为null,此时再调用一次。
// TODO: configure()方法是在哪里调用的? 同样的问题在很多class中都存在
if (requiredChannels == null) {
return getAllChannels();
}
return requiredChannels;
}
@Override
public List<Channel> getOptionalChannels(Event event) {
return optionalChannels;
}
@Override
public void configure(Context context) {
String optionalList = context.getString(CONFIG_OPTIONAL);
requiredChannels = new ArrayList<Channel>(getAllChannels());
Map<String, Channel> channelNameMap = getChannelNameMap();
// 根据OptionList(String, 是空格分隔的channel名字),得到相应的Channel,并将channel放到optionalChannel&& 从requiredChannels中移除。
if (optionalList != null && !optionalList.isEmpty()) {
for (String optional : optionalList.split("\\s+")) {
Channel optionalChannel = channelNameMap.get(optional);
requiredChannels.remove(optionalChannel);
if (!optionalChannels.contains(optionalChannel)) {
optionalChannels.add(optionalChannel);
}
}
}
}
}
Sink
Sink Runner
-
A driver for sinks that polls them, attempting to process events if any are available in the Channel. All sinks are polled.
public class SinkRunner implements LifecycleAware {
private PollingRunner runner; // 内部类,实现了Runnable接口
private SinkProcessor policy; //
}
Sink Processor
-
分为两类:
-
DefaultSinkProcessor处理单sink,直接传送不附加任何处理。
public void start() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.start(); // start()方法直接启动single sink
lifecycleState = LifecycleState.START;
}
// stop()方法类似,configure()方法为空
public Status process() throws EventDeliveryException {
return sink.process(); // 直接调用sink的process()
}
public void setSinks(List<Sink> sinks) {
Preconditions.checkNotNull(sinks);
Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+ "only handle one sink, "
+ "try using a policy that supports multiple sinks");
sink = sinks.get(0);
} 多sink处理(AbstractSinkProcessor),其中又包括两种:
-
FailoverSinkProcessor:故障切换—>通过维持一个sinks的优先级list —> 把故障sinks降级放到一个pool中被赋予一个冷冻周期。必须先调用setSinks()再configure()
public void setSinks(List<Sink> sinks) {
// needed to implement the start/stop functionality
super.setSinks(sinks);
this.sinks = new HashMap<String, Sink>();
for (Sink sink : sinks) {
this.sinks.put(sink.getName(), sink);
}
}
private Sink moveActiveToDeadAndGetNext() {
Integer key = liveSinks.lastKey();
failedSinks.add(new FailedSink(key, activeSink, 1)); // 把当前liveSinks里的第一优先级key移除到failedSinks中
liveSinks.remove(key);
if (liveSinks.isEmpty()) return null;
if (liveSinks.lastKey() != null) {
return liveSinks.get(liveSinks.lastKey());
} else {
return null;
}
}
... -
LoadBalancingSinkProcessor: 提供在多个sinks之间负载均衡的能力—> 维持一个active sinks的索引序列(load需分布在这些sinks上) —> 算法包括ROUND_ROBIN(default)和RANDOM选择机制。
内部通过一个
private interface SinkSelector
实现。该接口下实现了两个私有静态类RoundRobinSinkSelector
和RandomOrderSinkSelector
.
-