Versions used:
flink-1.12.1
hbase-1.4.13
Contents
(1) Configuring the Flink SQL and HBase integration
(2) Testing the Flink SQL and HBase integration code
(3) Testing the Kafka source and writing data to HBase
(1) Configuring the Flink SQL and HBase integration
Step 1: Add the Maven dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-base_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hbase-1.4_2.11</artifactId>
    <version>1.12.1</version>
</dependency>
Step 2: Copy flink-sql-connector-hbase-1.4_2.11-1.12.1.jar into Flink's lib directory.
The flink-sql-connector-hbase-1.4_2.11-1.12.1.jar package can be downloaded from the Maven repository:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hbase-1.4_2.11/1.12.1
(2) Testing the Flink SQL and HBase integration code
Before running the test code, start the HBase service and create the table that the job will write to:
create 'user_product_hbase','cf'
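You can optionally confirm the table exists from the HBase shell before running the job (standard HBase shell commands):

list 'user_product_hbase'
describe 'user_product_hbase'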
package com.aikfk.flink.sql.hbase;

import com.aikfk.flink.sql.CommonSQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class FlinkKafkaHBase {
    public static void main(String[] args) throws Exception {
        // Create the streaming environment and a Blink-planner table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        env.enableCheckpointing(5000);

        // Register and use the Hive catalog so the table definitions are persisted
        HiveCatalog hiveCatalog = new HiveCatalog(
                CommonSQL.hiveCatalog_name,
                CommonSQL.hiveDatabase_name,
                CommonSQL.hiveConfDir,
                CommonSQL.version);
        tableEnvironment.registerCatalog(CommonSQL.hiveCatalog_name, hiveCatalog);
        tableEnvironment.useCatalog(CommonSQL.hiveCatalog_name);

        // Kafka source table: one JSON click event per record
        String user_product_kafka_create =
                "CREATE TABLE user_product_kafka (\n" +
                " row_key STRING,\n" +
                " user_id STRING,\n" +
                " product_id STRING,\n" +
                " click_count INT\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'kfk',\n" +
                " 'properties.bootstrap.servers' = 'bigdata-pro-m07:9092',\n" +
                " 'properties.group.id' = 'test1',\n" +
                " 'format' = 'json',\n" +
                " 'scan.startup.mode' = 'latest-offset'\n" +
                ")";

        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_kafka");
        tableEnvironment.executeSql(user_product_kafka_create);

        // HBase sink table: all data columns are grouped under the column family 'cf'
        tableEnvironment.executeSql("DROP TABLE IF EXISTS user_product_hbase");
        String user_product_hbase_create =
                "CREATE TABLE user_product_hbase (\n" +
                " row_key STRING,\n" +
                " cf ROW<user_id STRING, product_id STRING, click_count INT>,\n" +
                " PRIMARY KEY (row_key) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'hbase-1.4',\n" +
                " 'table-name' = 'user_product_hbase',\n" +
                " 'zookeeper.quorum' = 'bigdata-pro-m07:2181'\n" +
                ")";
        tableEnvironment.executeSql(user_product_hbase_create);

        // Continuously insert from the Kafka table into the HBase table.
        // executeSql() submits this INSERT job by itself, so no separate
        // env.execute() call is needed here.
        tableEnvironment.executeSql(
                "INSERT INTO user_product_hbase\n" +
                "SELECT row_key, ROW(user_id, product_id, click_count) FROM user_product_kafka").print();
    }
}
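A note on the sink DDL above: the HBase connector maps each column family to a ROW type, so the qualifiers are read back with dot notation. A minimal sketch of such a read-back query, run from any Flink SQL session that can see the table, might look like this:

-- Hypothetical read-back query: cf is the ROW-typed column family,
-- so its qualifiers are accessed as cf.user_id, cf.product_id, cf.click_count
SELECT row_key, cf.user_id, cf.product_id, cf.click_count
FROM user_product_hbase;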
(3) Testing the Kafka source and writing data to HBase
Producer generating data:
package com.aikfk.flink.base;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerUtilSimple extends Thread {
    private String topic = "kfk";

    public KafkaProducerUtilSimple() {
        super();
    }

    private Producer<String, String> createProducer() {
        // Configure the producer through a Properties object
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "bigdata-pro-m07:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            // Build a random click event and send it to the topic as a JSON string
            String user_id = "user_" + random.nextInt(10);
            String product_id = "product_" + random2.nextInt(100);
            System.out.println(user_id + " :" + product_id);
            String time = System.currentTimeMillis() / 1000 + 5 + "";
            String row_key = user_id + "_" + product_id + "_" + time;
            try {
                String kaifa_log = "{"
                        + "\"row_key\":\"" + row_key + "\","
                        + "\"user_id\":\"" + user_id + "\","
                        + "\"product_id\":\"" + product_id + "\","
                        + "\"click_count\":\"1\"}";
                System.out.println("kaifa_log = " + kaifa_log);
                producer.send(new ProducerRecord<String, String>(this.topic, kaifa_log));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========one iteration==========");
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerUtilSimple().run();
    }
}
Producer output:
user_8 :product_48
kaifa_log = {"row_key":"user_8_product_48_1618237864","user_id":"user_8","product_id":"product_48","click_count":"1"}
=========one iteration==========
user_6 :product_53
kaifa_log = {"row_key":"user_6_product_53_1618237867","user_id":"user_6","product_id":"product_53","click_count":"1"}
=========one iteration==========
user_8 :product_34
kaifa_log = {"row_key":"user_8_product_34_1618237870","user_id":"user_8","product_id":"product_34","click_count":"1"}
=========one iteration==========
Viewing the data through the Flink SQL client:
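A minimal sketch of what you might run in the SQL client, assuming the client session is configured with the same Hive catalog (and therefore sees the tables created by the job):

-- Assumes the SQL client session uses the Hive catalog registered above
SELECT * FROM user_product_kafka;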
Checking the data written to HBase:
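A quick way to verify the sink from the HBase shell (standard HBase shell commands):

scan 'user_product_hbase', {LIMIT => 5}
count 'user_product_hbase'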