基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

22:FineBI配置数据集

  • 目标实现FineBI访问MySQL结果数据集的配置
  • 实施
  • 安装FineBI
  • 参考《FineBI Windows版本安装手册.docx》安装FineBI

  • 配置连接


数据连接名称:Momo
用户名:root
密码:自己MySQL的密码
数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8

  • 数据准备


SELECT  
 id, momo_totalcount,momo_province,momo_username,momo_msgcount,
 CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量'  WHEN '3' THEN '各省份接收量'
  WHEN '4' THEN '各用户发送量' WHEN '5' THEN '各用户接收量' END AS momo_grouptype
FROM  momo_count
  • 小结
  • 实现FineBI访问MySQL结果数据集的配置

23:FineBI构建报表

  • 目标实现FineBI实时报表构建
  • 路径
  • step1:实时报表构建
  • step2:实时报表配置
  • step3:实时刷新测试
  • 实施
  • 实时报表构建
  • 新建仪表盘


  • 添加标题


  • 实时总消息数

  • 发送消息最多的Top10用户








  • 接受消息最多的Top10用户




  • 各省份发送消息Top10



  • 各省份接收消息Top10






  • 各省份总消息量




  • 小结
  • 实现FineBI实时报表构建

24:FineBI实时配置测试

  • 目标:实现实时报表测试
  • 实施
  • 实时报表配置
  • 注意:如果提示已存在,就选择覆盖
  • 添加JS文件
  • 创建js文件:refresh.js
setTimeout(function () {
 var b =document.title;
 var a =BI.designConfigure.reportId;//获取仪表板id
 //这里要指定自己仪表盘的id
 if (a=="d574631848bd4e33acae54f986d34e69") {
  setInterval(function () {
   BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
   //Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
   BI.Utils.broadcastAllWidgets2Refresh(true);
  }, 3000);//5000000为定时刷新的频率,单位ms
 }
}, 2000)
  • 将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中

  • 关闭FineBI缓存,然后关闭FineBI

  • 修改jar包,添加js



<!-- 增加刷新功能 --> 
<script type="text/javascript" src="/webroot/refresh.js"></script>

         
  • 重启FineBI
  • 实时刷新测试
  • 清空MySQL结果表
  • 启动Flink程序:运行MoMoFlinkCount
  • 启动Flume程序
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
  • 启动模拟数据
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
- 观察报表



  • 小结
  • 实现FineBI实时测试
## 附录一:Maven依赖
```xml
  <!--远程仓库-->
  <repositories>
      <repository>
          <id>aliyun</id>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases><enabled>true</enabled></releases>
          <snapshots>
              <enabled>false</enabled>
              <updatePolicy>never</updatePolicy>
          </snapshots>
      </repository>
  </repositories>
  <dependencies>
      <!--Hbase 客户端-->
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>2.1.0</version>
      </dependency>
      <!--kafka 客户端-->
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.4.1</version>
      </dependency>
      <!--JSON解析工具包-->
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <!--Flink依赖-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-shaded-hadoop-2-uber</artifactId>
          <version>2.7.5-10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-jdbc_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.0</version>
      </dependency>
      <!--HTTP请求的的依赖-->
      <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.5.4</version>
      </dependency>
      <!--MySQL连接驱动-->
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.38</version>
      </dependency>
  </dependencies>
  <build>
      <plugins>
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.1</version>
              <configuration>
                  <target>1.8</target>
                  <source>1.8</source>
              </configuration>
          </plugin>
      </plugins>
  </build>

附录二:离线消费者完整代码

package bigdata.itcast.cn.momo.offline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
/**
 * @ClassName MomoKafkaToHbase
 * @Description TODO 离线场景:消费Kafka的数据写入Hbase
 * @Create By     Maynor
 */
public class MomoKafkaToHbase {
    private  static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名
    private static byte[] family = Bytes.toBytes("C1");//列族
    //todo:2-构建Hbase连接
    //静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能
    static{
        try {
            //构建配置对象
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
            //构建连接
            conn = ConnectionFactory.createConnection(conf);
            //获取表对象
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        //todo:1-构建消费者,获取数据
        consumerKafkaToHbase();
//        String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
//        System.out.println(momoRowkey);
    }
    /**
     * 用于消费Kafka的数据,将合法数据写入Hbase
     */
    private static void consumerKafkaToHbase() throws Exception {
        //构建配置对象
        Properties props = new Properties();
        //指定服务端地址
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费者组的id
        props.setProperty("group.id", "momo1");
        //关闭自动提交
        props.setProperty("enable.auto.commit", "false");
        //指定K和V反序列化的类型
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //构建消费者的连接
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //指定订阅哪些Topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        //持续拉取数据
        while (true) {
            //向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
            //拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //todo:3-处理拉取到的数据:打印
            //取出每个分区的数据进行处理
            Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区
            //对每个分区的数据做处理
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据
                //处理这个分区的数据
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    //获取Topic
                    String topic = record.topic();
                    //获取分区
                    int part = record.partition();
                    //获取offset
                    offset = record.offset();
                    //获取Key
                    String key = record.key();
                    //获取Value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    //将Value数据写入Hbase
                    if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
                        writeToHbase(value);
                    }
                }
                //手动提交分区的commit offset
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                consumer.commitSync(offsets);
            }
        }
    }
    /**
     * 用于实现具体的写入Hbase的方法
     * @param value
     */
    private static void writeToHbase(String value) throws Exception {
        //todo:3-写入Hbase
        //切分数据
        String[] items = value.split("\001");
        String stime = items[0];
        String sender_accounter = items[2];
        String receiver_accounter = items[11];
        //构建rowkey
        String rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);
        //构建Put
        Put put = new Put(Bytes.toBytes(rowkey));
        //添加列
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
        //执行写入
        table.put(put);
    }
    /**
     * 基于消息时间、发送人id、接受人id构建rowkey
     * @param stime
     * @param sender_accounter
     * @param receiver_accounter
     * @return
     * @throws Exception
     */
    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        //转换时间戳
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
        //构建MD5
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
        //合并返回
        return prefix+"_"+suffix;
    }
}


目录
相关文章
|
1月前
|
消息中间件 缓存 关系型数据库
Flink CDC产品常见问题之upsert-kafka增加参数报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
99 1
|
2月前
|
消息中间件 关系型数据库 MySQL
Flink问题子实现Kafka到Mysql如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
378 2
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
69 2
|
30天前
|
消息中间件 关系型数据库 MySQL
Flink CDC产品常见问题之用upsert的方式写入kafka失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
30天前
|
消息中间件 关系型数据库 Kafka
Flink CDC产品常见问题之Flink CDC里从kafka消费的时候顺序混乱如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 关系型数据库
Flink CDC数据同步问题之向kafka同步数据报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
3月前
|
消息中间件 SQL Java
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
|
4月前
|
消息中间件 Kafka 流计算
Flink消费kafka数据
Flink消费kafka数据
42 0

热门文章

最新文章