基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案

16:实时计算需求及技术方案

  • 目标了解实时计算需求及技术方案
  • 路径
  • step1:实时计算需求
  • step2:技术方案
  • 实施
  • 实时计算需求
  • 实时统计消息总量
select count(*) from tbname;
  • 实时统计各个地区发送消息的总量
select sender_area,count(*) from tbname group by sender_area;
  • 实时统计各个地区接收消息的总量
select receiver_area,count(*) from tbname group by receiver_area;
  • 实时统计每个用户发送消息的总量
select sender_account,count(*) from tbname group by sender_account;
  • 实时统计每个用户接收消息的总量
select receiver_account,count(*) from tbname group by receiver_account;
  • |
  • 构建实时统计报表
  • 技术方案
  • 实时采集:Flume
  • 实时存储:Kafka
  • 实时计算:Flink
  • 实时结果:MySQL / Redis
  • 实时报表:FineBI / JavaWeb可视化

  • 小结
  • 了解实时计算需求及技术选型

17:Flink的基本介绍

  • 目标了解Flink的功能、特点及应用场景
  • 路径
  • step1:功能
  • step2:特点
  • step3:应用
  • 实施
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
  • 功能:可以基于任何普通的集群平台,对有界的数据流或者无界的数据流实现高性能的有状态的分布式实时计算
  • Flink DataSet:对有界数据进行批处理操作
  • Flink DataStream:对无界数据进行实时处理操作
  • Flink Table:基于DSL实现结构化数据处理
  • Flink SQL:基于SQL实现结构化数据处理
  • Flink Gelly:Flink的图计算库
  • Flink ML:Flink的机器学习库
  • 特点
  • 支持高吞吐、低延迟、高性能的流处理
  • 支持带有事件时间的窗口(Window)操作
  • 支持有状态计算的Exactly-once语义
  • 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
  • 支持具有Backpressure功能的持续流模型
  • 支持基于轻量级分布式快照(Snapshot)实现的容错
  • 一个运行时同时支持Batch on Streaming处理和Streaming处理
  • Flink在JVM内部实现了自己的内存管理
  • 支持迭代计算
  • 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
  • 应用:所有实时及离线数据计算场景
  • 数据分析应用:实时数据仓库
  • Batch analytics(批处理分析)
  • Streaming analytics(流处理分析)
  • 事件驱动类应用
  • 欺诈检测(Fraud detection)
  • 异常检测(Anomaly detection)
  • 基于规则的告警(Rule-based alerting)
  • 业务流程监控(Business process monitoring)
  • Web应用程序(社交网络)
  • 数据管道ETL
  • Periodic ETL和Data Pipeline
  • 公司
  • 小结
  • 了解Flink的功能、特点及应用场景

18:代码模块构建

  • 目标实现开发环境的代码模块构建
  • 实施
  • 导入包中代码到IDEA中

  • flink包:应用类包,用于存放实际的应用类
  • MoMoFlinkCount:用于实现对每个需求的统计计算
  • MySQLSink:用于将计算的结果写入MySQL
  • pojo包:实体类包,用于存放所有实体类
  • MoMoCountBean:用于封装统计分析的结果
private Integer id ;        //结果id
private Long moMoTotalCount ;   //总消息数
private String moMoProvince ;       //省份
private String moMoUsername ;       //用户
private Long moMo_MsgCount ;        //消息数
//结果类型:1-总消息数 2-各省份发送消息数 3-各省份接受消息数 4-每个用户发送消息数 5-每个用户接受消息数
private String groupType ;   
  • utils包:工具类包,用于存放所有工具类
  • HttpClientUtils:用于实现将经纬度地址解析为省份的工具类
public static String findByLatAndLng(String lat , String lng)
  • 参数:经度,维度
  • 返回值:省份
  • 小结
  • 实现开发环境的代码模块构建

19:省份解析工具类测试

  • 目标:了解省份解析的实现
  • 路径
  • step1:基本设计
  • step2:注册百度开发者
  • step3:测试省份解析
  • 实施
  • 基本设计
  • 业务场景:根据IP或者经纬度解析得到用户的国家、省份、城市信息
  • 方案一:离线解析库【本地解析,快,不精准】
  • 方案二:在线解析库【远程解析,并发限制,精准】
  • 注册百度开发者
https://api.map.baidu.com/reverse_geocoding/v3/?ak=您的ak&output=json&coordtype=wgs84ll&location=31.225696563611,121.49884033194  //GET请求
  • 注册开放平台,获取AK码:参考《附录三》
  • 测试省份解析
  • 注意:将代码中的AK码更改为自己的
package bigdata.itcast.cn.momo.online.utils;
  import com.alibaba.fastjson.JSONObject;
  import org.apache.http.HttpEntity;
  import org.apache.http.client.methods.CloseableHttpResponse;
  import org.apache.http.client.methods.HttpGet;
  import org.apache.http.impl.client.CloseableHttpClient;
  import org.apache.http.impl.client.HttpClients;
  import org.apache.http.util.EntityUtils;
  import javax.swing.text.html.parser.Entity;
  import java.io.IOException;
  import java.util.Map;
  public class HttpClientUtils {
      //传入经纬度, 返回查询的地区
      public static String findByLatAndLng(String lat , String lng){
          try {
              CloseableHttpClient httpClient = HttpClients.createDefault();
              String url = "http://api.map.baidu.com/reverse_geocoding/v3/?ak=l8hKKRCuX2zrRa93jneDrPmc2UspGatO&output=json&coordtype=wgs84ll&location="+lat+","+lng;
              System.out.println(url);
              //请求解析
              HttpGet httpGet = new HttpGet(url);
              //得到结果
              CloseableHttpResponse response = httpClient.execute(httpGet);
              //获取数据
              HttpEntity httpEntity = response.getEntity();
              //转换成JSON
              String json = EntityUtils.toString(httpEntity);
              //从JSON中返回省份
              Map<String,Object> result = JSONObject.parseObject(json, Map.class);
              if(result.get("status").equals(0)){
                  Map<String,Object> resultMap =  (Map<String,Object>)result.get("result");
                  resultMap = (Map<String, Object>) resultMap.get("addressComponent");
                  String province = (String) resultMap.get("province");
                  return province;
              }
          } catch (IOException e) {
              e.printStackTrace();
          }
          return null;
      }
      public static void main(String[] args) {
          //测试
          String sf = findByLatAndLng("43.921297","124.655376");
          System.out.println(sf);
      }
  }
  • 小结
  • 了解省份解析的实现

20:Flink代码解读

  • 目标了解Flink代码的基本实现
  • 路径
  • step1:消费Kafka
  • step2:实时统计分析
  • step3:实时更新结果到MySQL
  • 实施
  • 消费Kafka
//构建Kafka配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "momo2");
//构建消费者
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(),props);
//Flink加载消费者
DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
  • 实时统计分析
//todo:3. 进行转换统计操作:
//3.1: 统计总消息量
countTotalMsg(streamSource);
//3.2: 基于经纬度, 统计各省份发送消息量
countProvinceSenderMsg(streamSource);
//3.3: 基于经纬度, 统计各省份接收消息量
countProvinceReceiverMsg(streamSource);
//3.4: 统计各个用户, 发送消息量
countUserNameSenderMsg(streamSource);
//3.5: 统计各个用户, 接收消息量
countUserNameReceiverMsg(streamSource);
//5. 执行flink操作
env.execute("momoFlinkCount");
  • 实时更新结果到MySQL
streamOperator.addSink(new MysqlSink("2"));
if (status.equals("2")){
    String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
    ResultSet resultSet = stat.executeQuery(sql);
    boolean flag = resultSet.next();
    if(flag) {
        sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
    }else {
        sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"+value.getMoMoProvince()+"',"+value.getMoMo_MsgCount()+",'2') ";
    }
    stat.executeUpdate(sql);
}
  • 小结
  • 了解Flink代码的基本实现

21:Flink实时计算测试

  • 目标实现Flink实时分析测试
  • 路径
  • step1:MySQL准备
  • step2:运行测试
  • 实施
  • MySQL准备
  • 找到SQL文件

  • 运行SQL文件创建结果数据库、表

  • 运行测试
  • 启动Flink程序:运行MoMoFlinkCount
  • 启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
  • 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
  • 观察MySQL结果

  • 小结
  • 实现Flink实时分析测试


目录
相关文章
|
7月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1242 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
消息中间件 存储 传感器
447 0
|
11月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
12月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
331 11
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
695 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
7月前
|
数据采集 缓存 大数据
【赵渝强老师】大数据日志采集引擎Flume
Apache Flume 是一个分布式、可靠的数据采集系统,支持从多种数据源收集日志信息,并传输至指定目的地。其核心架构由Source、Channel、Sink三组件构成,通过Event封装数据,保障高效与可靠传输。
421 1
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
338 2
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
332 1
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
443 3
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
480 0

热门文章

最新文章