Flink sql方式可以直接写HDFS吗?不要通过Hive实现,有的帮忙发个demo,谢了
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink SQL 可以直接写入 HDFS,而无需通过 Hive 实现。以下是实现方式和一个示例代码。
在 Flink 作业中,可以通过 WITH
子句指定 HDFS 的连接信息。例如: - connector
:指定为 filesystem
。 - path
:指定 HDFS 文件路径。 - format
:指定文件格式(如 CSV、JSON、Parquet 等)。
以下是一个将数据写入 HDFS 的完整示例:
-- 创建源表(假设从 Kafka 中读取数据)
CREATE TABLE source_table (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'input_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- 创建目标表(写入 HDFS)
CREATE TABLE sink_table (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://<namenode-host>:<port>/user/flink/output', -- 替换为实际 HDFS 路径
'format' = 'csv' -- 指定文件格式
);
-- 插入数据到目标表
INSERT INTO sink_table
SELECT id, name, age
FROM source_table;
connector
:指定为 filesystem
,表示使用文件系统作为存储介质。path
:HDFS 的路径,需替换为实际的 NameNode 地址和端口。format
:支持多种文件格式,如 csv
、json
、parquet
等,根据需求选择。core-site.xml
和 hdfs-site.xml
文件。如果需要写入 OSS-HDFS 服务,可以将 path
替换为 oss://
前缀的路径,并配置 OSS 的 Endpoint 和访问密钥。例如:
'path' = 'oss://<bucket-name>.<oss-hdfs-endpoint>/user/flink/output',
'format' = 'csv',
'fs.oss.accessKeyId' = '<your-access-key-id>',
'fs.oss.accessKeySecret' = '<your-access-key-secret>',
'fs.oss.endpoint' = '<your-oss-endpoint>'
通过上述方法,您可以直接使用 Flink SQL 将数据写入 HDFS 或 OSS-HDFS,而无需依赖 Hive。