目的
实时监听多个目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)
源码
- import java.io.BufferedReader;
- import java.io.BufferedWriter;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.FileReader;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.io.LineNumberReader;
-
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Properties;
- import java.util.Random;
-
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- ;
-
- public class XTail_Line {
-
-
-
-
- public static class TailFileThread extends Thread
- {
- File file;
- LineNumberReader randomFile=null;
- String newfile=null;
- String thisfile=null;
- String prefile=null;
- private long lastTimeFileSize = 0;
- private String drname=null;
- int ln=0;
- int beginln=0;
- private Producer<String,String> inner;
- java.util.Random ran = new Random();
- String topicname=null;
-
- public TailFileThread(String path,String drname,String topicname) throws FileNotFoundException, IOException
- {
- file=new File(path);
- this.drname=drname;
- this.topicname=topicname;
-
- Properties properties = new Properties();
-
-
- properties.load(new FileInputStream("producer.properties"));
-
- ProducerConfig config = new ProducerConfig(properties);
-
- inner = new Producer<String, String>(config);
- }
-
- public void send(String topicName,String message) {
- if(topicName == null || message == null){
- return;
- }
-
-
- KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,String.valueOf(ran.nextInt(9)),message);
-
- inner.send(km);
-
- }
-
- public void send(String topicName,Collection<String> messages) {
- if(topicName == null || messages == null){
- return;
- }
- if(messages.isEmpty()){
- return;
- }
- List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
- for(String entry : messages){
- KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
- kms.add(km);
- }
- inner.send(kms);
- }
-
- public void close(){
- inner.close();
- }
-
- public String getNewFile(File file)
- {
- File[] fs=file.listFiles();
- long maxtime=0;
- String newfilename="";
- for (int i=0;i<fs.length;i++)
- {
- if (fs[i].isFile()&&fs[i].lastModified()>maxtime)
- {
- maxtime=fs[i].lastModified();
- newfilename=fs[i].getAbsolutePath();
-
- }
- }
- return newfilename;
- }
-
- public void writePosition(String path,int rn)
- {
- try {
- BufferedWriter out = new BufferedWriter(new FileWriter(drname+".position"));
- out.write(path+","+rn);
- out.close();
- } catch (IOException e) {
- }
- }
-
- public void run()
- {
-
- thisfile=getNewFile(file);
- prefile=thisfile;
-
- try {
- BufferedReader br=new BufferedReader(new FileReader(drname+".position"));
- String line=br.readLine();
- if (line!=null &&line.contains(","))
- {
- thisfile=line.split(",")[0];
- prefile=thisfile;
- beginln=Integer.parseInt(line.split(",")[1]);
- }
-
-
- } catch (FileNotFoundException e2) {
-
- e2.printStackTrace();
- }
- catch (IOException e2) {
-
- e2.printStackTrace();
- }
-
-
- try {
- randomFile = new LineNumberReader(new FileReader(thisfile));
- } catch (FileNotFoundException e) {
-
- e.printStackTrace();
- }
- while (true)
- {
- try {
- Thread.sleep(100);
-
- if(isInterrupted())
- {
- System.out.println("Interrupted...");
- break;
- }
- } catch (InterruptedException e1) {
-
- e1.printStackTrace();
- }
- try {
-
-
- String tmp = "";
- while( (tmp = randomFile.readLine())!= null) {
- int currln=randomFile.getLineNumber();
-
- if (currln>beginln)
- send(topicname,new String(tmp.getBytes("utf8")));
-
- ln++;
-
- if (ln>100)
- {
- writePosition(thisfile,currln);
- ln=0;
- }
-
- }
- thisfile=getNewFile(file);
- if(!thisfile.equals(prefile))
-
- {
- randomFile.close();
- randomFile = new LineNumberReader(new FileReader(thisfile));
- prefile=thisfile;
- beginln=0;
- }
-
-
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- if (args.length!=2)
- {
- System.out.println("usage:topicname pathname");
- System.exit(1);
- }
- String topicname=args[0];
- String pathname=args[1];
-
-
- HashMap<String,TailFileThread> hm=new HashMap<String,TailFileThread>();
- File tmpLogFile = new File(pathname);
- File[] fs=tmpLogFile.listFiles();
- while (true)
- {
- fs=tmpLogFile.listFiles();
- for (int i=0;i<fs.length;i++)
- {
- if(fs[i].isDirectory())
- {
- String path=fs[i].getAbsolutePath();
-
- String drname=fs[i].getName();
-
-
- if (drname.contains("xx") || drname.contains("yy") || drname.contains("zz") || drname.contains("aa")
- )
- {
- if (hm.containsKey(path))
- {
- if (!hm.get(path).isAlive())
- {
- hm.get(path).interrupt();
- TailFileThread tt=new TailFileThread(path,drname,topicname);
- tt.start();
- hm.put(path, tt);
- }
-
- }
-
- else
- {
- TailFileThread tt=new TailFileThread(path,drname,topicname);
- tt.start();
- hm.put(path, tt);
-
-
- }
-
- }
-
- }
- }
- Thread.sleep(100);
- }
-
- }
-
- }
转:http://blog.csdn.net/u011750989/article/details/21957741