Flink SQL与HBase的集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

版本说明:


flink-1.12.1

hbase-1.4.13

目录

(1)Flink SQL与HBase的集成配置

(2)测试Flink SQL与HBase集成代码

(3)测试kafka数据源与HBase写入数据


(1)Flink SQL与HBase的集成配置


第一步:Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-base_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>
  <version>1.12.1</version>
</dependency>

第二步:将flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包复制到flink的lib目录下


flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hbase-1.4_2.11/1.12.1


(2)测试Flink SQL与HBase集成代码


在测试代码之前需要将启动hbase服务,并创建写入hbase的数据表

create 'user_product_hbase','cf'
package com.aikfk.flink.sql.hbase;
import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaHBase {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        env.enableCheckpointing(5000);
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);
        String user_product_kafka_create =
                "CREATE TABLE user_product_kafka (\n" +
                        " row_key STRING," +
                        " user_id STRING," +
                        " product_id STRING," +
                        " click_count INT " +
                        ") WITH (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'kfk'," +
                        " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
                        " 'properties.group.id' = 'test1'," +
                        " 'format' = 'json'," +
                        " 'scan.startup.mode' = 'latest-offset'" +
                        ")";
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_kafka");
        tableEnvironment.executeSql(user_product_kafka_create);
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_hbase");
        String user_product_hbase_create =
                        "CREATE TABLE user_product_hbase (\n" +
                        " row_key STRING,\n" +
                        " cf ROW<user_id STRING,product_id STRING,click_count INT>,\n" +
                        " PRIMARY KEY (row_key) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'hbase-1.4',\n" +
                        " 'table-name' = 'user_product_hbase',\n" +
                        " 'zookeeper.quorum' = 'bigdata-pro-m07:2181'\n" +
                        ")";
        tableEnvironment.executeSql(user_product_hbase_create);
        tableEnvironment.executeSql(
                "INSERT INTO user_product_hbase\n" +
                "SELECT row_key, ROW(user_id, product_id, click_count) FROM user_product_kafka").print();
        env.execute();
    }
}


(3)测试kafka数据源与HBase写入数据


生产者生产数据:

package com.aikfk.flink.base;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerUtilSimple extends Thread {
        private String topic = "kfk";
        public KafkaProducerUtilSimple() {
            super();
        }
        private Producer<String, String> createProducer() {
            // 通过Properties类设置Producer的属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(properties);
        }
        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();
            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                String row_key = user_id+"_"+product_id+"_"+time;
                try {
//
                    String kaifa_log = "{" +
                            "\"row_key\":\"" + row_key+"\"," +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1\"}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) {
            new KafkaProducerUtilSimple().run();
        }
    }

生产结果:

user_8 :product_48
kaifa_log = {"row_key":"user_8_product_48_1618237864","user_id":"user_8","product_id":"product_48","click_count":"1"}
=========循环一次==========
user_6 :product_53
kaifa_log = {"row_key":"user_6_product_53_1618237867","user_id":"user_6","product_id":"product_53","click_count":"1"}
=========循环一次==========
user_8 :product_34
kaifa_log = {"row_key":"user_8_product_34_1618237870","user_id":"user_8","product_id":"product_34","click_count":"1"}
=========循环一次==========

通过flink SQL client查看数据

5.png

查看hbase写入数据情况:

4.png




相关文章
|
27天前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
171 26
|
5月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
238 15
|
2月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
180 14
|
4月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
89 0
|
5月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
119 2
|
5月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
69 1
|
5月前
|
分布式计算 Java Hadoop
java使用hbase、hadoop报错举例
java使用hbase、hadoop报错举例
142 4
|
4月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
115 4
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
55 3