【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)

需要源码请点赞关注收藏后评论区留言私信~~~

系统简介

新闻话题实时统计分析系统以搜狗实验室的用户查询日志为基础,模拟生成用户查询日志,通过Flume将日志进行实时采集、汇集,分析并进行存储。利用Spark Streaming实时统计分析前20名流量最高的新闻话题,并在前端页面实时显示结果。

系统总体架构

1:利用搜狗实验室的用户查询日志模拟日志生成程序生成用户查询日志,供Flume采集

2:日志采集端Flume采集数据发送给Flume日志汇聚节点,并进行预处理

3:Flume将预处理的数据进行数据存储,存储到HBase数据库中,并发送消息给Kafka的Topic

4:Spark Streaming接收Kafka的Topic实时消息并计算实时话题的数量,并将计算结构保存到mysql数据库中

5:前端页面通过建立WebSocket通道读取Mysql数据库中的数据,实时展示话题的动态变化

表结构设计

(1)MySQL的表结构设计

webCount(新闻话题数量表)

(2)HBase表结构设计

weblogs(日志表)

系统实现

模拟日志生成程序

(1)在IntelliJ IDEA构建Java项目weblogs。编写数据生成模拟类,主要功能是读取搜狗用户日志文件,并构建新的格式写入一个新文件供Flume采集

(2)生成 JAR包,并将JAR包上传到生成日志服务器

(3)编写weblog.sh,调用模拟日志生成JAR包,并将weblog.sh上传到生成日志服务器

代码如下

#/bin/bashecho "start log......"java -jar /opt/jars/weblog.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

Flume配置

(1)配置日志采集端的Flume服务

(2)配置日志汇聚端的Flume服务

(3)自定义SinkHBase程序设计与开发

(4)修改SimpleRowKeyGenerator类,根据具体业务自定义Rowkey生成方法

(5)生成JAR包

(6) JAR上传,将打包名字替换为Flume默认包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至日志汇聚服务器上的flume/lib目录下,覆盖原有的JAR包

Spark Streaming开发

(1)新建一个MAVEN工程,添加依赖包

(2)编写Scala类StructuredStreamingKafka ,实现从Kafka中读取数据存储到关系型数据库MySQL

Websocket和前端界面开发

(1)新建pom.xml文件,内容如下

(2)编写Java类WeblogService,实现功能为连接MySQL数据库

(3)编写Java类WeblogSocket,实现功能建立WebSocket通讯,取统计数据供前端调用。

(4)建立大屏显示页面index.html,实时进行大屏显示

效果展示

在日志采集端运行模拟日志生成程序

发布Web应用 访问大屏显示页面如下

代码

部分代码如下

package main.java;
import java.io.*;
public class ReadWrite {
      static String readFileName;
      static String writeFileName;
      public static void main(String args[]){
           readFileName = args[0];
           writeFileName = args[1];
          try {
             // readInput();
            readFileByLines(readFileName);
          }catch(Exception e){
          }
      }
    public static void readFileByLines(String fileName) {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String tempString = null;
        try {
            System.out.println("以行为单位读取文件内容,一次读一整行:");
            fis = new FileInputStream(fileName);// FileInputStream
            // 从文件系统中的某个文件中获取字节
            isr = new InputStreamReader(fis,"GBK");
            br = new BufferedReader(isr);
            int count=0;
            while ((tempString = br.readLine()) != null) {
                count++;
                // 显示行号
                Thread.sleep(300);
                String str = new String(tempString.getBytes("UTF8"),"GBK");
                System.out.println("row:"+count+">>>>>>>>"+tempString);
                method1(writeFileName,tempString);
                //appendMethodA(writeFileName,tempString);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e1) {
                }
            }
        }
    }
//    public static void appendMethodA(String fileName, String content) {
//        try {
//            // 打开一个随机访问文件流,按读写方式
//            //logger.info("file line >>>>>>>>>>>>>>>>>>>>>>>>>:"+content);
//            RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
//
//            // 文件长度,字节数
//            long fileLength = randomFile.length();
//            //将写文件指针移到文件尾。
//            randomFile.seek(fileLength);
//            //randomFile.writeUTF(content);
//            randomFile.writeUTF(content);
//            randomFile.writeUTF("\n");
//           // randomFile.wri;
//
//            randomFile.close();
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
//    }
    public static void method1(String file, String conent) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(
                    new FileOutputStream(file, true)));
            out.write("\n");
            out.write(conent);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Hbase数据库类代码

package org.apache.flume.sink.hbase;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import java.util.ArrayList;
import java.util.List;
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] cf;
    private byte[] payload;
    private byte[] payloadColumn;
    private byte[] incrementColumn;
    private String rowPrefix;
    private byte[] incrementRow;
    private KeyType keyType;
    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }
    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey;
            try {
                String[] columns = new String(this.payloadColumn).split(",");
                String[] values = new String(this.payload).split(",");
                for (int i=0; i < columns.length; i++) {
                    byte[] colColumn = columns[i].getBytes();
                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                    String datetime = values[0].toString();
                    String userid =  values[1].toString();
                    rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                    PutRequest putRequest =  new PutRequest(table, rowKey, cf,
                            colColumn, colValue);
                    actions.add(putRequest);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }
    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        if (incrementColumn != null) {
            AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
                    incrementRow, cf, incrementColumn);
            actions.add(inc);
        }
        return actions;
    }
    @Override
    public void cleanUp() {
        // TODO Auto-generated method stub
    }
    @Override
    public void configure(Context context) {
        String pCol = context.getString("payloadColumn", "pCol");
        String iCol = context.getString("incrementColumn", "iCol");
        rowPrefix = context.getString("rowPrefix", "default");
        String suffix = context.getString("suffix", "uuid");
        if (pCol != null && !pCol.isEmpty()) {
            if (suffix.equals("timestamp")) {
                keyType = KeyType.TS;
            } else if (suffix.equals("random")) {
                keyType = KeyType.RANDOM;
            } else if (suffix.equals("nano")) {
                keyType = KeyType.TSNANO;
            } else {
                keyType = KeyType.UUID;
            }
            payloadColumn = pCol.getBytes(Charsets.UTF_8);
        }
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
    }
    @Override
    public void setEvent(Event event) {
        this.payload = event.getBody();
    }
    @Override
    public void configure(ComponentConfiguration conf) {
        // TODO Auto-generated method stub
    }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关文章
|
1月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
96 7
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
201 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
48 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
103 0
|
20天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
4月前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
97 0
|
1月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
74 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,并与使用 RPM 包安装进行了对比
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,并与使用 RPM 包安装进行了对比。通过具体案例,读者可以了解如何准备环境、下载源码、编译安装、配置服务及登录 MySQL。编译源码安装虽然复杂,但提供了更高的定制性和灵活性,适用于需要高度定制的场景。
116 3
|
1月前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
166 2