介绍
本文实现了一个简单的流程:监听实时日志文件,将新增的日志行写入socket服务器,再接入Storm进行计算。
源码
日志监听实时写入socket服务器
- package socket;
-
- import java.io.BufferedReader;
- import java.io.File;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.io.RandomAccessFile;
- import java.net.Socket;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
/**
 * Tails the most recently modified file of a directory and forwards every new
 * line to a {@link PrintWriter}. In {@link #main} that writer wraps a socket
 * connected to the relay server.
 */
public class LogViewToSocket {
    /** Byte offset in the current file up to which content has been forwarded. */
    private long lastTimeFileSize = 0;

    /** Handle on the file currently being tailed. */
    RandomAccessFile randomFile = null;
    /** Absolute path of the file currently being tailed. */
    String newfile = null;
    /** Absolute path of the newest file found by the latest poll. */
    String thisfile = null;

    /**
     * Finds the most recently modified regular file directly inside a directory.
     *
     * @param file the directory to scan
     * @return the absolute path of the newest regular file, or {@code ""} when
     *         {@code file} is not a readable directory or contains no regular
     *         file with a positive modification time
     */
    public String getNewFile(File file) {
        File[] candidates = file.listFiles();
        if (candidates == null) {
            // listFiles() returns null for non-directories and on I/O errors.
            return "";
        }
        long newestTime = 0;
        String newestPath = "";
        for (File candidate : candidates) {
            // Sub-directories cannot be tailed, so they are not candidates.
            if (!candidate.isFile()) {
                continue;
            }
            long modified = candidate.lastModified();
            if (modified > newestTime) {
                newestTime = modified;
                newestPath = candidate.getAbsolutePath();
            }
        }
        return newestPath;
    }

    /**
     * Starts a background task that polls the directory once per second and
     * forwards newly appended lines of its newest file to {@code out}.
     *
     * @param logFile the directory containing the log files
     * @param out     the sink that receives each new line; flushed after every line
     * @throws IOException if the directory holds no file to tail or the newest
     *                     file cannot be opened
     */
    public void realtimeShowLog(final File logFile, final PrintWriter out) throws IOException {
        newfile = getNewFile(logFile);
        if (newfile.length() == 0) {
            throw new java.io.FileNotFoundException("No log file found in " + logFile);
        }
        randomFile = new RandomAccessFile(new File(newfile), "r");

        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    forwardNewLines(logFile, out);
                } catch (IOException e) {
                    // An exception escaping a scheduled task suppresses every
                    // later execution, so report it and keep polling instead.
                    e.printStackTrace();
                }
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    /**
     * Performs one poll: forwards the unread lines of the current file, then
     * switches to a newer file if one has appeared in the directory.
     */
    private void forwardNewLines(File logDir, PrintWriter out) throws IOException {
        if (randomFile.length() < lastTimeFileSize) {
            // The file shrank (truncated in place): start again from its beginning.
            lastTimeFileSize = 0;
        }
        randomFile.seek(lastTimeFileSize);

        String raw;
        while ((raw = randomFile.readLine()) != null) {
            // readLine() widens each byte to a char; recover the raw bytes and
            // decode them with the platform default charset.
            String line = new String(raw.getBytes("ISO8859-1"));
            System.out.println(line);
            out.println(line);
            out.flush();
        }

        thisfile = getNewFile(logDir);
        if (thisfile.length() != 0 && !thisfile.equals(newfile)) {
            // A newer file appeared: open it first, then release the old handle.
            RandomAccessFile next = new RandomAccessFile(new File(thisfile), "r");
            randomFile.close();
            randomFile = next;
            newfile = thisfile;
            lastTimeFileSize = 0;
        } else {
            // Remember the actual read position; using length() here could skip
            // bytes appended after the read loop finished.
            lastTimeFileSize = randomFile.getFilePointer();
        }
    }

    /**
     * Connects to the relay server and streams the newest log file under
     * {@code /home/hadoop/test} to it.
     */
    public static void main(String[] args) throws Exception {
        LogViewToSocket view = new LogViewToSocket();

        Socket socket = new Socket("192.168.27.100", 5678);
        PrintWriter out = new PrintWriter(socket.getOutputStream());

        final File tmpLogFile = new File("/home/hadoop/test");
        view.realtimeShowLog(tmpLogFile, out);
    }
}
socket服务器处理
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.net.SocketAddress;
- import java.util.*;
-
/**
 * Relay server: the first client to connect is treated as the message source,
 * and every line it sends is forwarded to all connected clients. Relaying
 * starts once the second client has connected.
 */
public class MyServerMulti {

    /**
     * Accepts connections on port 5678 forever, registering an output writer
     * for each client and starting the relay thread on the second connection.
     */
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5678);
        int i = 0;
        ArrayList<PrintWriter> outs = new ArrayList<PrintWriter>();

        Socket socket1 = null;
        while (true) {
            Socket socket = server.accept();
            i++;
            System.out.println(i);
            System.out.println(socket.getInetAddress());
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            // The relay thread iterates this list concurrently, so guard it.
            synchronized (outs) {
                outs.add(out);
            }
            if (i == 1) {
                socket1 = socket;
            }
            if (i == 2) {
                invoke(socket1, outs);
            }
        }
    }

    /**
     * Starts a thread that reads lines from {@code client}, acknowledges each
     * one to the sender and forwards it to every writer in {@code outs}. The
     * thread stops when the client sends {@code "bye"} or closes its connection.
     *
     * @param client the socket whose input is relayed
     * @param outs   the writers of all connected clients; accessed under its own monitor
     */
    private static void invoke(final Socket client, final ArrayList<PrintWriter> outs) throws IOException {
        new Thread(new Runnable() {
            public void run() {
                BufferedReader in = null;
                PrintWriter out = null;
                try {
                    in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    out = new PrintWriter(client.getOutputStream());

                    String msg;
                    // readLine() returns null once the client closes the
                    // connection; stop instead of relaying "null".
                    while ((msg = in.readLine()) != null) {
                        System.out.println(msg);
                        out.println("Server received " + msg);
                        out.flush();

                        // NOTE(review): outs also contains the sender's own
                        // writer, so each line is echoed back to the source in
                        // addition to the acknowledgement above. A source that
                        // never reads its socket may eventually stall these
                        // writes; confirm whether that echo is intended.
                        synchronized (outs) {
                            for (int i = 0; i < outs.size(); i++) {
                                PrintWriter target = outs.get(i);
                                System.out.println(i);
                                System.out.println("send msg:" + msg);
                                target.println(msg);
                                target.flush();
                            }
                        }

                        System.out.println(client.getInetAddress());
                        if (msg.equals("bye")) {
                            break;
                        }
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException ignored) {
                            // Best-effort cleanup; nothing useful to do here.
                        }
                    }
                    if (out != null) {
                        out.close();
                    }
                    try {
                        client.close();
                    } catch (IOException ignored) {
                        // Best-effort cleanup; nothing useful to do here.
                    }
                }
            }
        }).start();
    }
}
storm topology
- import java.io.BufferedReader;
- import java.io.BufferedWriter;
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.FileReader;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.OutputStreamWriter;
- import java.io.PrintWriter;
- import java.io.RandomAccessFile;
- import java.net.Socket;
- import java.net.UnknownHostException;
- import java.util.Map;
-
-
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.StormSubmitter;
- import backtype.storm.generated.AlreadyAliveException;
- import backtype.storm.generated.InvalidTopologyException;
- import backtype.storm.spout.SpoutOutputCollector;
- import backtype.storm.task.OutputCollector;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.BasicOutputCollector;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.topology.base.BaseBasicBolt;
- import backtype.storm.topology.base.BaseRichBolt;
- import backtype.storm.topology.base.BaseRichSpout;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
- import backtype.storm.utils.Utils;
-
- public class SocketProcess {
- public static class SocketSpout extends BaseRichSpout {
-
-
- static Socket sock=null;
- static BufferedReader in=null;
- String str=null;
- private static final long serialVersionUID = 1L;
- private SpoutOutputCollector _collector;
- private BufferedReader br;
- private String dataFile;
- private BufferedWriter bw2;
- RandomAccessFile randomFile;
- private long lastTimeFileSize = 0;
- int cnt=0;
-
- SocketSpout(){
-
- }
-
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
-
- _collector = collector;
- try {
- sock=new Socket("192.168.27.100",5678);
- in=
- new BufferedReader(new InputStreamReader(sock.getInputStream()));
- } catch (UnknownHostException e) {
-
- e.printStackTrace();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
-
- }
-
-
- @Override
- public void nextTuple() {
-
- if(sock==null){
- try {
- sock=new Socket("192.168.27.100",5678);
- in=
- new BufferedReader(new InputStreamReader(sock.getInputStream()));
- } catch (UnknownHostException e) {
-
- e.printStackTrace();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- }
-
-
- while(true){
-
- try {
- str = in.readLine();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
- System.out.println(str);
- _collector.emit(new Values(str));
- if(str.equals("end")){
- break;
- }
- }
-
-
-
-
-
-
-
-
-
- }
-
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- declarer.declare(new Fields("line"));
- }
-
- }
-
-
- public static class Process extends BaseRichBolt{
-
- private String _seperator;
- private String _outFile;
- PrintWriter pw;
- private OutputCollector _collector;
- private BufferedWriter bw;
-
- public Process(String outFile) {
-
- this._outFile = outFile;
-
- }
-
-
- @Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
-
- this._collector = collector;
- File out = new File(_outFile);
- try {
- bw = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(out, true)));
- } catch (IOException e1) {
-
- e1.printStackTrace();
- }
- }
-
-
- @Override
- public void execute(Tuple input) {
-
- String line = input.getString(0);
-
-
- try {
- bw.write(line+",bkeep"+"\n");
- bw.flush();
- } catch (IOException e) {
-
- e.printStackTrace();
- }
-
- _collector.emit(new Values(line));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- declarer.declare(new Fields("line"));
- }
-
- }
-
- public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
-
- String outFile = argv[0];
- boolean distribute = Boolean.valueOf(argv[1]);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new SocketSpout(), 1);
- builder.setBolt("bolt", new Process(outFile),1).shuffleGrouping("spout");
- Config conf = new Config();
- if(distribute){
- StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());
- }else{
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("SocketProcess", conf, builder.createTopology());
- }
- }
- }
最后执行
- storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true
spout接收socket服务器实时发送过来的数据,经过topology处理后,最终将数据写入out_socket.txt文件。