Flink SQL与HBase的集成

简介: 笔记

版本说明:


flink-1.12.1

hbase-1.4.13

目录

(1)Flink SQL与HBase的集成配置

(2)测试Flink SQL与HBase集成代码

(3)测试kafka数据源与HBase写入数据


(1)Flink SQL与HBase的集成配置


第一步:Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-base_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>
  <version>1.12.1</version>
</dependency>

第二步:将flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包复制到flink的lib目录下


flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hbase-1.4_2.11/1.12.1


(2)测试Flink SQL与HBase集成代码


在测试代码之前需要将启动hbase服务,并创建写入hbase的数据表

create 'user_product_hbase','cf'
package com.aikfk.flink.sql.hbase;
import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaHBase {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        env.enableCheckpointing(5000);
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);
        String user_product_kafka_create =
                "CREATE TABLE user_product_kafka (\n" +
                        " row_key STRING," +
                        " user_id STRING," +
                        " product_id STRING," +
                        " click_count INT " +
                        ") WITH (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'kfk'," +
                        " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
                        " 'properties.group.id' = 'test1'," +
                        " 'format' = 'json'," +
                        " 'scan.startup.mode' = 'latest-offset'" +
                        ")";
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_kafka");
        tableEnvironment.executeSql(user_product_kafka_create);
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_hbase");
        String user_product_hbase_create =
                        "CREATE TABLE user_product_hbase (\n" +
                        " row_key STRING,\n" +
                        " cf ROW<user_id STRING,product_id STRING,click_count INT>,\n" +
                        " PRIMARY KEY (row_key) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'hbase-1.4',\n" +
                        " 'table-name' = 'user_product_hbase',\n" +
                        " 'zookeeper.quorum' = 'bigdata-pro-m07:2181'\n" +
                        ")";
        tableEnvironment.executeSql(user_product_hbase_create);
        tableEnvironment.executeSql(
                "INSERT INTO user_product_hbase\n" +
                "SELECT row_key, ROW(user_id, product_id, click_count) FROM user_product_kafka").print();
        env.execute();
    }
}


(3)测试kafka数据源与HBase写入数据


生产者生产数据:

package com.aikfk.flink.base;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerUtilSimple extends Thread {
        private String topic = "kfk";
        public KafkaProducerUtilSimple() {
            super();
        }
        private Producer<String, String> createProducer() {
            // 通过Properties类设置Producer的属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(properties);
        }
        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();
            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                String row_key = user_id+"_"+product_id+"_"+time;
                try {
//
                    String kaifa_log = "{" +
                            "\"row_key\":\"" + row_key+"\"," +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1\"}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) {
            new KafkaProducerUtilSimple().run();
        }
    }

生产结果:

user_8 :product_48
kaifa_log = {"row_key":"user_8_product_48_1618237864","user_id":"user_8","product_id":"product_48","click_count":"1"}
=========循环一次==========
user_6 :product_53
kaifa_log = {"row_key":"user_6_product_53_1618237867","user_id":"user_6","product_id":"product_53","click_count":"1"}
=========循环一次==========
user_8 :product_34
kaifa_log = {"row_key":"user_8_product_34_1618237870","user_id":"user_8","product_id":"product_34","click_count":"1"}
=========循环一次==========

通过flink SQL client查看数据

5.png

查看hbase写入数据情况:

4.png




相关文章
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
967 43
|
5月前
|
SQL 数据可视化 关系型数据库
MCP与PolarDB集成技术分析:降低SQL门槛与简化数据可视化流程的机制解析
阿里云PolarDB与MCP协议融合,打造“自然语言即分析”的新范式。通过云原生数据库与标准化AI接口协同,实现零代码、分钟级从数据到可视化洞察,打破技术壁垒,提升分析效率99%,推动企业数据能力普惠化。
467 3
|
6月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
419 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
7月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1074 1
|
11月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
561 6
|
9月前
|
分布式计算 Ubuntu Hadoop
Ubuntu22.04下搭建Hadoop3.3.6+Hbase2.5.6+Phoenix5.1.3开发环境的指南
呈上,这些步骤如诗如画,但有效且动人。仿佛一个画家在画布上描绘出一幅完美的画面,这就是你的开发环境。接下来,尽情去创造吧,祝编程愉快!
654 19
|
分布式计算 Java Hadoop
java使用hbase、hadoop报错举例
java使用hbase、hadoop报错举例
382 4
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
358 4
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
222 3