开发者社区> 问答> 正文

如何创建AvroDeserialzationSchema并在Flink Kafka Consumer中使用?

为kafka avro序列化主题创建一个flink消费者。我有kafka主题流avro序列化数据。我可以通过avroconsoleconsumer看到它。

Flink 1.6.0添加了AvroDeserializationSchema,但我找不到它的用法的完整示例。有一些生成一个avrodeserialization类似乎在1.6.0之前添加了类。

我有一个通过avro工具生成的avro类。

现在我一直在尝试跟随存在的例子,但它们不同,我不能把事情搞定。(我不经常用Java编程)

大多数人使用以下某种形式

Myclass mc = new MyClass();
AvroDeserializationSchema ads = new AvroDeserializationSchema<> (Myclass.class);
FlinkKafkaConsumer010 kc = new FlinkKafkaConsumer010<>(topic,ads,properties);
其中Myclass是通过avro-tools jar生成的avro类。这是正确的方法吗?在执行此操作并利用内部flink 1.6.0 avrodeserializationschema类时,我遇到了一些私有/公共访问问题。我是否必须创建一个新类并扩展avrodeserializationschema?

展开
收起
社区小助手 2018-12-11 16:30:59 7667 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    我挖掘了kafka消费者javadocs并找到了一个消耗avro流的例子。我仍然需要将kafka消费转换为flinkKafkaConsumer,但下面的代码有效。

    对于io.confluent引用工作,我必须添加存储库和pom文件的依赖项。


    confluent

    <url>http://packages.confluent.io/maven/</url>

    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.1.1</version>

    public class StreamingJob {

    // static DeserializationSchema avroSchema = new AvroDeserializationSchema(pendingsv.class);

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "opssupport.alarms");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        String topic = "pendingSVs_";
        final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));
    
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    
    // execute program
    //env.execute("Flink Streaming Java API Skeleton");

    }

    2019-07-17 23:19:52
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载