Introduction
This post implements a simple pipeline: watch a live log file, forward new lines to a socket server, and feed them from there into a Storm topology for processing.
Source code
Log watcher: stream new log lines to the socket server in real time
package socket;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/*
 * Watches a log directory and forwards new lines over a socket to another
 * server (see MyServerMulti). ClientRead then reads them back from that server.
 */
public class LogViewToSocket {

    private long lastTimeFileSize = 0; // file size at the last poll

    /**
     * Return the absolute path of the most recently modified file in the directory.
     * @param file the log directory
     */
    public String getNewFile(File file) {
        File[] fs = file.listFiles();
        long maxtime = 0;
        String newfilename = "";
        for (int i = 0; i < fs.length; i++) {
            if (fs[i].lastModified() > maxtime) {
                maxtime = fs[i].lastModified();
                newfilename = fs[i].getAbsolutePath();
            }
        }
        return newfilename;
    }

    RandomAccessFile randomFile = null;
    String newfile = null;
    String thisfile = null;

    /**
     * Tail the newest log file and push each appended line to the socket.
     * @param logFile the log directory
     */
    public void realtimeShowLog(final File logFile, final PrintWriter out) throws IOException {
        newfile = getNewFile(logFile);
        // open the file read-only
        randomFile = new RandomAccessFile(new File(newfile), "r");
        // poll for appended log lines once per second
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    // seek to where the last poll stopped and read what was appended
                    randomFile.seek(lastTimeFileSize);
                    String tmp;
                    while ((tmp = randomFile.readLine()) != null) {
                        System.out.println(new String(tmp.getBytes("ISO8859-1")));
                        out.println(new String(tmp.getBytes("ISO8859-1")));
                        out.flush();
                    }
                    thisfile = getNewFile(logFile);
                    if (!thisfile.equals(newfile)) {
                        // the log rolled over to a new file: reopen it and start from the top
                        newfile = thisfile;
                        randomFile = new RandomAccessFile(new File(newfile), "r");
                        lastTimeFileSize = 0;
                    } else {
                        lastTimeFileSize = randomFile.length();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        LogViewToSocket view = new LogViewToSocket();
        Socket socket = new Socket("192.168.27.100", 5678);
        PrintWriter out = new PrintWriter(socket.getOutputStream());
        final File tmpLogFile = new File("/home/hadoop/test");
        view.realtimeShowLog(tmpLogFile, out);
    }
}
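To exercise the watcher, something has to keep appending lines under the watched directory. A minimal test generator might look like the sketch below; the class name LogGenerator and the file /home/hadoop/test/app.log are assumptions for illustration, any file in the watched directory works.

package socket;

import java.io.FileWriter;
import java.io.PrintWriter;

// Hypothetical test helper (not part of the original post): appends one
// timestamped line per second so LogViewToSocket has something to pick up.
public class LogGenerator {
    public static void main(String[] args) throws Exception {
        // assumed path; the second argument opens the file in append mode
        PrintWriter pw = new PrintWriter(new FileWriter("/home/hadoop/test/app.log", true));
        for (int i = 0; i < 100; i++) {
            pw.println("event " + i + " at " + System.currentTimeMillis());
            pw.flush();
            Thread.sleep(1000);
        }
        pw.close();
    }
}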
Socket server
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;

public class MyServerMulti {

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5678);
        int i = 0;
        ArrayList<PrintWriter> outs = new ArrayList<PrintWriter>();
        /*
         * One client socket sends data in; the server relays it
         * to every other connected client socket.
         */
        Socket socket1 = null;
        while (true) {
            Socket socket = server.accept();
            i++;
            System.out.println(i);
            System.out.println(socket.getInetAddress());
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            outs.add(out);
            if (i == 1)
                socket1 = socket;
            if (i == 2)
                invoke(socket1, outs);
        }
    }

    private static void invoke(final Socket client, final ArrayList<PrintWriter> outs) throws IOException {
        new Thread(new Runnable() {
            public void run() {
                BufferedReader in = null;
                PrintWriter out = null;
                PrintWriter out1 = null;
                try {
                    in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    out = new PrintWriter(client.getOutputStream());
                    while (true) {
                        String msg = in.readLine();
                        System.out.println(msg);
                        out.println("Server received " + msg);
                        out.flush();
                        // relay the message to every connected client
                        for (int i = 0; i < outs.size(); i++) {
                            out1 = outs.get(i);
                            System.out.println(i);
                            System.out.println("send msg:" + msg);
                            out1.println(msg);
                            out1.flush();
                        }
                        System.out.println(client.getInetAddress());
                        // readLine() returns null when the client disconnects
                        if (msg == null || msg.equals("bye")) {
                            break;
                        }
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    try { in.close(); } catch (Exception e) {}
                    try { out.close(); } catch (Exception e) {}
                    try { client.close(); } catch (Exception e) {}
                }
            }
        }).start();
    }
}
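The comment in LogViewToSocket mentions a ClientRead that reads the relayed lines back from this server. The original post does not show it; a minimal sketch under that assumption could look like this (host and port follow the examples above):

package socket;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

// Sketch of the ClientRead referred to above: connects to MyServerMulti
// and prints every line the server relays.
public class ClientRead {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("192.168.27.100", 5678);
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String line;
        while ((line = in.readLine()) != null) { // null means the server closed the connection
            System.out.println("received: " + line);
        }
        socket.close();
    }
}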
Storm topology
package socket;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/*
 * Run with:
 * storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true
 */
public class SocketProcess {

    public static class SocketSpout extends BaseRichSpout {

        private static final long serialVersionUID = 1L;
        static Socket sock = null;
        static BufferedReader in = null;
        String str = null;
        private SpoutOutputCollector _collector;

        SocketSpout() {
        }

        // connect to the socket server when the spout opens
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            try {
                sock = new Socket("192.168.27.100", 5678);
                in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // read lines from the socket and emit each one as a tuple
        @Override
        public void nextTuple() {
            if (sock == null) {
                try {
                    sock = new Socket("192.168.27.100", 5678);
                    in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            while (true) {
                try {
                    str = in.readLine();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (str == null) { // the server closed the connection
                    break;
                }
                System.out.println(str);
                _collector.emit(new Values(str));
                if (str.equals("end")) {
                    break;
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    public static class Process extends BaseRichBolt {

        private String _outFile;
        private OutputCollector _collector;
        private BufferedWriter bw;

        public Process(String outFile) {
            this._outFile = outFile;
        }

        // open the output file in append mode so existing results are kept
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this._collector = collector;
            File out = new File(_outFile);
            try {
                bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(out, true)));
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

        // the bolt's compute step: append ",bkeep" and a newline to each tuple,
        // then write it to the file given by outFile
        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            try {
                bw.write(line + ",bkeep" + "\n");
                bw.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
            _collector.emit(new Values(line));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException {
        String outFile = argv[0];                        // output file
        boolean distribute = Boolean.valueOf(argv[1]);   // true = submit to the cluster, false = local mode
        TopologyBuilder builder = new TopologyBuilder(); // build the topology
        builder.setSpout("spout", new SocketSpout(), 1); // the spout that reads from the socket
        builder.setBolt("bolt", new Process(outFile), 1).shuffleGrouping("spout"); // the bolt that writes the file
        Config conf = new Config();
        if (distribute) {
            StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("SocketProcess", conf, builder.createTopology());
        }
    }
}
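In local mode the topology above runs until the process is killed. If you want a local test that stops on its own, the else branch of main could be replaced by a sketch like the following; the 60-second window is an arbitrary choice, not from the original post, and Utils comes from backtype.storm.utils.Utils:

// Local-mode variant with an explicit shutdown (sketch):
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("SocketProcess", conf, builder.createTopology());
Utils.sleep(60 * 1000);                  // let the topology run for a while
cluster.killTopology("SocketProcess");   // then tear it down cleanly
cluster.shutdown();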
Finally, run:
storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true
The spout receives the data streamed in real time from the socket server; the topology processes each line and writes it to out_socket.txt. Each tuple comes out with ",bkeep" appended, so an input line such as "event 1" is written as "event 1,bkeep".
This article is reposted from ljianbing's 51CTO blog. Original link: http://blog.51cto.com/ljianbing/1733313. Please contact the original author before republishing.