开发者社区> 问答> 正文

使用 PyFlink Table API 中定义的 connector创建数据源的方法是什么?

使用 PyFlink Table API 中定义的 connector创建数据源的方法是什么?

展开
收起
游客qzzytmszf3zhq 2021-12-07 14:58:52 395 0
1 条回答
写回答
取消 提交回答
  • 以下示例定义了如何将 Table & SQL 中支持的 connector 用于 PyFlink DataStream API 作业。
    
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    
    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)
    
    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))
    
    2021-12-07 14:59:06
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载