介绍
本文实现了一个简单的流程:监听实时日志文件,将新增内容写入socket服务器,再由Storm接入进行计算。
源码
日志监听实时写入socket服务器
 
- package socket;  
-   
- import java.io.BufferedReader;  
- import java.io.File;       
- import java.io.IOException;       
- import java.io.InputStreamReader;  
- import java.io.PrintWriter;  
- import java.io.RandomAccessFile;       
- import java.net.Socket;  
- import java.util.concurrent.Executors;       
- import java.util.concurrent.ScheduledExecutorService;       
- import java.util.concurrent.TimeUnit;       
-       
/**
 * Tails the most recently modified file of a directory and forwards every new
 * line to a {@link PrintWriter} (in {@link #main}, a socket connection).
 *
 * <p>Once per second the current file is read from the last forwarded offset to
 * its end. After each pass the directory is re-scanned; when a different file
 * has become the newest one, tailing switches to that file from offset 0.
 */
public class LogViewToSocket {
    /** Host of the socket server that {@link #main} connects to. */
    private static final String SERVER_HOST = "192.168.27.100";
    /** Port of the socket server that {@link #main} connects to. */
    private static final int SERVER_PORT = 5678;
    /** Directory whose newest file {@link #main} tails. */
    private static final String LOG_DIRECTORY = "/home/hadoop/test";

    /** Offset in the current file up to which content has been forwarded. */
    private long lastTimeFileSize = 0;

    /** Handle on the file currently being tailed. */
    RandomAccessFile randomFile = null;
    /** Absolute path of the file currently being tailed. */
    String newfile = null;
    /** Absolute path of the newest file found by the latest directory scan. */
    String thisfile = null;

    /**
     * Finds the most recently modified regular file directly inside a directory.
     *
     * @param file the directory to scan
     * @return the absolute path of the newest regular file, or an empty string
     *         when {@code file} is not a readable directory or holds no
     *         regular file with a non-zero modification time
     */
    public String getNewFile(File file) {
        String newest = "";
        File[] entries = file.listFiles();
        if (entries == null) {
            // listFiles() returns null for a missing or unreadable directory.
            return newest;
        }
        long newestTime = 0;
        for (int i = 0; i < entries.length; i++) {
            // Sub-directories cannot be tailed, so only regular files count.
            if (entries[i].isFile() && entries[i].lastModified() > newestTime) {
                newestTime = entries[i].lastModified();
                newest = entries[i].getAbsolutePath();
            }
        }
        return newest;
    }

    /**
     * Starts a background task that forwards new log lines to {@code out}
     * once per second. The method returns immediately; the task keeps running
     * on its own single-thread scheduler.
     *
     * @param logFile the directory containing the log files
     * @param out     the sink that receives every new line
     * @throws IOException if the newest file in {@code logFile} cannot be
     *                     opened (including the case where there is none)
     */
    public void realtimeShowLog(final File logFile, final PrintWriter out) throws IOException {
        newfile = getNewFile(logFile);
        randomFile = new RandomAccessFile(new File(newfile), "r");

        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    forwardNewLines(logFile, out);
                } catch (IOException e) {
                    // An exception leaving run() cancels all further
                    // executions and is otherwise only stored in the unused
                    // Future, so report it before propagating.
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    /** Performs one polling pass: forward unread lines, then handle rollover. */
    private void forwardNewLines(File logDir, PrintWriter out) throws IOException {
        randomFile.seek(lastTimeFileSize);
        String raw;
        while ((raw = randomFile.readLine()) != null) {
            // readLine() maps each byte to one char; turning the chars back
            // into bytes and decoding them with the platform charset restores
            // multi-byte text.
            String line = new String(raw.getBytes("ISO8859-1"));
            System.out.println(line);
            out.println(line);
            out.flush();
        }

        thisfile = getNewFile(logDir);
        if (thisfile.length() > 0 && !thisfile.equals(newfile)) {
            // A newer file appeared: switch to it and release the old handle.
            RandomAccessFile next = new RandomAccessFile(new File(thisfile), "r");
            randomFile.close();
            randomFile = next;
            newfile = thisfile;
            lastTimeFileSize = 0;
        } else {
            // Resume from where reading actually stopped so that bytes
            // appended after the last readLine() are not skipped; the min()
            // keeps the offset inside the file if it was truncated.
            lastTimeFileSize = Math.min(randomFile.getFilePointer(), randomFile.length());
        }
    }

    /**
     * Connects to the socket server and starts tailing the log directory.
     *
     * @param args unused
     * @throws Exception if the connection or the initial file open fails
     */
    public static void main(String[] args) throws Exception {
        LogViewToSocket view = new LogViewToSocket();
        Socket socket = new Socket(SERVER_HOST, SERVER_PORT);
        PrintWriter out = new PrintWriter(socket.getOutputStream());
        final File tmpLogFile = new File(LOG_DIRECTORY);
        view.realtimeShowLog(tmpLogFile, out);
    }
}
 
 
socket服务器处理
 
- import java.io.BufferedReader;    
- import java.io.IOException;    
- import java.io.InputStreamReader;    
- import java.io.PrintWriter;    
- import java.net.ServerSocket;    
- import java.net.Socket;    
- import java.net.SocketAddress;  
- import java.util.*;  
-     
/**
 * Minimal relay server: the first client to connect is treated as the data
 * source, and every line it sends is broadcast to all connected clients.
 *
 * <p>Relaying starts when the second client connects. Clients that connect
 * later are added to the broadcast list as well.
 */
public class MyServerMulti {

    /**
     * Accepts connections on port 5678 forever.
     *
     * @param args unused
     * @throws IOException if the server socket cannot be opened or accepting
     *                     a connection fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5678);
        int i = 0;
        // The list is appended to here while the relay thread iterates over
        // it, so it has to be a synchronized list.
        List<PrintWriter> outs = Collections.synchronizedList(new ArrayList<PrintWriter>());
        Socket source = null;
        while (true) {
            Socket socket = server.accept();
            i++;
            System.out.println(i);
            System.out.println(socket.getInetAddress());
            outs.add(new PrintWriter(socket.getOutputStream()));
            if (i == 1) {
                // The first connection is the one whose lines get relayed.
                source = socket;
            }
            if (i == 2) {
                // A first receiver is present: start relaying from the source.
                invoke(source, outs);
            }
        }
    }

    /**
     * Starts a thread that relays lines from {@code client} to {@code outs}
     * and closes the client connection when the relay ends.
     */
    private static void invoke(final Socket client, final List<PrintWriter> outs) throws IOException {
        new Thread(new Runnable() {
            public void run() {
                BufferedReader in = null;
                PrintWriter out = null;
                try {
                    in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    out = new PrintWriter(client.getOutputStream());
                    relay(in, out, outs, String.valueOf(client.getInetAddress()));
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    // Best-effort cleanup; a failure to close is not actionable.
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException ignored) {
                            // nothing to do
                        }
                    }
                    if (out != null) {
                        out.close();
                    }
                    try {
                        client.close();
                    } catch (IOException ignored) {
                        // nothing to do
                    }
                }
            }
        }).start();
    }

    /**
     * Reads lines from {@code in} until end of stream or a line equal to
     * {@code "bye"}. Each line is acknowledged on {@code ack} with
     * {@code "Server received <line>"} and then written to every writer in
     * {@code outs}. The {@code "bye"} line itself is still forwarded.
     *
     * <p>NOTE(review): in {@link #main} the source's own writer is part of
     * {@code outs}, so the source also receives an echo of each line; a source
     * that never reads its socket may eventually stall the relay - confirm
     * whether the echo is wanted.
     *
     * @param in   the line source
     * @param ack  receives one acknowledgement per line
     * @param outs the writers every line is broadcast to
     * @param peer label of the source, printed after each relayed line
     * @throws IOException if reading from {@code in} fails
     */
    static void relay(BufferedReader in, PrintWriter ack, List<PrintWriter> outs, String peer)
            throws IOException {
        String msg;
        // readLine() returns null once the source disconnects; stop instead
        // of broadcasting "null" and failing on msg.equals(...).
        while ((msg = in.readLine()) != null) {
            System.out.println(msg);
            ack.println("Server received " + msg);
            ack.flush();

            for (int i = 0; i < outs.size(); i++) {
                PrintWriter receiver = outs.get(i);
                System.out.println(i);
                System.out.println("send msg:" + msg);
                receiver.println(msg);
                receiver.flush();
            }

            System.out.println(peer);
            if (msg.equals("bye")) {
                break;
            }
        }
    }
}
 
storm topology
 
- import java.io.BufferedReader;  
- import java.io.BufferedWriter;  
- import java.io.File;  
- import java.io.FileNotFoundException;  
- import java.io.FileOutputStream;  
- import java.io.FileReader;  
- import java.io.FileWriter;  
- import java.io.IOException;  
- import java.io.InputStreamReader;  
- import java.io.OutputStreamWriter;  
- import java.io.PrintWriter;  
- import java.io.RandomAccessFile;  
- import java.net.Socket;  
- import java.net.UnknownHostException;  
- import java.util.Map;  
-    
-    
- import backtype.storm.Config;  
- import backtype.storm.LocalCluster;  
- import backtype.storm.StormSubmitter;  
- import backtype.storm.generated.AlreadyAliveException;  
- import backtype.storm.generated.InvalidTopologyException;  
- import backtype.storm.spout.SpoutOutputCollector;  
- import backtype.storm.task.OutputCollector;  
- import backtype.storm.task.TopologyContext;  
- import backtype.storm.topology.BasicOutputCollector;  
- import backtype.storm.topology.OutputFieldsDeclarer;  
- import backtype.storm.topology.TopologyBuilder;  
- import backtype.storm.topology.base.BaseBasicBolt;  
- import backtype.storm.topology.base.BaseRichBolt;  
- import backtype.storm.topology.base.BaseRichSpout;  
- import backtype.storm.tuple.Fields;  
- import backtype.storm.tuple.Tuple;  
- import backtype.storm.tuple.Values;  
- import backtype.storm.utils.Utils;  
-    
- public class SocketProcess {  
-          public static class  SocketSpout extends BaseRichSpout {  
-    
-                    
-               static Socket sock=null;  
-               static BufferedReader in=null;  
-               String str=null;  
-                    private static final long serialVersionUID = 1L;  
-                    private SpoutOutputCollector _collector;  
-                    private BufferedReader br;  
-                    private String dataFile;  
-                    private BufferedWriter bw2;  
-                     RandomAccessFile randomFile;  
-                     private long lastTimeFileSize = 0;   
-                     int cnt=0;  
-                    
-                     SocketSpout(){  
-                        
-                    }  
-    
-                    
-                    @Override  
-                    public void open(Map conf, TopologyContext context,  
-                                      SpoutOutputCollector collector) {  
-                             
-                             _collector = collector;  
-                             try {  
-                                 sock=new Socket("192.168.27.100",5678);  
-                                  in=     
-                                     new BufferedReader(new InputStreamReader(sock.getInputStream()));     
-                             } catch (UnknownHostException e) {  
-                                 
-                                 e.printStackTrace();  
-                             } catch (IOException e) {  
-                                 
-                                 e.printStackTrace();  
-                             }  
-                          
-                    }  
-    
-                    
-                    @Override  
-                    public void nextTuple() {  
-                             
-                        if(sock==null){  
-                              try {  
-                                 sock=new Socket("192.168.27.100",5678);  
-                                  in=     
-                                         new BufferedReader(new InputStreamReader(sock.getInputStream()));    
-                             } catch (UnknownHostException e) {  
-                                 
-                                 e.printStackTrace();  
-                             } catch (IOException e) {  
-                                 
-                                 e.printStackTrace();  
-                             }   
-                        }  
-                          
-                          
-                        while(true){      
-                             
-                         try {  
-                             str = in.readLine();  
-                         } catch (IOException e) {  
-                             
-                             e.printStackTrace();  
-                         }  
-                         System.out.println(str);    
-                         _collector.emit(new Values(str));  
-                         if(str.equals("end")){      
-                             break;      
-                             }   
-                         }  
-                          
-                          
-                          
-                          
-                          
-                          
-                          
-                               
-                               
-                    }  
-    
-    
-                    @Override  
-                    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
-                             
-                             declarer.declare(new Fields("line"));  
-                    }  
-                     
-          }  
-           
-    
-          public static class Process extends BaseRichBolt{  
-    
-                    private String _seperator;  
-                    private String _outFile;  
-                    PrintWriter pw;  
-                    private OutputCollector _collector;  
-                    private BufferedWriter bw;  
-                     
-                    public Process(String outFile) {  
-                              
-                             this._outFile   = outFile;  
-                              
-                    }  
-                     
-                    
-                    @Override  
-                    public void prepare(Map stormConf, TopologyContext context,  
-                                      OutputCollector collector) {  
-                             
-                             this._collector = collector;  
-                             File out = new File(_outFile);  
-                             try {  
-                                      bw = new BufferedWriter(new OutputStreamWriter(   
-                              new FileOutputStream(out, true)));   
-                             } catch (IOException e1) {  
-                                      
-                                      e1.printStackTrace();  
-                             }                  
-                    }  
-                     
-                    
-                    @Override  
-                    public void execute(Tuple input) {  
-                             
-                             String line = input.getString(0);  
-                        
-                          
-                             try {  
-                                      bw.write(line+",bkeep"+"\n");  
-                                      bw.flush();  
-                             } catch (IOException e) {  
-                                      
-                                      e.printStackTrace();  
-                             }  
-                              
-                             _collector.emit(new Values(line));  
-                    }  
-    
-                    @Override  
-                    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
-                             
-                             declarer.declare(new Fields("line"));  
-                    }  
-                     
-          }  
-           
-          public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{  
-                   
-                    String outFile   = argv[0]; 
-                    boolean distribute = Boolean.valueOf(argv[1]);       
-                    TopologyBuilder builder = new TopologyBuilder();  
-         builder.setSpout("spout", new  SocketSpout(), 1);   
-         builder.setBolt("bolt", new Process(outFile),1).shuffleGrouping("spout");  
-         Config conf = new Config();  
-         if(distribute){  
-             StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());  
-         }else{  
-                  LocalCluster cluster = new LocalCluster();  
-                  cluster.submitTopology("SocketProcess", conf, builder.createTopology());  
-         }  
-          }         
- }  
 
 
最后执行
- storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true  
 
spout接收从socket服务器实时发送过来的数据,经过topology处理,最终将数据写入out_socket.txt文件