Kafka+Avro的demo

简介: Kafka+Avro的demo

最近在对消息中间件进行调研,原先项目里使用的是RabbitMQ,理由很简单:对开发语言支持度是最好的,没有之一。但是业界的反馈是,其高并发和分布式支持存在不足~

我就打算再了解一下kafka,看看它是怎么个用法~~

如RabbitMQ一样,kafka也提供了终端脚本来完成基本功能的测试,可以看一下这里。但光玩玩脚本或官方提供的例子,是不能满足我的~

作为把消息队列,让其使用在项目中,除了解决和业务代码进行互动以外,还要考虑数据传输的格式问题,也就是说如何解决producer和consumer之间通信的协议问题。

官方推荐的就是Avro,只可惜我找了半天,都没有一个现成的kafka+Avro的demo供我测试,那就只能自己试着写个了~~

package me.kazaff.mq;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import me.kazaff.mq.avro.Message;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*;

public class Main {

    public static void main(String[] args){
        try {
            //消息生产
            produce();

            //消息消费
            consume();

        }catch (Exception ex){
            System.out.println(ex);
        }

    }

    private static void produce() throws IOException{
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");    //key的类型需要和serializer保持一致,如果key是String,则需要配置为kafka.serializer.StringEncoder,如果不配置,默认为kafka.serializer.DefaultEncoder,即二进制格式
        props.put("partition.class", "me.kazaff.mq.MyPartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        Random rnd = new Random();
        for(int index = 0; index <= 10; index++){
            Message msg = new Message();
            msg.setUrl("blog.kazaff.me");
            msg.setIp("192.168.137." + rnd.nextInt(255));
            msg.setDate(Long.toString(new Date().getTime()));

            DatumWriter<Message> msgDatumWriter = new SpecificDatumWriter<Message>(Message.class);
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            try {
                Encoder e = EncoderFactory.get().binaryEncoder(os, null);
                msgDatumWriter.write(msg, e);
                e.flush();
                byte[] byteData = os.toByteArray();

                KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>("demo", "0", byteData);
                producer.send(data);

            }catch (IOException ex){
                System.out.println(ex.getMessage());
            }finally {
                os.close();
            }
        }
        producer.close();
    }

    private static void consume(){
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "1");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("demo", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("demo");
        KafkaStream steam = streams.get(0);

        ConsumerIterator<byte[], byte[]> it = steam.iterator();
        while (it.hasNext()){
            try {
                DatumReader<Message> reader = new SpecificDatumReader<Message>(Message.class);
                Decoder decoder = DecoderFactory.get().binaryDecoder(it.next().message(), null);
                Message msg = reader.read(null, decoder);

                System.out.println(msg.getDate() + "," + msg.getUrl() + "," + msg.getIp());

            }catch (Exception ex){
                System.out.println(ex);
            }
        }

        if(consumer != null)
            consumer.shutdown();
    }
}

PS:这只是个测试的例子,存在各种问题,不建议直接使用在项目中。

为了方便大家直接测试,我把pom.xml也贴出来:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.kazaff.mq</groupId>
    <artifactId>kafkaProducer</artifactId>
    <version>0.0.1</version>
    <packaging>jar</packaging>
    <name>kafkaProducer</name>
    <description>The Producer of message, use Avro to encode data, and send to the kafka.</description>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.6-cdh5.2.5</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.6</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>cloudera-repo-releases</id>
            <url>https://repository.cloudera.com/artifactory/repo/</url>
        </repository>
    </repositories>
</project>

下面是Avro使用的Schema:

{
    "namespace": "me.kazaff.mq.avro",
    "type": "record",
    "name": "Message",
    "fields": [
        {
            "name": "date",
            "type": "string"
        },
        {
            "name": "url",
            "type": "string"
        },
        {
            "name": "ip",
            "type": "string"
        }
    ]
}

代码中使用的MyPartitioner其实很sb:

package me.kazaff.mq;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by kazaff on 2015/4/21.
 */
public class MyPartitioner implements Partitioner {
    public MyPartitioner(VerifiableProperties props){}

    public int partition(Object key, int numPartitions){
        return 0;
    }
}

需要注意的是,我是先使用Maven根据配置的plugin,把声明的Schema先处理生成对应的Class文件,然后再进行运行测试的~

问题


1.

java.lang.ClassCastException: [B cannot be cast to java.lang.String

解决方法很简单:

props.put("serializer.class", "kafka.serializer.DefaultEncoder");

这样,kafka就默认使用二进制的序列化方案处理Avro的编码结果了。

2.

java.lang.ClassCastException: java.lang.String cannot be cast to [B

这个问题是最恶心的,搜了半天都没有找到原因,原因是问题1中那么设置后,Kafka所有的数据序列化方式都成了二进制方式,包括我们后面要使用的“key”(用于kafka选择分区的线索)。

所以你还需要再加一条配置:

props.put("key.serializer.class", "kafka.serializer.StringEncoder");

单独设置一下“key”的序列化方式,这样就可以编译运行了~~


初尝Kafka和Avro,就这么点儿要记录的,不要嫌少哟~~

相关文章
|
6月前
|
消息中间件 Java Kafka
kafka入门demo
kafka入门demo
74 0
|
4月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 网络协议 Java
springboot+netty+kafka实现设备信息收集(完整demo复制可用)
springboot+netty+kafka实现设备信息收集(完整demo复制可用)
92 0
|
消息中间件 Java Kafka
kafka 客户端使用Avro序列化
kafka 客户端使用Avro序列化
203 0
|
消息中间件 SQL JSON
Blink流式计算-Kafka接入demo
//定义解析Kakfa message的UDTF CREATE FUNCTION myParse AS 'com.xxxxxx.MyKafkaUDTF'; CREATE FUNCTION myUdf AS 'com.xxxxxxx.MyWaterMarkUDTF'; //注意:kafka源表DDL字段必须与以下例子一致 create table my_input (
1578 0
|
消息中间件 Kafka API
Kafka原理解析-旧版本0.8高级Api的Demo和配置信息获取技巧
旧版本高级Api封装: package xxxxxx; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducerTest implements Runnable {
562 0
|
消息中间件 Kafka
kafka strom elasticsearch demo
https://github.com/mvalleavila/Kafka-Storm-ElasticSearch
780 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
268 9