Kafka Schema-Registry

简介: Kafka Schema-Registry

一、为什么需要Schema-Registry

1.1、注册表

无论是 使用传统的Avro API自定义序列化类和反序列化类 还是 使用Twitter的Bijection类库实现Avro的序列化与反序列化,这两种方法都有一个缺点:在每条Kafka记录里都嵌入了schema,这会让记录的大小成倍地增加。但是不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。有没有什么方法可以让数据共用一个schema?


我们遵循通用的结构模式并使用"schema注册表"来达到目的。"schema注册表"的原理如下:

1.把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema ID。

2.负责读取数据的应用程序使用 ID 从注册表里拉取 schema 来反序列化记录。

3.序列化器和反序列化器分别负责处理 schema 的注册和拉取。


schema注册表并不属于Kafka,现在已经有一些开源的schema 注册表实现。比如本文要讨论的Confluent Schema Registry。


1.2、为什么使用 Avro

Avro 序列化相比常见的序列化(比如 json)会更快,序列化的数据会更小。相比 protobuf ,它可以支持实时编译,不需要像 protobuf 那样先定义好数据格式文件,编译之后才能使用。


1.3、Confluent Schema-Registry

Confluent公司为了能让 Kafka 支持 Avro 序列化,创建了 Kafka Schema Registry 项目,项目地址为 https://github.com/confluentinc/schema-registry 。对于存储大量数据的 kafka 来说,使用 Avro 序列化,可以减少数据的存储空间提高了存储量,减少了序列化时间提高了性能。 Kafka 有多个topic,里面存储了不同种类的数据,每种数据都对应着一个 Avro schema 来描述这种格式。Registry 服务支持方便的管理这些 topic 的schema,它还对外提供了多个 `restful 接口,用于存储和查找。

二、Confluent Schema-Registry 安装与使用

2.1、安装

1.Schema Registry的各个发现行版本的下载链接

2.上传到linux系统进行解压安装。

3.本教程使用外部以安装好的Kafka集群不使用内部默认的。

4.修改confluent-5.3.1/etc/schema-registry/schema-registry.properties配置文件

# 注册服务器的监听地址及其端口号
listeners=http://0.0.0.0:8081
# 有关连接外部集群的地址有两种方式:1 通过zk连接 2 通过kafka的控制器 。 本教程采用zk连接
kafkastore.connection.url=henghe-042:2181
# The name of the topic to store schemas in
kafkastore.topic=_schemas
# If true, API requests that fail will include extra debugging information, including stack traces
debug=false

5.注册服务器的启动…/…/bin/schema-registry-start -daemon …/…/etc/schema-registry/schema-registry.properties

2.2、RestAPI使用

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
  {"id":1}
# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
     http://localhost:8081/subjects/Kafka-value/versions
  {"id":1}
# List all subjects
$ curl -X GET http://localhost:8081/subjects
  ["Kafka-value","Kafka-key"]
# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  [1]
# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
  {"schema":"\"string\""}
# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
  3
# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1, 2, 3, 4, 5]
# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}
# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  {"is_compatible":true}
# Get top level config
$ curl -X GET http://localhost:8081/config
  {"compatibilityLevel":"BACKWARD"}
# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
  {"compatibility":"NONE"}
# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
  {"compatibility":"BACKWARD"}

2.2、Java 代码

2.2.0、注册

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://localhost:8081/subjects/chb_test/versions

2.2.1、添加依赖

我们需要 confluent-common 目录下的common-config-4.1.1.jar、common-utils-4.1.1.jar和全部以jackson开头的 jar 包以及 kafka-serde-tools 目录下的kafka-schema-registry-client-4.1.1.jar和kafka-avro-serializer-4.1.1.jar


将本地jar包导入到本地仓库

mvn install:install-file -Dfile=G:\迅雷下载\kafka-avro-serializer-6.2.0.jar -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=6.2.0  -Dpackaging=jar

添加到pom.xml

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!--此依赖是通过本地依赖库导入的,有关如何把jar放入本地依赖库自行搜索-->
    <!--本人的jar文件是在编译源码时自动到依赖库中的所以直接引用-->
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>5.3.2</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
      <version>5.3.2</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>common-config</artifactId>
      <version>5.3.2</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>common-utils</artifactId>
      <version>5.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.8.2</version>
    </dependency>
    <!-- jaskson start -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.9.10</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.10</version>
    </dependency>
    <!-- jaskson end -->

2.2.2、Kafka 客户端使用原理

Kafka Schema Registry 提供了 KafkaAvroSerializer 和 KafkaAvroDeserializer 两个类。Kafka 如果要使用 Avro 序列化, 在实例化 KafkaProducer 和 KafkaConsumer 时, 指定序列化或反序列化的配置。


客户端发送数据的流程图如下所示:


2.2.2.1、KafkaProducer
package com.chb.common.kafka.schema;
import java.util.Properties;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ConfluentProducer {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\",  \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 使用Confluent实现的KafkaAvroSerializer
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://localhost:8081");
        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test-topic", user);
            System.out.println(user);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
2.2.2.2、KafkaConsumer
package com.chb.common.kafka.schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConfluentConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:6667");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "false");
        // 配置禁止自动提交,每次从头消费供测试使用
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 使用Confluent实现的KafkaAvroDeserializer
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // 添加schema服务的地址,用于获取schema
        props.put("schema.registry.url", "http://localhost:8081");
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
2.2.2.3、Consumer的消费结果
value = [user.id = 98, user.name = name98, user.age = 38], partition = 0, offset = 97
value = [user.id = 99, user.name = name99, user.age = 30], partition = 0, offset = 98
value = [user.id = 100, user.name = name100, user.age = 39], partition = 0, offset = 99
相关文章
|
5月前
|
消息中间件 Kafka
Kafka Config
Kafka Config
26 0
|
8月前
|
消息中间件 容灾 Kafka
【Kafka】Kafka 中的 Geo-Replication 是什么?
【4月更文挑战第11天】【Kafka】Kafka 中的 Geo-Replication 是什么?
|
8月前
|
消息中间件 存储 缓存
从0开始回顾Kafka---系列三
消费者只能拉取到这个 offset 之前的消息。
|
8月前
|
消息中间件 存储 容灾
从0开始回顾Kafka---系列一
2、 Kafka有哪些优点和缺点? 优点: 1. 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。 2. 可扩展性:kafka集群支持水平扩展。 3. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。 4. 容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障)。 5. 高并发:支持数千个客户端同时读写。 缺点: 1. 同步收发消息的响应时延比较高,因为当客户端发送一条消息的时候,Kafka 并不会立即发送出去,而是要等一会儿攒一批再发送。 2. Kafka 不太适合在线业务场景,由于是批量发送,所以数据达不到真正的实时。 3.
|
8月前
|
消息中间件 存储 安全
从0开始回顾Kafka---系列二
生产者 1、 Kafka 中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么? 分区器 ● 消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。 ● 如果消息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。 序列化器 ● 生产者需要用序列化器(Serializer)把对象转换成字节数组才能通
|
消息中间件 Kafka Serverless
ffc的kafka触发器和kafka的connector的主要区别
c的kafka触发器和kafka的connector的主要区别
80 1
kafka-manager
kafka-manager
56 0
|
消息中间件 Kafka Linux
kafka3.0创建topic出现zookeeper is not a recognized option
kafka3.0创建topic出现zookeeper is not a recognized option
276 0
|
消息中间件 监控 Kafka
Apache Kafka-使用Kafak Tool 查看Kafka中的数据
Apache Kafka-使用Kafak Tool 查看Kafka中的数据
420 0
|
消息中间件 Java Kafka
Apache Kafka-CMAK(kafka manager)安装部署使用
Apache Kafka-CMAK(kafka manager)安装部署使用
441 0