pyflink在读取hdfs文件的时候如何使用通配符?

def read_csv_file_example(input_path):
env = StreamExecutionEnvironment.get_execution_environment()

# env.set_parallelism(2)

settings = EnvironmentSettings.new_instance().in_streaming_mode().build() # in_streaming_mode in_batch_mode
t_env = StreamTableEnvironment.create(env, settings)

# 读取 csv 文件
input_descriptor = TableDescriptor.for_connector("filesystem") \
    .option("path", input_path) \
    .format("csv") \
    .option("csv.ignore-parse-errors", "true") \
    .option("csv.field-delimiter", ",") \
    .schema(
    Schema.new_builder()
        .column("device_id", DataTypes.STRING())
        .column("user_id", DataTypes.STRING())
        .column("event_time", DataTypes.TIMESTAMP(3))
        .watermark("event_time", "event_time - INTERVAL '5' SECOND")  # 定义水印生成规则:每个事件的水印 = 该事件的事件时间 - 5 秒
        .build()
).build()

# 注册为临时表
t_env.create_temporary_table('device_events', input_descriptor)

# 创建 Table 对象
csv_table = t_env.from_path('device_events')

# 查看 Schema
csv_table.print_schema()
'''
(
  `device_id` STRING,
  `user_id` STRING,
  `event_time` TIMESTAMP(3) *ROWTIME*,
  WATERMARK FOR `event_time`: TIMESTAMP(3) AS event_time - INTERVAL '5' SECOND
)
'''

# 执行查询
csv_table.limit(20).execute().print()

if name == 'main':
inputpath = "hdfs://10.130.33.76:9000/flink/data2/input*.csv"
read_csv_file_example(input_path)

展开
收起
游客i3vs243rqyvl2 2026-01-21 17:58:34 16 分享 版权
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理