开发者社区> 问答> 正文

Flink 1.11.1 输出Hbase,除rowkey以外全部为null

Hi, all 我在本机进行FLINK输出到HBase的测试,采用datagen生成数据,直接输出到Hbase,发现除rowkey以外全部为null。 请大家帮我分析下问题,谢谢。

Flink 版本 1.11.1;HBASE 版本: 1.4.13 ; phoenix 版本: 4.15.0-HBase-1.4

HBASE中的建表SQL: CREATE TABLE IF NOT EXISTS "ods_iot_gasdevice"( "rowkey" varchar not null primary key, "base_info"."device_id" INTEGER, "base_info"."verify_code" varchar, "status_info"."battery_power" INTEGER, "status_info"."device_status" INTEGER, "time_characteristics"."create_time" TIMESTAMP ); FLINK SQL: CREATE TABLE gasmessage ( deviceId int COMMENT '设备id', deviceStatus int COMMENT '设备状态', notifyType varchar COMMENT '消息类型', batteryStatus int COMMENT '电池电量', verifyCode varchar COMMENT '设备码', createTime as localtimestamp, dataType int COMMENT '数据类型', messageId int COMMENT 'messageId', -- 定义业务字段 createTime 为时间时间,并使用5秒延迟水印策略 WATERMARK FOR createTime AS createTime - INTERVAL '5' SECOND ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.deviceId.kind'='sequence', 'fields.deviceId.start'='2900', 'fields.deviceId.end'='2950', 'fields.deviceStatus.min'='1', 'fields.deviceStatus.max'='4', 'fields.notifyType.length'='17', 'fields.batteryStatus.min'='1', 'fields.batteryStatus.max'='100', 'fields.verifyCode.length'='15', 'fields.dataType.min'='1', 'fields.dataType.max'='4' );

CREATE TABLE ods_iot_gasdevice ( rowkey String, base_info ROW<device_id INT, verify_code String >, status_info ROW <battery_power INT, device_status INT>, time_characteristics ROW <create_time timestamp (3)>, PRIMARY KEY (rowkey) NOT ENFORCED ) WITH ( 'connector' = 'hbase-1.4', 'table-name' = 'ods_iot_gasdevice', 'zookeeper.quorum' = 'hbase-master-0.hbase-master.default.svc.cluster.local:2181', 'sink.buffer-flush.max-rows' = '20' );

-- 用 ROW(...) 构造函数构造列簇,并往 HBase 表写数据。 INSERT INTO ods_iot_gasdevice SELECT Cast(deviceId as varchar) as rowkey, ROW (deviceId , verifyCode), ROW (batteryStatus, deviceStatus), ROW (createTime) FROM gasmessage;

create table print with ( 'connector' = 'print' ) LIKE ods_iot_gasdevice (EXCLUDING ALL);

insert into print SELECT Cast(deviceId as varchar) as rowkey, ROW (deviceId, verifyCode), ROW (batteryStatus, deviceStatus), ROW (createTime) FROM gasmessage;

测试结果: 0: jdbc:phoenix:> select * from "ods_iot_gasdevice"; +---------+------------+--------------+----------------+----------------+--------------+ | rowkey | device_id | verify_code | battery_power | device_status | create_time | +---------+------------+--------------+----------------+----------------+--------------+ | 2900 | null | | null | null | null | | 2901 | null | | null | null | null | | 2902 | null | | null | null | null | | 2903 | null | | null | null | null | | 2904 | null | | null | null | null | | 2905 | null | | null | null | null | | 2906 | null | | null | null | null | | 2907 | null | | null | null | null | | 2908 | null | | null | null | null | | 2909 | null | | null | null | null | | 2910 | null | | null | null | null | | 2911 | null | | null | null | null | | 2912 | null | | null | null | null | | 2913 | null | | null | null | null | | 2914 | null | | null | null | null | | 2915 | null | | null | null | null | | 2916 | null | | null | null | null | | 2917 | null | | null | null | null | | 2918 | null | | null | null | null | | 2919 | null | | null | null | null | | 2920 | null | | null | null | null | | 2921 | null | | null | null | null | | 2922 | null | | null | null | null | | 2923 | null | | null | null | null | | 2924 | null | | null | null | null | | 2925 | null | | null | null | null | | 2926 | null | | null | null | null | | 2927 | null | | null | null | null | | 2928 | null | | null | null | null | | 2929 | null | | null | null | null | | 2930 | null | | null | null | null | | 2931 | null | | null | null | null | | 2932 | null | | null | null | null | | 2933 | null | | null | null | null | | 2934 | null | | null | null | null | | 2935 | null | | null | null | null | | 2936 | null | | null | null | null | | 2937 | null | | null | null | null | | 2938 | null | | null | null | null | | 2939 | null | | null | null | null | | 2940 | null | | null | null | null | | 2941 | null | | null | null | null | | 2942 | null | | null | null | null | | 2943 | null | | null | null | null | | 2944 | null | | null | null | null | | 2945 | null | | null | null | null | | 2946 | null | | null | null | null | | 2947 | null | | null | null | null | | 2948 | null | | null | null | null | | 2949 | null | | null | null | null | | 2950 | null | | null | null | null | +---------+------------+--------------+----------------+----------------+--------------+ 51 rows selected (0.034 seconds)*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 12:25:53 659 0
1 条回答
写回答
取消 提交回答
  • hi、phoenix类型和hbase类型有一部分不兼容:1、使用phoenix建表并通过phoenix jdbc插入数据;2、使用hbase api建表并插入数据,然后phoenix映射到现有hbase表;现在是数据通过hbase api存到hbase了,但phoenix反序列化读取时候有问题,或者你建phoenix表的时候数据类型选择兼容hbase原始类型的类型*来自志愿者整理的flink

    2021-12-07 15:44:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载