Purpose
Watch the log files under a directory in real time: when a newer file appears, switch to it, and continuously write each new line into Kafka. The producer also records the current file name and line position so that after an abnormal exit it can resume from the last recorded position. For efficiency the position is checkpointed once every 100 lines (adjustable), which means at most the last 100 lines may be re-sent after a crash.
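For reference, the position file written below holds a single line of the form <absolute log path>,<line number>; the path in this example is illustrative:

/var/apache/logs/access.log,4700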
Source code:
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/*
 * A producer that runs on the source server and writes log data into Kafka.
 * It watches the files under a directory and ships new lines to a topic.
 * Note: producer.properties must sit in the same directory as this jar on Linux.
 *
 * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
 */
public class PortalLogTail_Line {

    private Producer<String, String> inner;
    Random ran = new Random();

    public PortalLogTail_Line() throws FileNotFoundException, IOException {
        Properties properties = new Properties();
        properties.load(new FileInputStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        // Use a random key so messages are hashed across all partitions
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(
                topicName, String.valueOf(ran.nextInt(9)), message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            kms.add(new KeyedMessage<String, String>(topicName, entry));
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    // Return the most recently modified file in the directory whose name contains "access"
    public String getNewFile(File file) {
        File[] fs = file.listFiles();
        long maxtime = 0;
        String newfilename = "";
        if (fs == null) {
            return newfilename;
        }
        for (int i = 0; i < fs.length; i++) {
            if (fs[i].lastModified() > maxtime && fs[i].getName().contains("access")) {
                maxtime = fs[i].lastModified();
                newfilename = fs[i].getAbsolutePath();
            }
        }
        return newfilename;
    }

    // Persist the current file name and line number
    public void writePosition(String path, int rn, String positionpath) {
        try {
            BufferedWriter out = new BufferedWriter(new FileWriter(positionpath));
            out.write(path + "," + rn);
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    LineNumberReader randomFile = null;
    String thisfile = null;
    String prefile = null;
    int ln = 0;
    int beginln = 0;

    public void realtimeShowLog(final File file, final String topicname, final String positionpath) throws IOException {
        // Start a thread that polls for newly appended log lines every 100 ms
        new Thread(new Runnable() {
            public void run() {
                thisfile = getNewFile(file);
                prefile = thisfile;
                // If the position file records a file path and line number, resume
                // from there; otherwise start with the newest file
                try {
                    BufferedReader br = new BufferedReader(new FileReader(positionpath));
                    String line = br.readLine();
                    if (line != null && line.contains(",")) {
                        thisfile = line.split(",")[0];
                        prefile = thisfile;
                        beginln = Integer.parseInt(line.split(",")[1]);
                    }
                    br.close();
                } catch (FileNotFoundException e2) {
                    e2.printStackTrace();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                // Open the current log file for reading
                try {
                    randomFile = new LineNumberReader(new FileReader(thisfile));
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
                while (true) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        // Ship whatever has been appended since the last poll
                        String tmp;
                        while ((tmp = randomFile.readLine()) != null) {
                            int currln = randomFile.getLineNumber();
                            // beginln defaults to 0; skip lines already sent before a restart
                            if (currln > beginln) {
                                send(topicname, tmp);
                            }
                            ln++;
                            // Checkpointing every line would hurt throughput,
                            // so record the position once per 100 lines
                            if (ln > 100) {
                                writePosition(thisfile, currln, positionpath);
                                ln = 0;
                            }
                        }
                        // If a newer file has appeared, switch to it and reset the position
                        thisfile = getNewFile(file);
                        if (!thisfile.equals(prefile)) {
                            randomFile.close();
                            randomFile = new LineNumberReader(new FileReader(thisfile));
                            prefile = thisfile;
                            beginln = 0;
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }).start();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("usage: topicname pathname positionpath");
            System.exit(1);
        }
        PortalLogTail_Line producer = new PortalLogTail_Line();
        String topicname = args[0];
        String pathname = args[1];
        String positionpath = args[2];
        final File tmpLogFile = new File(pathname);
        producer.realtimeShowLog(tmpLogFile, topicname, positionpath);
    }
}
The producer.properties file, placed in the same directory as the jar:
metadata.broker.list=xxx:10909,xxx:10909

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
#producer.type=async

# specify the compression codec for all data generated: none, gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
compression.codec=none
#compression.codec=gzip

# message encoder
serializer.class=kafka.serializer.StringEncoder
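If throughput matters more than immediate delivery, the same file can switch the 0.8 producer to async batching. A hedged variant; the two batch settings shown are the stock defaults, listed here only to make them visible for tuning, not values from the original setup:

producer.type=async
batch.num.messages=200
queue.buffering.max.ms=5000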
Testing
Finally, run:

nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position >/home/sre/portalhandler/handler.log 2>&1 &
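To verify end to end, append lines to the watched directory and read them back from the topic. Below is a minimal sketch of a traffic generator; the class name and file path are illustrative, and the file name must contain "access" or getNewFile() will ignore it:

import java.io.FileWriter;
import java.io.PrintWriter;

// Appends one line per second to a watched "access" log so the tailer has data to ship.
public class AccessLogWriter {
    public static void main(String[] args) throws Exception {
        // first true = open in append mode; second true = autoflush on println
        PrintWriter out = new PrintWriter(new FileWriter("/var/apache/logs/access.log", true), true);
        for (int i = 0; ; i++) {
            out.println("test line " + i);
            Thread.sleep(1000);
        }
    }
}

On the receiving side, the console consumer that ships with 0.8-era Kafka can confirm the messages arrived (replace xxx:2181 with your ZooKeeper address):

bin/kafka-console-consumer.sh --zookeeper xxx:2181 --topic portallog --from-beginning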