【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

需要源码请点赞关注收藏后评论区留言私信~~~

系统简介

新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示结果。

系统总体架构

1:利用搜狗实验室的用户查询日志模拟日志生成程序生成用户查询日志,供Flume采集

2:日志采集端Flume采集数据发送给Flume日志汇聚节点,并进行预处理

3:Flume将预处理的数据进行数据存储,存储到HBase数据库中,并发送消息给Kafka的Topic

4:Spark Streaming接收Kafka的Topic实时消息并计算实时话题的数量,并将计算结构保存到mysql数据库中

5:前端页面通过建立WebSocket通道读取Mysql数据库中的数据,实时展示话题的动态变化

表结构设计

(1)MySQL的表结构设计

webCount(新闻话题数量表)

(2)HBase表结构设计

weblogs(日志表)

系统实现

模拟日志生成程序

(1)在IntelliJ IDEA构建Java项目weblogs。编写数据生成模拟类,主要功能是读取搜狗用户日志文件,并构建新的格式写入一个新文件供Flume采集

(2)生成 JAR包,并将JAR包上传到生成日志服务器

(3)编写weblog.sh,调用模拟日志生成JAR包,并将weblog.sh上传到生成日志服务器

代码如下

#/bin/bashecho "start log......"java -jar /opt/jars/weblog.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

Flume配置

(1)配置日志采集端的Flume服务

(2)配置日志汇聚端的Flume服务

(3)自定义SinkHBase程序设计与开发

(4)修改SimpleRowKeyGenerator类,根据具体业务自定义Rowkey生成方法

(5)生成JAR包

(6) JAR上传,将打包名字替换为Flume默认包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至日志汇聚服务器上的flume/lib目录下,覆盖原有的JAR包

Spark Streaming开发

(1)新建一个MAVEN工程,添加依赖包

(2)编写Scala类StructuredStreamingKafka ,实现从Kafka中读取数据存储到关系型数据库MySQL

Websocket和前端界面开发

(1)新建pom.xml文件,内容如下

(2)编写Java类WeblogService,实现功能为连接MySQL数据库

(3)编写Java类WeblogSocket,实现功能建立WebSocket通讯,取统计数据供前端调用。

(4)建立大屏显示页面index.html,实时进行大屏显示

效果展示

在日志采集端运行模拟日志生成程序

发布Web应用 访问大屏显示页面如下

代码

部分代码如下

package main.java;
import java.io.*;
public class ReadWrite {
      static String readFileName;
      static String writeFileName;
      public static void main(String args[]){
           readFileName = args[0];
           writeFileName = args[1];
          try {
             // readInput();
            readFileByLines(readFileName);
          }catch(Exception e){
          }
      }
    public static void readFileByLines(String fileName) {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String tempString = null;
        try {
            System.out.println("以行为单位读取文件内容,一次读一整行:");
            fis = new FileInputStream(fileName);// FileInputStream
            // 从文件系统中的某个文件中获取字节
            isr = new InputStreamReader(fis,"GBK");
            br = new BufferedReader(isr);
            int count=0;
            while ((tempString = br.readLine()) != null) {
                count++;
                // 显示行号
                Thread.sleep(300);
                String str = new String(tempString.getBytes("UTF8"),"GBK");
                System.out.println("row:"+count+">>>>>>>>"+tempString);
                method1(writeFileName,tempString);
                //appendMethodA(writeFileName,tempString);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e1) {
                }
            }
        }
    }
//    public static void appendMethodA(String fileName, String content) {
//        try {
//            // 打开一个随机访问文件流,按读写方式
//            //logger.info("file line >>>>>>>>>>>>>>>>>>>>>>>>>:"+content);
//            RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
//
//            // 文件长度,字节数
//            long fileLength = randomFile.length();
//            //将写文件指针移到文件尾。
//            randomFile.seek(fileLength);
//            //randomFile.writeUTF(content);
//            randomFile.writeUTF(content);
//            randomFile.writeUTF("\n");
//           // randomFile.wri;
//
//            randomFile.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
//    }
    public static void method1(String file, String conent) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(file, true)));
            out.write("\n");
            out.write(conent);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Hbase数据库类代码

package org.apache.flume.sink.hbase;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import java.util.ArrayList;
import java.util.List;
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] cf;
    private byte[] payload;
    private byte[] payloadColumn;
    private byte[] incrementColumn;
    private String rowPrefix;
    private byte[] incrementRow;
    private KeyType keyType;
    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }
    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey;
            try {
                String[] columns = new String(this.payloadColumn).split(",");
                String[] values = new String(this.payload).split(",");
                for (int i=0; i < columns.length; i++) {
                    byte[] colColumn = columns[i].getBytes();
                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                    String datetime = values[0].toString();
                    String userid =  values[1].toString();
                    rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                    PutRequest putRequest =  new PutRequest(table, rowKey, cf,
                            colColumn, colValue);
                    actions.add(putRequest);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        if (incrementColumn != null) {
            AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
                    incrementRow, cf, incrementColumn);
            actions.add(inc);
        }
        return actions;
    }
    @Override
    public void cleanUp() {
        // TODO Auto-generated method stub
    }
    @Override
    public void configure(Context context) {
        String pCol = context.getString("payloadColumn", "pCol");
        String iCol = context.getString("incrementColumn", "iCol");
        rowPrefix = context.getString("rowPrefix", "default");
        String suffix = context.getString("suffix", "uuid");
        if (pCol != null && !pCol.isEmpty()) {
            if (suffix.equals("timestamp")) {
                keyType = KeyType.TS;
            } else if (suffix.equals("random")) {
                keyType = KeyType.RANDOM;
            } else if (suffix.equals("nano")) {
                keyType = KeyType.TSNANO;
            } else {
                keyType = KeyType.UUID;
            }
            payloadColumn = pCol.getBytes(Charsets.UTF_8);
        }
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
    }
    @Override
    public void setEvent(Event event) {
        this.payload = event.getBody();
    }
    @Override
    public void configure(ComponentConfiguration conf) {
        // TODO Auto-generated method stub
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
1月前
|
关系型数据库 MySQL 数据库
【Mac os系统】安装MySQL数据库
本文详细介绍了在Mac OS系统上安装MySQL数据库的步骤,包括下载、安装、配置环境变量、启动服务、授权设置以及解决常见问题,并提供了一些常用的MySQL命令。
57 0
【Mac os系统】安装MySQL数据库
|
24天前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
28 0
|
3月前
|
存储 安全 Java
基于Java+MySQL停车场车位管理系统详细设计和实现(源码+LW+调试文档+讲解等)
基于Java+MySQL停车场车位管理系统详细设计和实现(源码+LW+调试文档+讲解等)
|
26天前
|
关系型数据库 MySQL 应用服务中间件
win7系统搭建PHP+Mysql+Apache环境+部署ecshop项目
这篇文章介绍了如何在Windows 7系统上搭建PHP、MySQL和Apache环境,并部署ECShop项目,包括安装配置步骤、解决常见问题以及使用XAMPP集成环境的替代方案。
36 1
win7系统搭建PHP+Mysql+Apache环境+部署ecshop项目
|
27天前
|
数据可视化 关系型数据库 MySQL
Mysql8 如何在 Window11系统下完成跳过密钥校验、完成数据库密码的修改?
这篇文章介绍了如何在Windows 11系统下跳过MySQL 8的密钥校验,并通过命令行修改root用户的密码。
Mysql8 如何在 Window11系统下完成跳过密钥校验、完成数据库密码的修改?
|
1月前
|
存储 关系型数据库 MySQL
基于python django 医院管理系统,多用户功能,包括管理员、用户、医生,数据库MySQL
本文介绍了一个基于Python Django框架开发的医院管理系统,该系统设计了管理员、用户和医生三个角色,具备多用户功能,并使用MySQL数据库进行数据存储和管理。
基于python django 医院管理系统,多用户功能,包括管理员、用户、医生,数据库MySQL
|
23天前
|
关系型数据库 MySQL Linux
【Azure 应用服务】在创建Web App Service的时候,选Linux系统后无法使用Mysql in App
【Azure 应用服务】在创建Web App Service的时候,选Linux系统后无法使用Mysql in App
【Azure 应用服务】在创建Web App Service的时候,选Linux系统后无法使用Mysql in App
|
27天前
|
关系型数据库 MySQL 数据库连接
成功解决「MySQL问题1」启动mysql时:发生系统错误5拒绝访问
这篇文章介绍了如何解决启动MySQL服务时遇到的系统错误5(拒绝访问),通过管理员权限启动命令窗口并使用"net start mysql"和"net stop mysql"命令来控制服务。
|
28天前
|
SQL 数据可视化 关系型数据库
MySQL 备份可视化巡检系统
MySQL 备份可视化巡检系统
|
29天前
|
关系型数据库 MySQL Linux
一文教会你如何在Linux系统中使用Docker安装Mysql 5.7版本 【详细过程+图解】
这篇文章提供了在Linux系统中使用Docker安装Mysql 5.7版本的详细过程和图解,包括安装指定版本、创建实例、启动、使用Navicat连接测试、文件挂载与端口映射、进入容器、配置文件修改以及重新启动容器等步骤。
一文教会你如何在Linux系统中使用Docker安装Mysql 5.7版本 【详细过程+图解】