【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

需要源码请点赞关注收藏后评论区留言私信~~~

系统简介

新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示结果。

系统总体架构

1:利用搜狗实验室的用户查询日志模拟日志生成程序生成用户查询日志,供Flume采集

2:日志采集端Flume采集数据发送给Flume日志汇聚节点,并进行预处理

3:Flume将预处理的数据进行数据存储,存储到HBase数据库中,并发送消息给Kafka的Topic

4:Spark Streaming接收Kafka的Topic实时消息并计算实时话题的数量,并将计算结构保存到mysql数据库中

5:前端页面通过建立WebSocket通道读取Mysql数据库中的数据,实时展示话题的动态变化

表结构设计

(1)MySQL的表结构设计

webCount(新闻话题数量表)

(2)HBase表结构设计

weblogs(日志表)

系统实现

模拟日志生成程序

(1)在IntelliJ IDEA构建Java项目weblogs。编写数据生成模拟类,主要功能是读取搜狗用户日志文件,并构建新的格式写入一个新文件供Flume采集

(2)生成 JAR包,并将JAR包上传到生成日志服务器

(3)编写weblog.sh,调用模拟日志生成JAR包,并将weblog.sh上传到生成日志服务器

代码如下

#/bin/bashecho "start log......"java -jar /opt/jars/weblog.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

Flume配置

(1)配置日志采集端的Flume服务

(2)配置日志汇聚端的Flume服务

(3)自定义SinkHBase程序设计与开发

(4)修改SimpleRowKeyGenerator类,根据具体业务自定义Rowkey生成方法

(5)生成JAR包

(6) JAR上传,将打包名字替换为Flume默认包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至日志汇聚服务器上的flume/lib目录下,覆盖原有的JAR包

Spark Streaming开发

(1)新建一个MAVEN工程,添加依赖包

(2)编写Scala类StructuredStreamingKafka ,实现从Kafka中读取数据存储到关系型数据库MySQL

Websocket和前端界面开发

(1)新建pom.xml文件,内容如下

(2)编写Java类WeblogService,实现功能为连接MySQL数据库

(3)编写Java类WeblogSocket,实现功能建立WebSocket通讯,取统计数据供前端调用。

(4)建立大屏显示页面index.html,实时进行大屏显示

效果展示

在日志采集端运行模拟日志生成程序

发布Web应用 访问大屏显示页面如下

代码

部分代码如下

package main.java;
import java.io.*;
public class ReadWrite {
      static String readFileName;
      static String writeFileName;
      public static void main(String args[]){
           readFileName = args[0];
           writeFileName = args[1];
          try {
             // readInput();
            readFileByLines(readFileName);
          }catch(Exception e){
          }
      }
    public static void readFileByLines(String fileName) {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String tempString = null;
        try {
            System.out.println("以行为单位读取文件内容,一次读一整行:");
            fis = new FileInputStream(fileName);// FileInputStream
            // 从文件系统中的某个文件中获取字节
            isr = new InputStreamReader(fis,"GBK");
            br = new BufferedReader(isr);
            int count=0;
            while ((tempString = br.readLine()) != null) {
                count++;
                // 显示行号
                Thread.sleep(300);
                String str = new String(tempString.getBytes("UTF8"),"GBK");
                System.out.println("row:"+count+">>>>>>>>"+tempString);
                method1(writeFileName,tempString);
                //appendMethodA(writeFileName,tempString);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e1) {
                }
            }
        }
    }
//    public static void appendMethodA(String fileName, String content) {
//        try {
//            // 打开一个随机访问文件流,按读写方式
//            //logger.info("file line >>>>>>>>>>>>>>>>>>>>>>>>>:"+content);
//            RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
//
//            // 文件长度,字节数
//            long fileLength = randomFile.length();
//            //将写文件指针移到文件尾。
//            randomFile.seek(fileLength);
//            //randomFile.writeUTF(content);
//            randomFile.writeUTF(content);
//            randomFile.writeUTF("\n");
//           // randomFile.wri;
//
//            randomFile.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
//    }
    public static void method1(String file, String conent) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(file, true)));
            out.write("\n");
            out.write(conent);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Hbase数据库类代码

package org.apache.flume.sink.hbase;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import java.util.ArrayList;
import java.util.List;
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] cf;
    private byte[] payload;
    private byte[] payloadColumn;
    private byte[] incrementColumn;
    private String rowPrefix;
    private byte[] incrementRow;
    private KeyType keyType;
    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }
    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey;
            try {
                String[] columns = new String(this.payloadColumn).split(",");
                String[] values = new String(this.payload).split(",");
                for (int i=0; i < columns.length; i++) {
                    byte[] colColumn = columns[i].getBytes();
                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                    String datetime = values[0].toString();
                    String userid =  values[1].toString();
                    rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                    PutRequest putRequest =  new PutRequest(table, rowKey, cf,
                            colColumn, colValue);
                    actions.add(putRequest);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        if (incrementColumn != null) {
            AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
                    incrementRow, cf, incrementColumn);
            actions.add(inc);
        }
        return actions;
    }
    @Override
    public void cleanUp() {
        // TODO Auto-generated method stub
    }
    @Override
    public void configure(Context context) {
        String pCol = context.getString("payloadColumn", "pCol");
        String iCol = context.getString("incrementColumn", "iCol");
        rowPrefix = context.getString("rowPrefix", "default");
        String suffix = context.getString("suffix", "uuid");
        if (pCol != null && !pCol.isEmpty()) {
            if (suffix.equals("timestamp")) {
                keyType = KeyType.TS;
            } else if (suffix.equals("random")) {
                keyType = KeyType.RANDOM;
            } else if (suffix.equals("nano")) {
                keyType = KeyType.TSNANO;
            } else {
                keyType = KeyType.UUID;
            }
            payloadColumn = pCol.getBytes(Charsets.UTF_8);
        }
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
    }
    @Override
    public void setEvent(Event event) {
        this.payload = event.getBody();
    }
    @Override
    public void configure(ComponentConfiguration conf) {
        // TODO Auto-generated method stub
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
48 3
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
131 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0
|
14天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,并与使用 RPM 包安装进行了对比
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,并与使用 RPM 包安装进行了对比。通过具体案例,读者可以了解如何准备环境、下载源码、编译安装、配置服务及登录 MySQL。编译源码安装虽然复杂,但提供了更高的定制性和灵活性,适用于需要高度定制的场景。
49 3
|
17天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
59 2
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置服务等,并与使用 RPM 包安装进行了对比,帮助读者根据需求选择合适的方法。编译源码安装虽然复杂,但提供了更高的定制性和灵活性。
222 2
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤
【10月更文挑战第7天】本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据自身需求选择合适的方法。
58 3
|
1月前
|
前端开发 Java 数据库连接
表白墙/留言墙 —— 中级SpringBoot项目,MyBatis技术栈MySQL数据库开发,练手项目前后端开发(带完整源码) 全方位全步骤手把手教学
本文是一份全面的表白墙/留言墙项目教程,使用SpringBoot + MyBatis技术栈和MySQL数据库开发,涵盖了项目前后端开发、数据库配置、代码实现和运行的详细步骤。
42 0
表白墙/留言墙 —— 中级SpringBoot项目,MyBatis技术栈MySQL数据库开发,练手项目前后端开发(带完整源码) 全方位全步骤手把手教学
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0