def read_csv_file_example(input_path):
env = StreamExecutionEnvironment.get_execution_environment()
# env.set_parallelism(2)
settings = EnvironmentSettings.new_instance().in_streaming_mode().build() # in_streaming_mode in_batch_mode
t_env = StreamTableEnvironment.create(env, settings)
# 读取 csv 文件
input_descriptor = TableDescriptor.for_connector("filesystem") \
.option("path", input_path) \
.format("csv") \
.option("csv.ignore-parse-errors", "true") \
.option("csv.field-delimiter", ",") \
.schema(
Schema.new_builder()
.column("device_id", DataTypes.STRING())
.column("user_id", DataTypes.STRING())
.column("event_time", DataTypes.TIMESTAMP(3))
.watermark("event_time", "event_time - INTERVAL '5' SECOND") # 定义水印生成规则:每个事件的水印 = 该事件的事件时间 - 5 秒
.build()
).build()
# 注册为临时表
t_env.create_temporary_table('device_events', input_descriptor)
# 创建 Table 对象
csv_table = t_env.from_path('device_events')
# 查看 Schema
csv_table.print_schema()
'''
(
`device_id` STRING,
`user_id` STRING,
`event_time` TIMESTAMP(3) *ROWTIME*,
WATERMARK FOR `event_time`: TIMESTAMP(3) AS event_time - INTERVAL '5' SECOND
)
'''
# 执行查询
csv_table.limit(20).execute().print()
if name == 'main':
inputpath = "hdfs://10.130.33.76:9000/flink/data2/input*.csv"
read_csv_file_example(input_path)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。