开发者社区> 问答> 正文

Table Api连接kafka系统,能否把整条的kafka消息看成是一个table字段进行处理?

目前在学习使用pyflink的Table api,请教一个问题: 1、Table Api连接kafka系统,能否把整条的kafka消息看成是一个table字段进行处理?比如,kafka topic连的消息为一个json字符串,把这个字符串整体当做是一个字段,这样可以方便使用 pyflink 的udf函数对消息进行处理转换等操作? 2、如果以上可行,连接kafka的数据格式如何设置,即with_format如何设置,目前官网这方便的资料较少。

新手入门,请多指教,感谢。

来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档

展开
收起
雪哥哥 2021-12-04 17:11:09 817 0
1 条回答
写回答
取消 提交回答
  • 你好, 你想问的应该是如何把kafka里面的一整个json数据当成一个string读进来,然后不做任何format解析对吧。如果是这样的话,我的理解是,你首先不能用json format,需要使用csv format,然后你得指定一个field_delimiter,默认的是逗号,你得换一个,比如\n,要不然就会把你的json字符串数据按照都厚给切分开了。我刚刚用descriptor试验了一下,没有问题。你可以试试。

    下面是我整个PyFlink读取json串进来然后解析数据中time字段的作业 def str_func(str_param): import json return json.loads(str_param)['time']

    s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_parallelism(1) s_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) st_env = StreamTableEnvironment.create(s_env) result_file = "/tmp/slide_row_window_streaming.csv" if os.path.exists(result_file): os.remove(result_file) st_env
    .connect( # declare the external system to connect to Kafka() .version("0.11") .topic("user") .start_from_earliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") )
    .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.STRING()) ])) .field_delimiter('\n') )
    .with_schema( # declare the schema of the table Schema() .field("a", DataTypes.STRING()) )
    .in_append_mode()
    .register_table_source("source")

    st_env.register_function( "str_func", udf(str_func, [DataTypes.STRING()], DataTypes.STRING())) st_env.register_table_sink("sink", CsvTableSink(["a"], [DataTypes.STRING()], result_file))

    st_env.scan("source").select("str_func(a)").insert_into("sink")

    kafka里面的数据 {"a": "a", "b": 1, "c": 1, "time": "2013-01-01T00:14:13Z"}

    {"a": "b", "b": 2, "c": 2, "time": "2013-01-01T00:24:13Z"} {"a": "a", "b": 3, "c": 3, "time": "2013-01-01T00:34:13Z"} {"a": "a", "b": 4, "c": 4, "time": "2013-01-01T01:14:13Z"} {"a": "b", "b": 4, "c": 5, "time": "2013-01-01T01:24:13Z"} {"a": "a", "b": 5, "c": 2, "time": "2013-01-01T01:34:13Z"}

    最后Csv里面的结果数据为 2013-01-01T00:14:13Z 2013-01-01T00:24:13Z 2013-01-01T00:34:13Z 2013-01-01T01:14:13Z 2013-01-01T01:24:13Z 2013-01-01T01:34:13Z

    来自志愿者整理的flink邮件归档来自志愿者整理的FLINK邮件归档

    2021-12-04 17:18:13
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载