开发者社区> 问答> 正文

如何使用Spark结构化流媒体为Kafka流实现自定义反序列化?

我正在尝试迁移我当前的流媒体应用程序,这是基于使用RDD(从他们的文档)到他们使用结构化流媒体的新数据集API,我被告知这是使用Spark进行实时流媒体的首选方法。

目前,我的应用程序设置使用了一个名为“SATELLITE”的主题,该主题包含一个包含密钥时间戳和包含SatellitePOJO的值的消息。但是我在弄清楚如何为此实现反序列化器时遇到了问题。在我当前的应用程序中很简单,你只需在你的kafka属性贴图中添加一行kafkaParams.put("value.deserializer", SatelliteMessageDeserializer.class); 我在Java中这样做,这是一个最大的挑战,因为所有解决方案似乎都在Scala中,我不能理解,我不能轻易地将Scala代码转换为Java代码。

我在这个问题中概述了一个JSON示例,该示例目前有效,但对于我需要做的事情似乎过于复杂。鉴于我已经为此目的制作了自定义反序列化器,我不明白为什么我必须首先将它转换为字符串,只是将其转换为JSON,然后将其转换为我想要的类类型。

目前我的应用程序看起来像这样(使用json方法):

import common.model.Satellite;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkStructuredStreaming implements Runnable{

private String bootstrapServers;
private SparkSession session;

public SparkStructuredStreaming(final String bootstrapServers, final SparkSession session) {
    this.bootstrapServers = bootstrapServers;
    this.session = session;
}
@Override
public void run() {
    Dataset<Row> df = session
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option("subscribe", "SATELLITE")
            .load();

    StructType schema =  DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("id", DataTypes.StringType, true),
            DataTypes.createStructField("gms", DataTypes.StringType, true),
            DataTypes.createStructField("satelliteId", DataTypes.StringType, true),
            DataTypes.createStructField("signalId", DataTypes.StringType, true),
            DataTypes.createStructField("cnr", DataTypes.DoubleType, true),
            DataTypes.createStructField("constellation", DataTypes.StringType, true),
            DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
            DataTypes.createStructField("mountPoint", DataTypes.StringType, true),
            DataTypes.createStructField("pseudorange", DataTypes.DoubleType, true),
            DataTypes.createStructField("epochTime", DataTypes.IntegerType, true)
    });

        Dataset<Satellite> df1 = df.selectExpr("CAST(value AS STRING) as message")
                .select(functions.from_json(functions.col("message"),schema).as("json"))
                .select("json.*")
                .as(Encoders.bean(Satellite.class));

    try {
        df1.writeStream()
                .format("console")
                .option("truncate", "false")
                .start()
                .awaitTermination();

    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }
}

}
我有一个看起来像这样的自定义反序列化器

import common.model.Satellite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SatelliteMessageDeserializer implements Deserializer {

private static Logger logger = LoggerFactory.getLogger(SatelliteMessageDeserializer.class);
private ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map configs, boolean isKey) {
}

@Override
public void close() {
}

@Override
public Satellite deserialize(String topic, byte[] data) {
    try {
        return objectMapper.readValue(new String(data, "UTF-8"), getMessageClass());
    } catch (Exception e) {
        logger.error("Unable to deserialize message {}", data, e);
        return null;
    }
}

protected Class<Satellite> getMessageClass() {
    return Satellite.class;
}

}
如何在SparkStructuredStreaming课堂上使用我的自定义反序列化器?我使用Spark 2.4,OpenJDK 10和Kafka 2.0

编辑:我已经尝试创建自己的UDF,我认为这应该如何完成,但我不知道如何让它返回一个特定的类型,因为它似乎只允许我使用它们的Datatypes类!

UserDefinedFunction mode = udf(

            (byte[] bytes) -> deserializer.deserialize("", bytes), DataTypes.BinaryType //Needs to be type Satellite, but only allows ones of type DataTypes
    );

Dataset df1 = df.select(mode.apply(col("value")));

展开
收起
社区小助手 2018-12-05 14:34:21 2961 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    from_json 只能在字符串类型的列上工作。

    结构化流式传输始终将Kafka值作为字节使用

    始终使用ByteArrayDeserializer将值反序列化为字节数组。使用DataFrame操作显式反序列化值

    因此,您首先至少要反序列化为String,但我认为您不需要这样做。

    也许可以这样做

    df.select(value).as(Encoders.bean(Satellite.class))
    如果这不起作用,您可以尝试定义自己的UDF /解码器,以便您可以使用类似的东西 SATELLITE_DECODE(value)

    Scala

    object SatelliteDeserializerWrapper {

    val deser = new SatelliteDeserializer

    }
    spark.udf.register("SATELLITE_DECODE", (topic: String, bytes: Array[Byte]) =>

    SatelliteDeserializerWrapper.deser.deserialize(topic, bytes)

    )

    df.selectExpr("""SATELLITE_DECODE("topic1", value) AS message""")

    2019-07-17 23:18:22
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载