一、flume简单了解推荐网站(简介包括简单案例部署): http://www.aboutyun.com/thread-8917-1-1.html
二、我的需求是实现从ftp目录下采集数据,目录下文件名称要符合特定正则,要求文件要一行一行读取并解析后写入数据库。且实现断点续传(服务重启后会从上次读的位置继续)。
flume1.7.0中taildirSource实现的是监控目录下文件并且一行一行的读取,我只需选用这个source就可以实现。但是服务并不能直接部署在数据所在的服务器上,所以涉及到ftp的问题。这样就需要重写taildirSource,由于本人能力有限,并没有在源码的基础上修改。使用的别的实现方式:用taildirSource配置,但是使用流下载的方式读取数据。
三、直接走步骤,不足之处请指出,目前服务已经部署在使用,有改进优化之处欢迎留言(服务器1,服务器2)
1.下载:从flume官网下载flume1.7.0 http://flume.apache.org/download.html
2.安装:上传至服务器1目录/home/hadoop下,解压到apache-flume-1.7.0-bin(安装路径/home/hadoop自行定义)
3.配置jdk:修改apache-flume-1.7.0-bin/conf/flume-env.sh.template中jdk的路径:eg: export JAVA_HOME=/usr/java/jdk1.8.0_92
4.配置flume环境,在/etc/profile中增加:
export FLUME_HOME=/安装路径/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin
5.重启文件/etc/profile,使用命令source /etc/profile
6.执行命令flume-ng version ,出现版本号说明安装成功
-------以上步骤完成flume的安装,下面开始自定义source------
7.写flume配置文件,在apache-flume-1.7.0-bin/conf/下新建taildirsource.conf
agent1.sources = tail_dir_source
agent1.sinks = taildirsink
agent1.channels = channel1
# source: tail_dir_source -----------------
agent1.sources.tail_dir_source.type = com.esa.source.ftp.FtpSource3 //自定义类
agent1.sources.tail_dir_source.channels = channel1
agent1.sources.tail_dir_source.positionFile =/opt/flume/taildir_position.json //json文件位置
agent1.sources.tail_dir_source.filegroups =f1 //监控文件,配置多个以空格隔开(我是自己写的路径,这个属性没有用到)
agent1.sources.tail_dir_source.filegroups.f1 =/Desktop/studyflume3/.* //监控文件路径和正则(我是自己写的路径,这个属性没有用到)
agent1.sources.tail_dir_source.filePath =/Desktop/studyflume3 //监控ftp的目录 (自己加的)
agent1.sources.tail_dir_source.regex =.* //文件夹正则(自己加的)
agent1.sources.tail_dir_source.batchSize = 100
agent1.sources.tail_dir_source.backoffSleepIncrement = 1000
agent1.sources.tail_dir_source.maxBackoffSleep = 5000
agent1.sources.tail_dir_source.recursiveDirectorySearch = true
agent1.sources.tail_dir_source.yarnApplicationHeader = true
agent1.sources.tail_dir_source.yarnContainerHeader = true
agent1.sources.tail_dir_source.ftpip = 192.160.111.211 //ftp地址
agent1.sources.tail_dir_source.ftpport = 21 //ftp端口
agent1.sources.tail_dir_source.username = root //用户名
agent1.sources.tail_dir_source.password = root //密码
# Describe taildirsink
agent1.sinks.taildirsink.type =com.esa.sink.db.OracleSink //数据库驱动
agent1.sinks.taildirsink.url=url //数据库url
agent1.sinks.taildirsink.user=root //数据库用户名
agent1.sinks.taildirsink.password=root //数据库密码
agent1.sinks.taildirsink.regex=.* //数据解析规则
agent1.sinks.taildirsink.sql=insert into table values(?,?,?) //入库sql
agent1.sinks.taildirsink.channel = channel1
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapactiy = 100
8.新建maven工程,自定义taildirsource类FtpSource3继承AbstractSource,实现PollableSource, Configurable
public class FtpSource3 extends AbstractSource implements PollableSource, Configurable {
private static final Logger logger = LoggerFactory.getLogger(FtpSource3.class);
private String filePath;// 文件目录
private String regex;// 匹配的正则
private Table<String, String, String> headerTable;
private int batchSize;
private static String positionFilePath;// 校验文件目录
private boolean skipToEnd;
private boolean byteOffsetHeader;
private SourceCounter sourceCounter;
private FtpReliableTaildirEventReader reader;
private ScheduledExecutorService idleFileChecker;
private ScheduledExecutorService positionWriter;
private int retryInterval = 1000;
private int maxRetryInterval = 5000;
private int idleTimeout;
private int checkIdleInterval = 5000;
private int writePosInitDelay = 5000;
private int writePosInterval;
private boolean cachePatternMatching;
private Long backoffSleepIncrement;
private Long maxBackOffSleepInterval;
private boolean fileHeader;
private String fileHeaderKey;
private FTPClient ftp;// ftp对象
private String ftpip;// 需要连接到的ftp端的ip
private int ftpport;// 连接端口,默认21
private String username;// 要连接到的ftp端的名字
private String password;// 要连接到的ftp端的对应得密码
private Map<String, Long> map;
//获取配置文件中的配置
@Override
public void configure(Context context) {
String fileGroups = context.getString(TaildirSourceConfigurationConstants.FILE_GROUPS);
Preconditions.checkState(fileGroups != null,
"Missing param: " + TaildirSourceConfigurationConstants.FILE_GROUPS);
filePath = context.getString("filePath");// 文件目录
regex = context.getString("regex");
Preconditions.checkState(!filePath.isEmpty(), "Mapping for tailing files is empty or invalid: '"
+ TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX + "'");
positionFilePath = context.getString("positionFile");// 写绝对路径
headerTable = getTable(context, TaildirSourceConfigurationConstants.HEADERS_PREFIX);
batchSize = context.getInteger(TaildirSourceConfigurationConstants.BATCH_SIZE,
TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
skipToEnd = context.getBoolean(TaildirSourceConfigurationConstants.SKIP_TO_END,
TaildirSourceConfigurationConstants.DEFAULT_SKIP_TO_END);
byteOffsetHeader = context.getBoolean(TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER,
TaildirSourceConfigurationConstants.DEFAULT_BYTE_OFFSET_HEADER);
idleTimeout = context.getInteger(TaildirSourceConfigurationConstants.IDLE_TIMEOUT,
TaildirSourceConfigurationConstants.DEFAULT_IDLE_TIMEOUT);
writePosInterval = context.getInteger(TaildirSourceConfigurationConstants.WRITE_POS_INTERVAL,
TaildirSourceConfigurationConstants.DEFAULT_WRITE_POS_INTERVAL);
cachePatternMatching = context.getBoolean(TaildirSourceConfigurationConstants.CACHE_PATTERN_MATCHING,
TaildirSourceConfigurationConstants.DEFAULT_CACHE_PATTERN_MATCHING);
backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
fileHeader = context.getBoolean(TaildirSourceConfigurationConstants.FILENAME_HEADER,
TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER);
fileHeaderKey = context.getString(TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY,
TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);
if (sourceCounter == null) {
sourceCounter = new SourceCounter(getName());
}
ftpip = context.getString("ftpip");
logger.info(ftpip + "---ftpip---");
Preconditions.checkNotNull(ftpip, "ftpip must be set!!");
ftpport = context.getInteger("ftpport");
logger.info(ftpport + "---ftpport---");
Preconditions.checkNotNull(ftpport, "ftpport must be set!!");
username = context.getString("username");
logger.info(username + "---username---");
Preconditions.checkNotNull(username, "username must be set!!");
password = context.getString("password");
logger.info(password + "---password---");
}
//服务开始时执行一遍
@Override
public void start() {
logger.info("{} TaildirSource source starting with directory: {}", getName(), filePath);
logger.info("filePath==" + filePath);
logger.info("positionFilePath==" + positionFilePath);
// 先连接ftp
ftp = new FTPClient();
// 验证登录
try {
ftp.connect(ftpip, ftpport);
System.out.println(ftp.login(username, password));
ftp.setCharset(Charset.forName("UTF-8"));
ftp.setControlEncoding("UTF-8");
logger.info("fileHeaderKey==" + fileHeaderKey);
logger.info("ftp==" + ftp);
ftp.enterLocalActiveMode();//ftp客户端一定要切换主动模式,不然每次都会请求一个端口,ftp服务器端要设置成被动模式
logger.info("转换路径是否成功" + ftp.changeWorkingDirectory("/"));
} catch (IOException e) {
e.printStackTrace();
}
// 创建json文件(用dom4j写的xml格式的文件,每个节点记录一个文件及位置)
try {
create();
} catch (Exception e1) {
e1.printStackTrace();
logger.error("----创建文件失败----");
}
super.start();
logger.debug("TaildirSource started");
sourceCounter.start();
}
@Override
public Status process() throws EventDeliveryException {
// 从ftp下载文件到本地
Status status = Status.READY;
logger.info("--进入主程序--");
logger.info("--filePath--" + filePath);
if (ftp.isConnected()) {
logger.info("--ftp--" + ftp);
try {
FTPFile[] ftpFiles = null;
// ftp.enterLocalPassiveMode();
ftp.enterLocalActiveMode();
boolean a = ftp.changeWorkingDirectory(filePath);
logger.info("转换路径是否成功" + a);
if (a) {
logger.info("---开始ftp.listFiles()---");
ftpFiles = ftp.listFiles(filePath);
logger.info("目录下文件个数==" + ftpFiles.length);
//我这里规定好了文件结构,所以直接找结构里的文件。
for (int i = 0; i < ftpFiles.length; i++) {//ANH
if(ftpFiles[i].isDirectory()){
System.out.println("---isdirectory---" + ftpFiles[i]);
try {
System.out.println("---开始ftp.listFiles()---" + filePath + "/" + ftpFiles[i].getName());
FTPFile[] files = ftp.listFiles(filePath + "/" + ftpFiles[i].getName());
System.out.println("目录下文件个数---" + files.length);
for (int j = 0; j < files.length; j++) {//SESSIONLOG
if(files[j].getName().equals("SESSIONLOG")){
System.out.println("---isdirectory---" + "SESSIONLOG");
System.out.println(ftp.changeWorkingDirectory(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG"));
FTPFile[] files1 = ftp.listFiles(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
for(FTPFile sFile : files1){
System.out.println(filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG" + "下面的文件" + sFile);
downloderFile(sFile, filePath + "/" + ftpFiles[i].getName() + "/SESSIONLOG");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
} else {
logger.error("连接的fpt上没有制定下载路径,没法下载数据!");
}
} catch (Exception e) {
logger.error("下载失败!");
logger.error("详情", e);
status = Status.BACKOFF;
}
}
return status;
}
//下载文件
private void downloderFile(FTPFile ftpFile, String ftpFileDir) {
// 筛选文件,下载那些文件
// if (ftpFile.isDirectory()) {
// logger.info("----isDirectory----");
// try {
// FTPFile[] files = ftp.listFiles(ftpFileDir + "/" + ftpFile.getName());
// logger.info("---递归找文件---" + files.length);
// for (int i = 0; i < files.length; i++) {
// System.out.println(files[i]);
// System.out.println(ftp.changeWorkingDirectory(ftpFileDir + "/" + ftpFile.getName()));
// ;
// downloderFile(files[i], ftpFileDir + "/" + ftpFile.getName());
// }
// } catch (IOException e) {
// e.printStackTrace();
// }
// } else
if(ftpFile.isFile()) {
logger.info("---isfile---");
if (ftpFile.getName().matches(regex)) {
logger.info("---匹配成功---");
logger.info("---ftpfile--" + ftpFile);
map = new HashMap<>();
// 把文件中数据加载到map中
try {
SAXReader reader = new SAXReader();
Document doc = reader.read(new File(positionFilePath));
Element root = doc.getRootElement();
Element foo;
for (Iterator i = root.elementIterator("files"); i.hasNext();) {
foo = (Element) i.next();
if (!foo.elementText("name").isEmpty() && !foo.elementText("size").isEmpty()) {
long size = Long.parseLong(foo.elementText("size"));
map.put(foo.elementText("name"), size);
}
}
} catch (Exception e) {
e.printStackTrace();
}
logger.info("map====" + map);
if (map.containsKey(ftpFile.getName())) {
logger.info("--- 进入包含---" + ftpFile.getName());
// 要修改文件中的值
readAppointedLineNumber(ftpFile, map.get(ftpFile.getName()));
} else {
logger.info("--- 进入不包含---" + ftpFile.getName());
// 要写文件增加节点
InputStream iStream = null;
InputStreamReader inputStreamReader = null;
BufferedReader br = null;
try {
// 这里定义一个字符流的输入流的节点流,用于读取文件(一个字符一个字符的读取)
iStream = ftp.retrieveFileStream(new String(ftpFile.getName().getBytes("gbk"),"utf-8"));
if (iStream != null) {
logger.info("---流不为空---");
inputStreamReader = new InputStreamReader(iStream);
br = new BufferedReader(inputStreamReader); // 在定义好的流基础上套接一个处理流,用于更加效率的读取文件(一行一行的读取)
long x = 0; // 用于统计行数,从0开始
String string = br.readLine();
while (string != null) { // readLine()方法是按行读的,返回值是这行的内容
x++; // 每读一行,则变量x累加1
getChannelProcessor().processEvent(EventBuilder.withBody(string.getBytes()));
string = br.readLine();
}
System.out.println(x);
try {
// 增加节点
add(ftpFile.getName(), x + "");
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (iStream != null) {
try {
iStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inputStreamReader != null) {
try {
inputStreamReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
ftp.getReply();// 加上这个防止第二次获取文件流为空的问题
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
/**
* 创建文件,存储文件及文件位置
*
* @param file
* @throws Exception
*/
public static void create() throws Exception {
File file = new File(positionFilePath);
if(file.exists()){
logger.info("---json文件已存在---");
}else{
Document document = DocumentHelper.createDocument();
Element root = document.addElement("rss");
root.addAttribute("version", "2.0");
XMLWriter writer = new XMLWriter(new FileOutputStream(new File(positionFilePath)), OutputFormat.createPrettyPrint());
writer.setEscapeText(false);// 字符是否转义,默认true
writer.write(document);
writer.close();
logger.info("---创建文件成功---");
}
}
/**
* 增加文件节点
*/
public void add(String name, String length) throws Exception {
logger.info("---开始增加节点---");
SAXReader reader = new SAXReader();
Document document = reader.read(new File(positionFilePath));
Element root = document.getRootElement();
try {
Element resource = root.addElement("files");
Element nameNode = resource.addElement("name");
Element sizeNode = resource.addElement("size");
nameNode.setText(name);
sizeNode.setText(length);
XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
writer.setEscapeText(false);// 字符是否转义,默认true
writer.write(document);
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
logger.info("---结束增加节点---");
}
/**
* 更新节点
*
* @throws IOException
*/
public void Update(String name, String size) throws IOException {
logger.info("---开始文件更新---");
SAXReader reader = new SAXReader();
Document document = null;
try {
document = reader.read(new File(positionFilePath));
Element root = document.getRootElement();
Element foo;
for (Iterator i = root.elementIterator("files"); i.hasNext();) {
foo = (Element) i.next();
if (name.equals(foo.elementText("name"))) {
foo.element("size").setText(size);
}
}
XMLWriter writer = new XMLWriter(new FileOutputStream(positionFilePath), OutputFormat.createPrettyPrint());
writer.setEscapeText(false);// 字符是否转义,默认true
writer.write(document);
writer.close();
} catch (Exception e) {
e.printStackTrace();
}
OutputFormat format = OutputFormat.createPrettyPrint();
format.setEncoding("UTF-8");
XMLWriter writer = new XMLWriter(new OutputStreamWriter(new FileOutputStream(positionFilePath)), format);
writer.write(document);
writer.close();
logger.info("---文件更新成功---");
}
@Override
public void stop() {
logger.info("----执行结束---");
super.stop();
if (ftp != null) {
try {
ftp.disconnect();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private Table<String, String, String> getTable(Context context, String prefix) {
Table<String, String, String> table = HashBasedTable.create();
for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
String[] parts = e.getKey().split("\\.", 2);
table.put(parts[0], parts[1], e.getValue());
}
return table;
}
}
9.重写sink,新建类OracleSink 继承AbstractSink 实现Configurable
public class OracleSink extends AbstractSink implements Configurable {
private Logger logger = LoggerFactory.getLogger(OracleSink.class);
private String driverClassName;
private String url;
private String user;
private String password;
private PreparedStatement preparedStatement;
private Connection conn;
private int batchSize;
private String regex;
private String sql;
public OracleSink() {
logger.info("---start---");
}
@Override
public void configure(Context context) {
url = context.getString("url");
logger.info(url + "---url---");
Preconditions.checkNotNull(url, "url must be set!!");
driverClassName = context.getString("driverClassName");
logger.info(driverClassName + "---driverClassName---");
Preconditions.checkNotNull(driverClassName, "driverClassName must be set!!");
user = context.getString("user");
logger.info(user + "---user---");
Preconditions.checkNotNull(user, "user must be set!!");
password = context.getString("password");
logger.info(password + "---password---");
Preconditions.checkNotNull(password, "password must be set!!");
regex = context.getString("regex");
logger.info(regex + "---regex---");
Preconditions.checkNotNull(regex, "regex must be set!!");
sql = context.getString("sql");
logger.info(sql + "---sql---");
Preconditions.checkNotNull(sql, "sql must be set!!");
batchSize = context.getInteger("batchSize", 100);
logger.info(batchSize + "---batchSize---");
Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
}
@Override
public void start() {
super.start();
try {
//调用Class.forName()方法加载驱动程序
Class.forName(driverClassName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
// String url = "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + databaseName;
//调用DriverManager对象的getConnection()方法,获得一个Connection对象
try {
conn = DriverManager.getConnection(url, user, password);
conn.setAutoCommit(false);
//创建一个Statement对象
preparedStatement = conn.prepareStatement(sql);
} catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void stop() {
logger.info("----执行结束---");
super.stop();
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event;
String content;
List<Info> infos = Lists.newArrayList();
transaction.begin();
try {
for (int i = 0; i < batchSize; i++) {
event = channel.take();
if (event != null) {//对事件进行处理
logger.info("---读取数据---");
logger.info("--数据库连接是否关闭--" + conn.isClosed());
content = new String(event.getBody());
Info info=new Info();
if(content != null && content != ""){
logger.info("---" + content + "---");
logger.info("---" + regex + "---");
//文件的字段顺序固定且有值?
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(content);
if (m.find()) {
logger.info("匹配上的长度==" + m.groupCount());
for (int j = 1; j <= m.groupCount(); j++) {
logger.info("---每个匹配后的值" + j + "=" + m.group(j) + "---");
switch (j) {
case 1:
info.setProvinceid(m.group(j));
break;
case 2:
info.setProvincename(m.group(j));
break;
case 3:
info.setSessionid(m.group(j));
break;
case 4:
info.setMainacctid(m.group(j));
break;
case 5:
info.setMainacctname(m.group(j));
break;
case 6:
info.setSlaveacctid(m.group(j));
break;
case 7:
info.setOrgid(m.group(j));
break;
case 8:
info.setOrgname(m.group(j));
break;
case 9:
info.setClientaddr(m.group(j));
break;
case 10:
info.setResid(m.group(j));
break;
case 11:
info.setResip(m.group(j));
break;
case 12:
info.setResport(m.group(j));
break;
case 13:
info.setBelongsysid(m.group(j));
break;
case 14:
info.setBelongsysname(m.group(j));
break;
case 15:
info.setLogintime(m.group(j));
break;
case 16:
info.setLogouttime(m.group(j));
break;
case 17:
info.setVideoname(m.group(j));
break;
case 18:
info.setVideofiledir(m.group(j));
break;
case 19:
info.setReserve(m.group(j));
break;
default:
break;
}
}
infos.add(info);
}
}
} else {
result = Status.BACKOFF;
break;
}
}
if (infos.size() > 0) {
logger.info("infos的长度==" + infos.size());
preparedStatement.clearBatch();
for (Info temp : infos) {
preparedStatement.setString(1, temp.getProvinceid() != null ? temp.getProvinceid() : "");
preparedStatement.setString(2, temp.getProvincename() != null ? temp.getProvincename() : "");
preparedStatement.setString(3, temp.getSessionid() != null ? temp.getSessionid() : "");
preparedStatement.setString(4, temp.getMainacctid() != null ? temp.getMainacctid() : "");
preparedStatement.setString(5, temp.getMainacctname() != null ? temp.getMainacctname() : "");
preparedStatement.setString(6, temp.getSlaveacctid() != null ? temp.getSlaveacctid() : "");
preparedStatement.setString(7, temp.getOrgid() != null ? temp.getOrgid() : "");
preparedStatement.setString(8, temp.getOrgname() != null ? temp.getOrgname() : "");
preparedStatement.setString(9, temp.getClientaddr() != null ? temp.getClientaddr() : "");
preparedStatement.setString(10, temp.getResid() != null ? temp.getResid() : "");
preparedStatement.setString(11, temp.getResip() != null ? temp.getResip() : "");
preparedStatement.setString(12, temp.getResport() != null ? temp.getResport() : "");
preparedStatement.setString(13, temp.getBelongsysid() != null ? temp.getBelongsysid() : "");
preparedStatement.setString(14, temp.getBelongsysname() != null ? temp.getBelongsysname() : "");
preparedStatement.setString(15, temp.getLogintime() != null ? temp.getLogintime() : "");
preparedStatement.setString(16, temp.getLogouttime() != null ? temp.getLogouttime() : "");
preparedStatement.setString(17, temp.getVideoname() != null ? temp.getVideoname() : "");
preparedStatement.setString(18, temp.getVideofiledir() != null ? temp.getVideofiledir() : "");
preparedStatement.setString(19, temp.getReserve() != null ? temp.getReserve() : "");
preparedStatement.addBatch();
}
try{
preparedStatement.executeBatch();
}catch(SQLException e){
logger.error("------批量执行sql错误");
e.printStackTrace();
}
conn.commit();
}
transaction.commit();
} catch (Exception e) {
e.printStackTrace();
try {
conn.rollback();
} catch (SQLException e1) {
e1.printStackTrace();
}
try {
transaction.rollback();
} catch (Exception e2) {
logger.error("Exception in rollback. Rollback might not have been" +
"successful.", e2);
}
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
Throwables.propagate(e);
} finally {
transaction.close();
}
return result;
}
}
10.把maven打包,放在apache-flume-1.7.0-bin/lib下,把依赖的包比如dom4j、ojdbc6也放进去。
11.运行flume,在apache-flume-1.7.0-bin写命令:
//运行flume(按ctrl+c可以停止服务)
flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console
//后台运行flume(后台运行的flume,写命令ps -ef|grep taildirsource.conf或者ps -ef|grep flume ,获取进程id,然后kill掉)
flume-ng agent -c conf -f conf/taildirsource.conf -n agent1 -Dflume.root.logger=info,console &