各位大佬,请教一个问题: 我通过pyflink1.15,加载机器学习模型后得到了预测数据,并发送到了kafka,结果形式如下: {"f0":478,"f1":17.546555} {"f0":475,"f1":13.629357} {"f0":223,"f1":28.003633} {"f0":334,"f1":25.130732}
其中f0为INT类型,f1为float类型,现在想把结果写入到mysql里面,代码如下,但是报错,请教应该如何写?
env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings)
###从TOPIC-yyyy获取预测结果信息 table_env.execute_sql(""" CREATE TABLE source ( f0 INT, f1 float ) WITH ( 'connector' = 'kafka', 'topic' = 'yyyy', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) """)
source_table = table_env.from_path("source")
###写入到MYSQL的print_table表 table_env.execute_sql(""" CREATE TABLE sink ( qqq INT, www float ) WITH ( 'connector' = 'jdbc', 'url'='jdbc:mysql://192.168.1.5:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8', 'username'='root', 'driver' = 'com.mysql.cj.jdbc.Driver', 'password'='123456', 'table-name' = 'print_table'
)
""")
source_table.execute_insert("sink").wait()
执行却一直报错,请教大佬如何处理?
要将 Kafka 中的数据写入 MySQL 中,您需要使用 Flink 的 Table API 和 SQL API,以及相应的 Flink Connectors 来连接 Kafka 和 MySQL。
下面是一个简单的示例代码,演示了如何使用 Flink 将 Kafka 中的数据写入 MySQL 中:
scheme Copy from pyflink.table import * from pyflink.table.descriptors import * from pyflink.table.types import DataTypes
env_settings = EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner()
.build()
t_env = TableEnvironment.create(env_settings)
kafka_props = {'bootstrap.servers': 'kafka-broker:9092', 'group.id': 'group1'}
t_env
.connect( Kafka() .version('universal') .topic('input-topic') .start_from_earliest() .properties(kafka_props))
.with_format( Json() .json_schema('{"type":"object","properties":{"f0":{"type":"integer"},"f1":{"type":"float"}}}') .fail_on_missing_field(True))
.with_schema( Schema() .field('f0', DataTypes.INT()) .field('f1', DataTypes.FLOAT()))
.create_temporary_table('input_table')
t_env
.connect( JDBC() .url('jdbc:mysql://localhost:3306/mydb') .table_name('output_table') .driver_name('com.mysql.jdbc.Driver') .username('user') .password('password'))
.with_schema( Schema() .field('f0', DataTypes.INT()) .field('f1', DataTypes.FLOAT()))
.create_temporary_table('output_table')
t_env
.from_path('input_table')
.insert_into('output_table')
t_env.execute('kafka_to_mysql') 在上面的示例中,我们首先创建了一个 Kafka source,使用 Json() 格式来解析 Kafka 中的数据。然后我们将解析得到的数据插入到一个 MySQL sink 中,使用 JDBC() 连接器来连接 MySQL 数据库。
需要注意的是,您需要根据实际情况来修改代码中的连接参数和表名等信息,以及将 Json() 格式中的 json_schema 参数修改为适合您数据格式的 JSON Schema。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。