各位大佬,有个问题困扰了很久。用flink实现机器学习实时计算,输入数据通过kafka传来的特征数据,计算通过MapFunction,结果为数据的序号和预测结果,输出一方面希望到kafka供后续调用,给前端实时显示预测结果,一方面希望到mysql落地。 在最后数据输出一直不成功,希望通过kafka接受,但是总是失败: Query schema: [f0: RAW('[B', '...')] Sink schema: [a: INT, b: FLOAT]
大概是说我出来的结构是RAW,而实际sink是INT,FLOAT.具体代码如下,请各位大神指导。
import pickle from sklearn.linear_model import LinearRegression from pyflink.common.serialization import JsonRowSerializationSchema from pyflink.datastream import StreamExecutionEnvironment, FilterFunction from pyflink.datastream.connectors import NumberSequenceSource from pyflink.datastream.functions import RuntimeContext, MapFunction,CoMapFunction
####预测结果为yy_pre_predict[0],数据的序号为value[0]。序号很重要,需要和预测结果一起出现 class PreMap(MapFunction): def open(self, runtime_context: RuntimeContext): self.load_model=pickle.load(open("/home/flinkdata/lr.pkl","rb")) def flat_map(self, value): xx_pre= pd.DataFrame({'RM':[value[1]], 'LSTAT':[value[2]], 'PTRATIO':[value[3]]}) yy_pre_predict=self.load_model.predict(xx_pre) return value[0],yy_pre_predict[0] ###构建环境 env = StreamExecutionEnvironment.get_execution_environment() #env.set_parallelism(1) env.add_jars("file:///home/flinkdata/flink-sql-connector-kafka-1.15.0.jar") t_env = StreamTableEnvironment.create(stream_execution_environment=env)
###source来自kafka
t_env.execute_sql(""" CREATE TABLE source ( MYNO INT, RM float, LSTAT float, PTRATIO float ) WITH ( 'connector' = 'kafka', 'topic' = 'bostonsource', 'properties.bootstrap.servers' = '192.168.15.111:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ) """)
###载入source re=t_env.from_path('source') preds=t_env.to_append_stream(re,type_info=Types.TUPLE([Types.INT(),Types.FLOAT(),Types.FLOAT(),Types.FLOAT()]))
###调用PreMap实现预测生成流 final_stream = preds.map(PreMap(),result_type=Types.ROW([Types.INT(),Types.FLOAT()]))
如何将结果final_stream输出?我使用的办法是 t_env.execute_sql(""" CREATE TABLE my_sink ( MYNO INT, PRE FLOAT) WITH ( 'connector' = 'kafka',
'topic' = 'bostonresult',
'properties.bootstrap.servers' = '192.168.15.111:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv' ) """)
final_table=t_env.from_data_stream(final_stream) final_table.insert_into("my_sink") t_env.execute()
出现了前面提到的报错:
Query schema: [f0: RAW('[B', '...')] Sink schema: [a: INT, b: FLOAT]
我尝试直接sink到kafka
from pyflink.common.serialization import SimpleStringSchema from pyflink.datastream.connectors.kafka import KafkaRecordSerializationSchema, KafkaSink from pyflink.datastream import DeliveryGuarantee
brokers = "localhost:9092" topic = "sinktest"
stream = env.from_collection([(1, "Hello"), (2, "World")])
sink = KafkaSink.builder()
.set_bootstrap_servers(brokers)
.set_record_serializer( KafkaRecordSerializationSchema.builder() .set_topic(topic) .set_value_serialization_schema(SimpleStringSchema()) .build() )
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
stream.add_sink(sink)
env.execute()
但是报错from pyflink.datastream import DeliveryGuarantee 无法加载
一直没有好的办法对预测数据的SINK,请各位大神赐教,并附完整的代码,谢谢
这个问题涉及到PyFlink中的序列化和反序列化问题。看起来你的代码中的问题在于你的模型预测生成的数据类型(RAW)和你设置的Kafka数据流接收的数据类型(INT, FLOAT)之间的不匹配。在这个问题上,我有两个建议。
第一个建议是检查一下你的模型预测的输出数据类型。从你的代码中可以看出,你的模型预测的输出应该是一个浮点数,但是报错信息指出输出类型是RAW。你可以尝试在PreMap
类的flat_map
函数中,明确地将yy_pre_predict[0]
转换为float类型,如下所示:
return value[0], float(yy_pre_predict[0])
第二个建议是在创建Kafka数据流的时候,使用正确的序列化和反序列化模式。你可以使用JsonRowSerializationSchema
来定义Kafka数据流的序列化模式,这样可以确保数据流接收到的数据类型与你的模型预测的输出类型匹配。以下是一个使用JsonRowSerializationSchema
的示例:
from pyflink.common.serialization import JsonRowSerializationSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
env = StreamExecutionEnvironment.get_execution_environment()
deserialization_schema = JsonRowDeserializationSchema.builder()\
.type_info(type_info=Types.ROW([Types.INT(), Types.FLOAT()]))\
.build()
serialization_schema = JsonRowSerializationSchema.builder()\
.type_info(type_info=Types.ROW([Types.INT(), Types.FLOAT()]))\
.build()
kafka_consumer = FlinkKafkaConsumer(
topics='bostonsource',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': '192.168.15.111:9092', 'group.id': 'test_group'}
)
kafka_producer = FlinkKafkaProducer(
topic='bostonresult',
serialization_schema=serialization_schema,
properties={'bootstrap.servers': '192.168.15.111:9092'}
)
stream = env.add_source(kafka_consumer)
# Do your map operation here.
stream.add_sink(kafka_producer)
env.execute()
以上代码定义了从Kafka中读取和写入数据的数据流。首先,创建一个JsonRowDeserializationSchema
和JsonRowSerializationSchema
,这两个对象定义了数据流中数据的数据类型。然后,创建一个FlinkKafkaConsumer
和FlinkKafkaProducer
,并将前面定义的反序列化和序列化模式应用于这两个对象。
您可以使用 Flink 的 KafkaSink 将预测结果输出到 Kafka 中,然后使用 Flink 的 JDBC Connector 将数据写入 MySQL 数据库。
关于您提到的报错,可能是因为您的 Flink 版本不支持 DeliveryGuarantee,您可以尝试使用 Flink 1.13 版本或更高版本,该版本已经支持 DeliveryGuarantee。
下面是一个简单的示例代码,演示如何将 Flink 的机器学习模型预测结果输出到 Kafka 和 MySQL 中:
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import TableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime, FileSystem
from pyflink.table.udf import udf
from sklearn.linear_model import LinearRegression
import numpy as np
import json
# 定义预测函数
def predict(features):
# 加载模型
model = LinearRegression()
model.load('model.pkl')
# 进行预测
features = np.array(features)
features = features.reshape(1, -1)
prediction = model.predict(features)
return prediction[0]
# 创建执行环境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = TableEnvironment.create(env)
# 定义输入数据格式
input_schema = Schema()
input_schema.field("event_time", DataTypes.TIMESTAMP())
input_schema.field("id", DataTypes.BIGINT())
input_schema.field("feature1", DataTypes.DOUBLE())
input_schema.field("feature2", DataTypes.DOUBLE())
input_schema.field("feature3", DataTypes.DOUBLE())
# 定义输出数据格式
output_schema = Schema()
output_schema.field("id", DataTypes.BIGINT())
output_schema.field("prediction", DataTypes.DOUBLE())
# 定义输入表
t_env.connect(
Kafka()
.version("universal")
.topic("input_topic")
.start_from_earliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
).with_format(
Json()
.fail_on_missing_field(True)
.json_schema(
"{" +
"\"type\": \"object\", " +
"\"properties\": {" +
"\"event_time\": {\"type\": \"string\", \"format\": \"date-time\"}, " +
"\"id\": {\"type\": \"integer\"}, " +
"\"feature1\": {\"type\": \"number\"}, " +
"\"feature2\": {\"type\": \"number\"}, " +
"\"feature3\": {\"type\": \"number\"}" +
"}" +
"}"
)
.timestamp_format("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "UTC")
.for_proctime()
).with_schema(input_schema).create_temporary_table("input_table")
# 定义输出表
t_env.connect(
Kafka()
.version("universal")
.topic("output_topic")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
).with_format(
Json()
.fail_on_missing_field(True)
.json_schema(
"{" +
"\"type\": \"object\", " +
"\"properties\": {" +
"\"id\": {\"type\": \"integer\"}, " +
"\"prediction\": {\"type\": \"number\"}" +
"}" +
"}"
)
).with_schema(output_schema).create_temporary_table("output_table")
# 定义预测函数
predict_udf = udf(predict, DataTypes.DOUBLE(), DataTypes.ARRAY(DataTypes.DOUBLE(), 3))
# 执行查询
result = t_env.from_path("input_table") \
.select("id, " + predict_udf("feature1", "feature2", "feature3").alias("prediction")) \
.insert_into("output_table")
# 输出到 Kafka
kafka_producer = FlinkKafkaProducer(
topic="output_topic",
serialization_schema=SimpleStringSchema(),
producer_config={
"bootstrap.servers": "localhost:9092",
"acks": "all"
}
)
result.add_sink(kafka_producer)
# 输出到 MySQL
t_env.connect(
FileSystem().path("/path/to/mysql-jdbc-connector.jar")
).with_format(
"jdbc",
driver="com.mysql.jdbc.Driver",
url="jdbc:mysql://localhost:3306/mydb",
table="output_table"
).with_schema(output_schema).create_temporary_table("mysql_output_table")
t_env.insert_into("mysql_output_table", result)
# 执行任务
env.execute()
在上面的代码中,我们首先定义了一个 predict
函数,用于加载模型并进行预测。然后,我们使用 Flink Table API 定义了输入和输出表的格式,并使用 UDF 将预测函数应用到输入表中的特征字段上,生成输出表。接着,我们使用 Flink Kafka Producer 将输出表中的数据写入到 Kafka 中,并使用 Flink JDBC Connector 将数据写入到 MySQL 数据库中。
需要注意的是,如果您的预测函数返回的数据类型与输出表的数据类型不一致,可能会出现报错。因此,您需要根据实际情况调整输出表的数据类型,使其与预测函数的输出数据类型一致。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。