storm实时计算实例(socket实时接入)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

介绍

实现了一个简单的流程:监听实时日志文件,将新增内容写入socket服务器,再接入Storm进行计算。

源码

日志监听实时写入socket服务器

package socket;import java.io.BufferedReader;import java.io.File;     
import java.io.IOException;     
import java.io.InputStreamReader;import java.io.PrintWriter;import java.io.RandomAccessFile;     
import java.net.Socket;import java.util.concurrent.Executors;     
import java.util.concurrent.ScheduledExecutorService;     
import java.util.concurrent.TimeUnit;     
/*
 * 监测数据,通过socket远程发送到另外服务器 ,见MyServerMulti
 * ClientRead再通过服务器从socket里读
 * 
 */
    public class LogViewToSocket {     
    private long lastTimeFileSize = 0;  //上次文件大小     
    /**   
     * 实时输出日志信息   
     * @param logFile 日志文件   
     * @throws IOException   
     */    
      public String getNewFile(File file)  {    File[] fs=file.listFiles();    long maxtime=0;    String newfilename="";    for (int i=0;i<fs.length;i++)    {      if (fs[i].lastModified()>maxtime)      {        maxtime=fs[i].lastModified();        newfilename=fs[i].getAbsolutePath();              }    }    return newfilename;  }   RandomAccessFile randomFile=null;   String newfile=null;   String thisfile=null;    public void realtimeShowLog(final File logFile,final PrintWriter out) throws IOException{     
    	   newfile=getNewFile(logFile);        //指定文件可读可写     
            randomFile = new RandomAccessFile(new File(newfile),"r");     
        //启动一个线程每1秒钟读取新增的日志信息     
        ScheduledExecutorService exec =      
            Executors.newScheduledThreadPool(1);     
        exec.scheduleWithFixedDelay(new Runnable(){     
            public void run() {     
                try {     
                    //获得变化部分的     
                    randomFile.seek(lastTimeFileSize);     
                    String tmp = "";     
                    while( (tmp = randomFile.readLine())!= null) {     
                        System.out.println(new String(tmp.getBytes("ISO8859-1"))); 
                        out.println(new String(tmp.getBytes("ISO8859-1")));
                        out.flush(); 
                    }   
                   thisfile=getNewFile(logFile);                   if(!thisfile.equals(newfile))
                   
                   {
                	   randomFile = new RandomAccessFile(new File(newfile),"r");
                	   lastTimeFileSize=0;
                   }                   else
                	   
                    lastTimeFileSize = randomFile.length();     
                   
                } catch (IOException e) {     
                    throw new RuntimeException(e);     
                }     
            }     
        }, 0, 1, TimeUnit.SECONDS);     
    }     
         
    public static void main(String[] args) throws Exception {     
    	LogViewToSocket view = new LogViewToSocket();     

    		Socket socket=new Socket("192.168.27.100",5678); 
   
    	PrintWriter out=new PrintWriter(socket.getOutputStream());    
    	 
    	  

        final File tmpLogFile = new File("/home/hadoop/test");     
        view.realtimeShowLog(tmpLogFile,out); 
       // socket.close();
        
    }     
    
}

socket服务器处理

import java.io.BufferedReader;  
import java.io.IOException;  
import java.io.InputStreamReader;  
import java.io.PrintWriter;  
import java.net.ServerSocket;  
import java.net.Socket;  
import java.net.SocketAddress;
import java.util.*;  
public class MyServerMulti {

    /**
     * Accepts client connections on port 5678. The FIRST client to connect is
     * treated as the data producer; once the SECOND client connects, a relay
     * thread is started that forwards every line read from the producer to
     * ALL connected clients (including the producer itself). Clients that
     * connect later are added to the fan-out list and receive data too.
     */
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5678);
        int i = 0;
        // Writers for every client that has connected so far.
        ArrayList<PrintWriter> outs = new ArrayList<PrintWriter>();
        Socket producer = null;  // the first client; source of the relayed data
        while (true) {
            Socket socket = server.accept();
            i++;
            System.out.println(i);
            System.out.println(socket.getInetAddress());
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            outs.add(out);
            if (i == 1) {
                producer = socket;
            }
            // The relay thread is started exactly once, when the second
            // client arrives (before that there is nobody to forward to).
            if (i == 2) {
                invoke(producer, outs);
            }
        }
    }

    /**
     * Starts a daemon-style thread that reads lines from {@code client} and
     * fans each one out to every writer in {@code outs}. Stops on "bye", on
     * client disconnect, or on I/O error; always closes the client socket.
     *
     * @param client the producer socket to read from
     * @param outs   live list of writers to forward each line to
     */
    private static void invoke(final Socket client, final ArrayList<PrintWriter> outs) throws IOException {
        new Thread(new Runnable() {
            public void run() {
                BufferedReader in = null;
                PrintWriter out = null;
                PrintWriter target = null;
                try {
                    in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                    out = new PrintWriter(client.getOutputStream());
                    while (true) {
                        String msg = in.readLine();
                        // readLine() returns null when the producer disconnects;
                        // the original NPE'd on msg.equals("bye") in that case.
                        if (msg == null) {
                            break;
                        }
                        System.out.println(msg);
                        out.println("Server received " + msg);
                        out.flush();
                        // Fan the line out to every connected client.
                        for (int i = 0; i < outs.size(); i++) {
                            target = outs.get(i);
                            System.out.println(i);
                            System.out.println("send msg:" + msg);
                            target.println(msg);
                            target.flush();
                        }
                        System.out.println(client.getInetAddress());
                        if (msg.equals("bye")) {
                            break;
                        }
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    // Best-effort cleanup; each close is isolated so one
                    // failure (or a null stream) cannot skip the others.
                    try {
                        in.close();
                    } catch (Exception e) {}
                    try {
                        out.close();
                    } catch (Exception e) {}
                    try {
                        client.close();
                    } catch (Exception e) {}
                }
            }
        }).start();
    }
}

storm topology

import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.File;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.FileReader;import java.io.FileWriter;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.io.RandomAccessFile;import java.net.Socket;import java.net.UnknownHostException;import java.util.Map; 
//import mytest.ThroughputTest.GenSpout;
 import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;/*
 * 
 *
 *  storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true
 * 
 */
 public class SocketProcess {
         public static class  SocketSpout extends BaseRichSpout {
 
                   /**
                    */
        	  static Socket sock=null;        	  static BufferedReader in=null;
        	  String str=null;                   private static final long serialVersionUID = 1L;                   private SpoutOutputCollector _collector;                   private BufferedReader br;                   private String dataFile;                   private BufferedWriter bw2;
                    RandomAccessFile randomFile;                    private long lastTimeFileSize = 0; 
                    int cnt=0;                   //定义spout文件
                    SocketSpout(){
                     
                   } 
                   //定义如何读取spout文件
                   @Override
                   public void open(Map conf, TopologyContext context,
                                     SpoutOutputCollector collector) {                            // TODO Auto-generated method stub
                            _collector = collector;                            try {                sock=new Socket("192.168.27.100",5678);                 in=   
                  new BufferedReader(new InputStreamReader(sock.getInputStream()));   
              } catch (UnknownHostException e) {                // TODO Auto-generated catch block                e.printStackTrace();              } catch (IOException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
                       
                   } 
                   //获取下一个tuple的方法
                   @Override
                   public void nextTuple() {                            // TODO Auto-generated method stub
                	   if(sock==null){                		     try {                sock=new Socket("192.168.27.100",5678);                 in=   
                    new BufferedReader(new InputStreamReader(sock.getInputStream()));  
              } catch (UnknownHostException e) {                // TODO Auto-generated catch block                e.printStackTrace();              } catch (IOException e) {                // TODO Auto-generated catch block                e.printStackTrace();              } 
                	   }                	   
                	   
                	   while(true){    
                		  
            try {              str = in.readLine();            } catch (IOException e) {              // TODO Auto-generated catch block              e.printStackTrace();            }
                		System.out.println(str);  
                		_collector.emit(new Values(str));                		if(str.equals("end")){    
                			break;    
                			} 
                		}
                	   
                	   
                	   
                	   
                	   
                	   
                	   
                            
                            
                   } 
 
                   @Override
                   public void declareOutputFields(OutputFieldsDeclarer declarer) {                            // TODO Auto-generated method stub
                            declarer.declare(new Fields("line"));
                   }
                  
         }        
 
         public static class Process extends BaseRichBolt{
 
                   private String _seperator;                   private String _outFile;
                   PrintWriter pw;                   private OutputCollector _collector;                   private BufferedWriter bw;                  
                   public Process(String outFile) {                           
                            this._outFile   = outFile;
                           
                   }                  
                   //把输出结果保存到外部文件里面。
                   @Override
                   public void prepare(Map stormConf, TopologyContext context,
                                     OutputCollector collector) {                            // TODO Auto-generated method stub
                            this._collector = collector;
                            File out = new File(_outFile);                            try {//                                  br = new BufferedWriter(new FileWriter(out));
                                     bw = new BufferedWriter(new OutputStreamWriter( 
                             new FileOutputStream(out, true))); 
                            } catch (IOException e1) {                                     // TODO Auto-generated catch block
                                     e1.printStackTrace();
                            }                
                   }                  
                   //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。
                   @Override
                   public void execute(Tuple input) {                            // TODO Auto-generated method stub
                            String line = input.getString(0);//                         System.out.println(line);
                       //     String[] str = line.split(_seperator);
                         //   System.out.println(str[2]);
                            try {
                                     bw.write(line+",bkeep"+"\n");
                                     bw.flush();
                            } catch (IOException e) {                                     // TODO Auto-generated catch block
                                     e.printStackTrace();
                            }
                           
                            _collector.emit(new Values(line));
                   } 
                   @Override
                   public void declareOutputFields(OutputFieldsDeclarer declarer) {                            // TODO Auto-generated method stub
                            declarer.declare(new Fields("line"));
                   }
                  
         }        
         public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
                
                   String outFile   = argv[0]; //输出文件
                   boolean distribute = Boolean.valueOf(argv[1]);       //本地模式还是集群模式
                   TopologyBuilder builder = new TopologyBuilder();  //build一个topology
        builder.setSpout("spout", new  SocketSpout(), 1);   //指定spout
        builder.setBolt("bolt", new Process(outFile),1).shuffleGrouping("spout");  //指定bolt,包括bolt、process和grouping
        Config conf = new Config();        if(distribute){
            StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());
        }else{
                 LocalCluster cluster = new LocalCluster();
                 cluster.submitTopology("SocketProcess", conf, builder.createTopology());
        }
         }       
}

最后执行 

storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true

spout接受从socket服务器实时发送过来的数据,经过topology处理,最终将数据写入out_socket.txt文件













本文转自ljianbing51CTO博客,原文链接:http://blog.51cto.com/ljianbing/1733313 ,如需转载请自行联系原作者



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
分布式计算 Hadoop 大数据
一口气说完MR、Storm、Spark、SparkStreaming和Flink
一口气说完MR、Storm、Spark、SparkStreaming和Flink
|
9天前
|
运维 监控 Java
面经:Storm实时计算框架原理与应用场景
【4月更文挑战第11天】本文是关于Apache Storm实时流处理框架的面试攻略和核心原理解析。文章分享了面试常见主题,包括Storm的架构与核心概念(如Spout、Bolt、Topology、Tuple和Ack机制),编程模型与API,部署与运维,以及应用场景与最佳实践。通过代码示例展示了如何构建一个简单的WordCountTopology,强调理解和运用Storm的关键知识点对于面试和实际工作的重要性。
26 4
面经:Storm实时计算框架原理与应用场景
|
Linux
socket简单文件传输实例
今天在linux系统上写了一个简单的socket文件传输的例子,共享出来仅供参考。例子很简单,只要初学者了解简单的几个socket相关函数和文件函数就能写的出来。
105 0
socket简单文件传输实例
|
PHP
PHP实现Workerman实例 高性能PHP Socket即时通讯框架
PHP实现Workerman实例 高性能PHP Socket即时通讯框架
358 0
|
SQL 机器学习/深度学习 分布式计算
Flink、Spark、Storm技术对比列表
Flink、Spark、Storm技术对比列表
514 0
|
消息中间件 存储 分布式计算
Flink,Storm,SparkStreaming性能对比
Flink,Storm,SparkStreaming性能对比
516 0
Flink,Storm,SparkStreaming性能对比
|
存储 消息中间件 传感器
超越Storm,SparkStreaming——Flink如何实现有状态的计算
超越Storm,SparkStreaming——Flink如何实现有状态的计算
168 0
超越Storm,SparkStreaming——Flink如何实现有状态的计算
学习socket nio 之 mina实例(2)
学习socket nio 之 mina实例(2)
116 0
|
Java Apache
学习socket nio 之 mina实例(1)
学习socket nio 之 mina实例(1)
114 0
学习socket nio 之 mina实例(1)
|
Python
Python 套接字-判断socket服务端有没有关闭的方法实例演示,查看socket运行状态
Python 套接字-判断socket服务端有没有关闭的方法实例演示,查看socket运行状态
507 0
Python 套接字-判断socket服务端有没有关闭的方法实例演示,查看socket运行状态