Flink SQL与HBase的集成

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

版本说明:


flink-1.12.1

hbase-1.4.13

目录

(1)Flink SQL与HBase的集成配置

(2)测试Flink SQL与HBase集成代码

(3)测试kafka数据源与HBase写入数据


(1)Flink SQL与HBase的集成配置


第一步:Maven dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hbase-base_2.11</artifactId>
  <version>1.12.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>
  <version>1.12.1</version>
</dependency>

第二步:将flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包复制到flink的lib目录下


flink-sql-connector-hbase-1.4_2.11-1.12.1.jar这个包在maven仓库下载:


https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hbase-1.4_2.11/1.12.1


(2)测试Flink SQL与HBase集成代码


在测试代码之前需要将启动hbase服务,并创建写入hbase的数据表

create 'user_product_hbase','cf'
package com.aikfk.flink.sql.hbase;
import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkKafkaHBase {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        env.enableCheckpointing(5000);
        HiveCatalog hiveCatalog =
                new HiveCatalog(
                        CommonSQL.hiveCatalog_name,
                        CommonSQL.hiveDatabase_name,
                        CommonSQL.hiveConfDir,
                        CommonSQL.version
                        );
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name,hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);
        String user_product_kafka_create =
                "CREATE TABLE user_product_kafka (\n" +
                        " row_key STRING," +
                        " user_id STRING," +
                        " product_id STRING," +
                        " click_count INT " +
                        ") WITH (" +
                        " 'connector' = 'kafka'," +
                        " 'topic' = 'kfk'," +
                        " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092'," +
                        " 'properties.group.id' = 'test1'," +
                        " 'format' = 'json'," +
                        " 'scan.startup.mode' = 'latest-offset'" +
                        ")";
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_kafka");
        tableEnvironment.executeSql(user_product_kafka_create);
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_hbase");
        String user_product_hbase_create =
                        "CREATE TABLE user_product_hbase (\n" +
                        " row_key STRING,\n" +
                        " cf ROW<user_id STRING,product_id STRING,click_count INT>,\n" +
                        " PRIMARY KEY (row_key) NOT ENFORCED\n" +
                        ") WITH (\n" +
                        " 'connector' = 'hbase-1.4',\n" +
                        " 'table-name' = 'user_product_hbase',\n" +
                        " 'zookeeper.quorum' = 'bigdata-pro-m07:2181'\n" +
                        ")";
        tableEnvironment.executeSql(user_product_hbase_create);
        tableEnvironment.executeSql(
                "INSERT INTO user_product_hbase\n" +
                "SELECT row_key, ROW(user_id, product_id, click_count) FROM user_product_kafka").print();
        env.execute();
    }
}


(3)测试kafka数据源与HBase写入数据


生产者生产数据:

package com.aikfk.flink.base;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerUtilSimple extends Thread {
        private String topic = "kfk";
        public KafkaProducerUtilSimple() {
            super();
        }
        private Producer<String, String> createProducer() {
            // 通过Properties类设置Producer的属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<String, String>(properties);
        }
        @Override
        public void run() {
            Producer<String, String> producer = createProducer();
            Random random = new Random();
            Random random2 = new Random();
            while (true) {
                String user_id = "user_"+random.nextInt(10);
                String product_id = "product_"+random2.nextInt(100);
                System.out.println(user_id + " :" + product_id);
                String time = System.currentTimeMillis() / 1000 + 5 + "";
                String row_key = user_id+"_"+product_id+"_"+time;
                try {
//
                    String kaifa_log = "{" +
                            "\"row_key\":\"" + row_key+"\"," +
                            "\"user_id\":\"" + user_id+"\"," +
                            "\"product_id\":\"" + product_id+"\"," +
                            "\"click_count\":\"1\"}";
                    System.out.println("kaifa_log = " + kaifa_log);
                    producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("=========循环一次==========");
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) {
            new KafkaProducerUtilSimple().run();
        }
    }

生产结果:

user_8 :product_48
kaifa_log = {"row_key":"user_8_product_48_1618237864","user_id":"user_8","product_id":"product_48","click_count":"1"}
=========循环一次==========
user_6 :product_53
kaifa_log = {"row_key":"user_6_product_53_1618237867","user_id":"user_6","product_id":"product_53","click_count":"1"}
=========循环一次==========
user_8 :product_34
kaifa_log = {"row_key":"user_8_product_34_1618237870","user_id":"user_8","product_id":"product_34","click_count":"1"}
=========循环一次==========

通过flink SQL client查看数据

5.png

查看hbase写入数据情况:

4.png




相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
163 15
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
|
2月前
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
582 1
Flink CDC:新一代实时数据集成框架
|
1月前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
1月前
|
SQL 数据库连接 数据库
管理系统中的Visual Studio与SQL集成技巧与方法
在现代软件开发和管理系统中,Visual Studio(VS)作为强大的集成开发环境(IDE),与SQL数据库的紧密集成是构建高效、可靠应用程序的关键
|
1月前
|
SQL 机器学习/深度学习 数据采集
SQL与Python集成:数据库操作无缝衔接2a.bijius.com
Python与SQL的集成是现代数据科学和工程实践的核心。通过有效的数据查询、管理与自动化,可以显著提升数据分析和决策过程的效率与准确性。随着技术的不断发展,这种集成的应用场景将更加广泛,为数据驱动的创新提供更强大的支持。
|
1月前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
1. Python与SQL集成的关键步骤 在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
45 0
|
1月前
|
SQL 监控 数据库
管理系统VS SQL:高效集成的关键技巧与方法
在现代企业信息化建设中,管理系统(如ERP、CRM等)与SQL数据库之间的紧密集成是确保数据流动顺畅、业务逻辑高效执行的关键