【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

需要源码请点赞关注收藏后评论区留言私信~~~

系统简介

新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示结果。

系统总体架构

1:利用搜狗实验室的用户查询日志模拟日志生成程序生成用户查询日志,供Flume采集

2:日志采集端Flume采集数据发送给Flume日志汇聚节点,并进行预处理

3:Flume将预处理的数据进行数据存储,存储到HBase数据库中,并发送消息给Kafka的Topic

4:Spark Streaming接收Kafka的Topic实时消息并计算实时话题的数量,并将计算结构保存到mysql数据库中

5:前端页面通过建立WebSocket通道读取Mysql数据库中的数据,实时展示话题的动态变化

表结构设计

(1)MySQL的表结构设计

webCount(新闻话题数量表)

(2)HBase表结构设计

weblogs(日志表)

系统实现

模拟日志生成程序

(1)在IntelliJ IDEA构建Java项目weblogs。编写数据生成模拟类,主要功能是读取搜狗用户日志文件,并构建新的格式写入一个新文件供Flume采集

(2)生成 JAR包,并将JAR包上传到生成日志服务器

(3)编写weblog.sh,调用模拟日志生成JAR包,并将weblog.sh上传到生成日志服务器

代码如下

#/bin/bashecho "start log......"java -jar /opt/jars/weblog.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

Flume配置

(1)配置日志采集端的Flume服务

(2)配置日志汇聚端的Flume服务

(3)自定义SinkHBase程序设计与开发

(4)修改SimpleRowKeyGenerator类,根据具体业务自定义Rowkey生成方法

(5)生成JAR包

(6) JAR上传,将打包名字替换为Flume默认包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至日志汇聚服务器上的flume/lib目录下,覆盖原有的JAR包

Spark Streaming开发

(1)新建一个MAVEN工程,添加依赖包

(2)编写Scala类StructuredStreamingKafka ,实现从Kafka中读取数据存储到关系型数据库MySQL

Websocket和前端界面开发

(1)新建pom.xml文件,内容如下

(2)编写Java类WeblogService,实现功能为连接MySQL数据库

(3)编写Java类WeblogSocket,实现功能建立WebSocket通讯,取统计数据供前端调用。

(4)建立大屏显示页面index.html,实时进行大屏显示

效果展示

在日志采集端运行模拟日志生成程序

发布Web应用 访问大屏显示页面如下

代码

部分代码如下

package main.java;
import java.io.*;
public class ReadWrite {
      static String readFileName;
      static String writeFileName;
      public static void main(String args[]){
           readFileName = args[0];
           writeFileName = args[1];
          try {
             // readInput();
            readFileByLines(readFileName);
          }catch(Exception e){
          }
      }
    public static void readFileByLines(String fileName) {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String tempString = null;
        try {
            System.out.println("以行为单位读取文件内容,一次读一整行:");
            fis = new FileInputStream(fileName);// FileInputStream
            // 从文件系统中的某个文件中获取字节
            isr = new InputStreamReader(fis,"GBK");
            br = new BufferedReader(isr);
            int count=0;
            while ((tempString = br.readLine()) != null) {
                count++;
                // 显示行号
                Thread.sleep(300);
                String str = new String(tempString.getBytes("UTF8"),"GBK");
                System.out.println("row:"+count+">>>>>>>>"+tempString);
                method1(writeFileName,tempString);
                //appendMethodA(writeFileName,tempString);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e1) {
                }
            }
        }
    }
//    public static void appendMethodA(String fileName, String content) {
//        try {
//            // 打开一个随机访问文件流,按读写方式
//            //logger.info("file line >>>>>>>>>>>>>>>>>>>>>>>>>:"+content);
//            RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
//
//            // 文件长度,字节数
//            long fileLength = randomFile.length();
//            //将写文件指针移到文件尾。
//            randomFile.seek(fileLength);
//            //randomFile.writeUTF(content);
//            randomFile.writeUTF(content);
//            randomFile.writeUTF("\n");
//           // randomFile.wri;
//
//            randomFile.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
//    }
    public static void method1(String file, String conent) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(file, true)));
            out.write("\n");
            out.write(conent);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Hbase数据库类代码

package org.apache.flume.sink.hbase;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import java.util.ArrayList;
import java.util.List;
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] cf;
    private byte[] payload;
    private byte[] payloadColumn;
    private byte[] incrementColumn;
    private String rowPrefix;
    private byte[] incrementRow;
    private KeyType keyType;
    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }
    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey;
            try {
                String[] columns = new String(this.payloadColumn).split(",");
                String[] values = new String(this.payload).split(",");
                for (int i=0; i < columns.length; i++) {
                    byte[] colColumn = columns[i].getBytes();
                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                    String datetime = values[0].toString();
                    String userid =  values[1].toString();
                    rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                    PutRequest putRequest =  new PutRequest(table, rowKey, cf,
                            colColumn, colValue);
                    actions.add(putRequest);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        if (incrementColumn != null) {
            AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
                    incrementRow, cf, incrementColumn);
            actions.add(inc);
        }
        return actions;
    }
    @Override
    public void cleanUp() {
        // TODO Auto-generated method stub
    }
    @Override
    public void configure(Context context) {
        String pCol = context.getString("payloadColumn", "pCol");
        String iCol = context.getString("incrementColumn", "iCol");
        rowPrefix = context.getString("rowPrefix", "default");
        String suffix = context.getString("suffix", "uuid");
        if (pCol != null && !pCol.isEmpty()) {
            if (suffix.equals("timestamp")) {
                keyType = KeyType.TS;
            } else if (suffix.equals("random")) {
                keyType = KeyType.RANDOM;
            } else if (suffix.equals("nano")) {
                keyType = KeyType.TSNANO;
            } else {
                keyType = KeyType.UUID;
            }
            payloadColumn = pCol.getBytes(Charsets.UTF_8);
        }
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
    }
    @Override
    public void setEvent(Event event) {
        this.payload = event.getBody();
    }
    @Override
    public void configure(ComponentConfiguration conf) {
        // TODO Auto-generated method stub
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
2月前
|
消息中间件 监控 网络协议
Flume系统
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输系统,起源于Cloudera。【2月更文挑战第8天】
25 4
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink问题子实现Kafka到Mysql如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
379 2
|
4月前
|
消息中间件 关系型数据库 MySQL
Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql
Flink最后一站___Flink数据写入Kafka+从Kafka存入Mysql
32 0
|
1月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
41 2
|
3月前
|
消息中间件 分布式计算 Kafka
Spark与Kafka的集成与流数据处理
Spark与Kafka的集成与流数据处理
|
3月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等
|
3月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成
|
3月前
|
SQL 分布式计算 数据处理
Spark的生态系统概览:Spark SQL、Spark Streaming
Spark的生态系统概览:Spark SQL、Spark Streaming
|
3月前
|
NoSQL Java 关系型数据库
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
43 0
|
4月前
|
消息中间件 关系型数据库 MySQL
在kafka connect 同步 mysql 主从数据库
在kafka connect 同步 mysql 主从数据库
45 0