基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析

08:离线分析:Hbase表设计及构建

  • 目标掌握Hbase表的设计及创建表的实现
  • 路径
  • step1:基础设计
  • step2:Rowkey设计
  • step3:分区设计
  • step4:建表
  • 实施
  • 基础设计
  • Namespace:MOMO_CHAT
  • Table:MOMO_MSG
  • Family:C1
  • Qualifier:与数据中字段名保持一致

  • Rowkey设计
  • 查询需求:根据发件人id + 收件人id + 消息日期 查询聊天记录
  • 发件人账号
  • 收件人账号
  • 时间
  • 设计规则:业务、唯一、长度、散列、组合
  • 设计实现
  • 加盐方案:CRC、Hash、MD5、MUR
  • => 8位、16位、32位
MD5Hash【发件人账号_收件人账号_消息时间 =》 8位】_发件人账号_收件人账号_消息时间
  • 分区设计
  • Rowkey前缀:MD5编码,由字母和数字构成
  • 数据并发量:高
  • 分区设计:使用HexSplit16进制划分多个分区
  • 建表
  • 启动Hbase:start-hbase.sh
  • 进入客户端:hbase shell
#创建NS
create_namespace 'MOMO_CHAT'
#建表
create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
  • 小结
  • 掌握Hbase表的设计及创建表的实现

09:离线分析:Kafka消费者构建

  • 目标实现离线消费者的开发
  • 路径
  • 整体实现的路径
//入口:调用实现消费Kafka,将数据写入Hbase
public void main(){
    //step1:消费Kafka
    consumerKafka();
}
//用于消费Kafka数据
public void consumerKafka(){
    prop = new Properties()
  KafkaConsumer consumer = new KafkaConsumer(prop)
    consumer.subscribe("MOMO_MSG")
    ConsumerRecords  records = consumer.poll
    //基于每个分区来消费和处理
        record :Topic、Partition、Offset、Key、Value
      //step2:写入Hbase
        writeToHbase(value)
    //提交这个分区的offset
     commitSycn(offset+1)
}
//用于将value的数据写入Hbase方法
public void writeToHbase(){
    //step1:构建连接
    //step2:构建Table对象
    //step3:构建Put对象
    //获取rowkey
   rowkey = getRowkey(value)
    Put put = new Put(rowkey)
    put.添加每一列
    table.put()
}
public String getRowkey(){
    value.getSender
    value.getReceiver
    value.getTime
        rowkey = MD5+sender+receiverId +time
        return rowkey
}
  • 实施
/**
     * 用于消费Kafka的数据,将合法数据写入Hbase
     */
    private static void consumerKafkaToHbase() throws Exception {
        //构建配置对象
        Properties props = new Properties();
        //指定服务端地址
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费者组的id
        props.setProperty("group.id", "momo");
        //关闭自动提交
        props.setProperty("enable.auto.commit", "false");
        //指定K和V反序列化的类型
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //构建消费者的连接
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //指定订阅哪些Topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        //持续拉取数据
        while (true) {
            //向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
            //拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //todo:3-处理拉取到的数据:打印
            //取出每个分区的数据进行处理
            Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区
            //对每个分区的数据做处理
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据
                //处理这个分区的数据
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    //获取Topic
                    String topic = record.topic();
                    //获取分区
                    int part = record.partition();
                    //获取offset
                    offset = record.offset();
                    //获取Key
                    String key = record.key();
                    //获取Value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    //将Value数据写入Hbase
                    if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
                        writeToHbase(value);
                    }
                }
                //手动提交分区的commit offset
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                consumer.commitSync(offsets);
            }
        }
    }
  • 小结
  • 实现离线消费者的开发

10:离线分析:Hbase连接构建

  • 目标实现Hbase连接的构建
  • 实施
private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名
    private static byte[] family = Bytes.toBytes("C1");//列族
    // 静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能
    static{
        try {
            //构建配置对象
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
            //构建连接
            conn = ConnectionFactory.createConnection(conf);
            //获取表对象
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
  • 小结
  • 实现Hbase连接的构建

11:离线分析:Rowkey的构建

  • 目标实现Rowkey的构建
  • 实施
private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        //转换时间戳
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
        //构建MD5
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
        //合并返回
        return prefix+"_"+suffix;
    }
  • 小结
  • 实现Rowkey的构建

12:离线分析:Put数据列构建

  • 目标实现Put数据列的构建
  • 实施
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
  • 小结
  • 实现Put数据列的构建

13:离线分析:存储运行测试

  • 目标测试运行消费Kafka数据动态写入Hbase
  • 实施
  • 启动消费者程序
  • 启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
  • 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
  • 观察Hbase结果

  • 小结
  • 测试运行消费Kafka数据动态写入Hbase

14:离线分析:Hive关联测试

  • 目标使用Hive关联Hbase实现离线分析
  • 路径
  • step1:关联
  • step2:查询
  • 实施
  • 启动Hive和yarn
start-yarn.sh
hive-daemon.sh metastore
hive-daemon.sh hiveserver2
start-beeline.sh
  • 关联
create database MOMO_CHAT;
use MOMO_CHAT;
create external table if not exists MOMO_CHAT.MOMO_MSG (
  id string,
  msg_time string ,
  sender_nickyname string , 
  sender_account string , 
  sender_sex string , 
  sender_ip string ,
  sender_os string , 
  sender_phone_type string ,
  sender_network string , 
  sender_gps string , 
  receiver_nickyname string ,
  receiver_ip string ,
  receiver_account string ,
  receiver_os string ,
  receiver_phone_type string ,
  receiver_network string ,
  receiver_gps string ,
  receiver_sex string ,
  msg_type string ,
  distance string ,
  message string 
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname, 
C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,
C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,
C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,
C1:msg_type,C1:distance,C1:message ') tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
  • 分析查询
--基础查询
select 
  msg_time,sender_nickyname,receiver_nickyname,distance 
from momo_msg limit 10;
--查询聊天记录:发送人id + 接收人id + 日期:1f300e5d_13280256412_15260978785_1632888342000
select 
  * 
from momo_msg 
where sender_account='13280256412' 
and receiver_account='15260978785' 
and substr(msg_time,0,10) = '2021-09-29';
--统计每个小时的消息数
select
  substr(msg_time,0,13) as hour,
  count(*) as cnt
from momo_msg
group by substr(msg_time,0,13);
  • 小结
  • 使用Hive关联Hbase实现离线分析

15:离线分析:Phoenix关联测试

  • 目标使用Phoenix关联Hbase实现即时查询
  • 路径
  • step1:关联
  • step2:查询
  • 实施
  • 启动
cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
bin/sqlline.py node1:2181
  • 关联
create view if not exists MOMO_CHAT.MOMO_MSG (
  "id" varchar primary key,
  C1."msg_time" varchar ,
  C1."sender_nickyname" varchar , 
  C1."sender_account" varchar , 
  C1."sender_sex" varchar , 
  C1."sender_ip" varchar ,
  C1."sender_os" varchar , 
  C1."sender_phone_type" varchar ,
  C1."sender_network" varchar , 
  C1."sender_gps" varchar , 
  C1."receiver_nickyname" varchar ,
  C1."receiver_ip" varchar ,
  C1."receiver_account" varchar ,
  C1."receiver_os" varchar ,
  C1."receiver_phone_type" varchar ,
  C1."receiver_network" varchar ,
  C1."receiver_gps" varchar ,
  C1."receiver_sex" varchar ,
  C1."msg_type" varchar ,
  C1."distance" varchar ,
  C1."message" varchar
);
  • 即时查询
--基础查询
select 
  "id",c1."sender_account",c1."receiver_account" 
from momo_chat.momo_msg 
limit 10;
--查询每个发送人发送的消息数
select 
  c1."sender_account" ,
  count(*) as cnt 
from momo_chat.momo_msg 
group by c1."sender_account";
--查询每个发送人聊天的人数
select 
  c1."sender_account" ,
  count(distinct c1."receiver_account") as cnt 
from momo_chat.momo_msg 
group by c1."sender_account" 
order by cnt desc;
  • 小结
  • 使用Phoenix关联Hbase实现即时查询
目录
相关文章
|
14天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
59 2
|
18天前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
17天前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
13天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
19天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
56 9
|
1月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
49 3
|
18天前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
|
2月前
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
39 3
|
24天前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。