Avro is a serialization format that is independent of any programming language. It offers:
Rich data structures
A compact, fast, binary data format
A container file format for persisting data
Remote procedure call (RPC)
Simple integration with dynamic languages: code generation is not required to read or write data files, nor to use or implement RPC protocols
Avro relies on schemas for serialization and deserialization.
1. Add the pom.xml dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.11.0</version>
            <executions>
                <execution>
                    <phase>generate-resources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Define the schema file
2.1 People.avsc
{ "namespace": "com.nq", "type": "record", "name": "People", "fields" : [ {"name": "name", "type":"string"}, {"name": "age", "type":"int"}, {"name": "hasHouse", "type":"boolean"}, {"name": "children","type":"string" } ] }
2.2 Run mvn avro:schema
The generated Java class is written to /target/generated-sources/avro/com/nq/People.java under the project directory.
2.3 Copy the class to wherever you need it
Alternatively, you can download avro-tools and use it to generate the entity class.
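Before wiring the generated class into Kafka (step 3 below), it can help to verify the serialization round trip locally. The following is a minimal, self-contained sketch using only Avro's SpecificDatumWriter and SpecificDatumReader; the class name LocalRoundTrip and the field values are illustrative, not part of the generated code.

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class LocalRoundTrip {
    public static void main(String[] args) throws IOException {
        // Build a People instance with the setters of the generated class
        People in = new People();
        in.setName("test");
        in.setAge(18);
        in.setHasHouse(true);
        in.setChildren("none");

        // Serialize to Avro binary in memory
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<>(People.class).write(in, encoder);
        encoder.flush();

        // Deserialize back and print the record
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        People back = new SpecificDatumReader<>(People.class).read(null, decoder);
        System.out.println(back);
    }
}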
3. Kafka producer
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class AvroProducer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        String topic = "test-vip"; // change to your own topic
        props.put("bootstrap.servers", "kafka-node01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Avro-encoded payloads are sent as raw bytes
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Writer that serializes People instances according to the generated schema
        SpecificDatumWriter<People> datumWriter = new SpecificDatumWriter<>(People.class);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

        for (int i = 0; i < 1000; i++) {
            out.reset(); // reuse the in-memory buffer for each record
            People people = new People();
            people.setName("达拉崩吧---" + i);
            people.setAge(i);
            people.setChildren("children===" + i);
            people.setHasHouse(i % 2 == 0);

            // Encode the record into Avro binary and flush it into the buffer
            datumWriter.write(people, encoder);
            encoder.flush();

            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, "vip-" + i, out.toByteArray());
            producer.send(record);
        }
        out.close();
        producer.close();
    }
}
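If you prefer not to encode the bytes at the call site, the same logic can be packaged as a Kafka Serializer and configured via value.serializer, so the producer can be typed as KafkaProducer<String, People>. This is a minimal sketch; the class name PeopleAvroSerializer is an assumed name, not part of the example above.

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical helper: serializes People to Avro binary bytes for Kafka
public class PeopleAvroSerializer implements Serializer<People> {
    private final SpecificDatumWriter<People> writer = new SpecificDatumWriter<>(People.class);

    @Override
    public byte[] serialize(String topic, People data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(data, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize People", e);
        }
    }
}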
4. Kafka consumer
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        String topic = "test-vip"; // change to your own topic
        props.put("bootstrap.servers", "kafka-node01:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values arrive as raw Avro-encoded bytes
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("group.id", "avro-test");
        props.put("auto.offset.reset", "latest");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        // Reader that deserializes Avro binary back into People instances
        SpecificDatumReader<People> datumReader = new SpecificDatumReader<>(People.getClassSchema());
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, byte[]> record : records) {
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    People people = null;
                    try {
                        people = datumReader.read(null, decoder);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("key: " + record.key() + "\t" + people);
                }
            }
        } finally {
            consumer.close();
        }
    }
}
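Symmetrically, the decoding step can be wrapped in a Kafka Deserializer so the consumer is typed as KafkaConsumer<String, People>. A minimal sketch follows; the class name PeopleAvroDeserializer is again just an assumed name.

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;

// Hypothetical helper: turns raw Avro bytes back into People instances
public class PeopleAvroDeserializer implements Deserializer<People> {
    private final SpecificDatumReader<People> reader = new SpecificDatumReader<>(People.class);

    @Override
    public People deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize People", e);
        }
    }
}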