kafka快速入门1

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: kafka快速入门1

1 kafka

1.1 kafka介绍

Kafka 是一个分布式流媒体平台

kafka官网:http://kafka.apache.org/

(1)流媒体平台有三个关键功能:

  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 容错的持久方式存储记录流
  • 记录发生时处理流。

(2)Kafka通常用于两大类应用:

  • 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序

(3)kafka名词解释

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • onsumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

1.2 kafka安装和配置

1.2.1 jdk环境

首先需要安装Java环境,同时配置环境变量

1.2.2 zookeeper安装

Zookeeper是安装Kafka集群的必要组件,Kafka通过Zookeeper来实施对元数据信息的管理,包括集

群、主题、分区等内容。

同样在官网下载安装包到指定目录解压缩

ZooKeeper 官网: http://zookeeper.apache.org/

在今天提供的资料中,有一个zookeeper-3.4.14.tar.gz包,上传到服务器,也可以到官网上下载

(1)解压压缩包

tar zxvf zookeeper-3.4.14.tar.gz


2)修改配置文件,进入安装路径conf目录,并将zoo_sample.cfg文件修改为zoo.cfg

cd zookeeper-3.4.14  #进入安装目录
cd conf   #进入配置目录
mv zoo_sample.cfg zoo.cfg  # 把文件改名

(3)创建存放数据的目录 data

在zookeeper安装的根目录创建目录 data

mkdir data

创建完的效果如下:

(4)配置数据存储目录

进入conf目录下,编辑zoo.cfg

vi conf/zoo.cfg

修改内容,如下图

(5)启动zookeeper


进入bin目录

./zkServer.sh start # 启动
./zkServer.sh status # 查看状态
./zkServer.sh restart # 重启
./zkServer.sh stop # 关闭

启动后可以查看进行

jps

1.2.3 kafka安装

(1)官网下载

下载地址:http://kafka.apache.org/downloads

也可以在今天的资源文件夹中找到这个安装,直接上传到服务器即可

(2)解压

tar zxvf kafka_2.12-2.2.1.tgz

(3)修改参数

修改config目录下的server.properties文件,效果如下

  • 修改listeners=PLAINTEXT://host:9092
  • log.dirs=/root/kafka_2.12-2.2.1/logs 需要在kafka安装目录新建logs目录

(4)启动kafka

在kafka的根目录

bin/kafka-server-start.sh config/server.properties  #启动kafka

查看进程

注意:启动kafka之前,必须先启动zookeeper

1.3 kafka入门案例

1.3.1 创建工程kafka-demo

创建kafka-demo工程,引入依赖信息

<properties>
    <kafka.client.version>2.0.1</kafka.client.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.client.version}</version>
    </dependency>
</dependencies>

做一个java普通的生产者和消费者只需要依赖kafka-clients即可

1.3.2 消息生产者

创建类:

package com.oldlu.kafka.simple;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Kafka 消息生产者
 */
public class ProducerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.200.130:9092";
    // 主题名称-之前已经创建
    private static final String topic = "kafka-hello";
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置key序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 设置值序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        // KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        //封装消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Kafka-demo-001", "hello, Kafka!");
        try {
            //发送消息
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
1.3.3 消息消费者

创建消费者类:

package com.oldlu.kafka.simple;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka 消息消费者
 */
public class ConsumerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.200.130:9092";
    // 主题名称-之前已经创建
    private static final String topic = "kafka-hello";
    // 消费组
    private static final String groupId = "group.demo1";
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//分组
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
1.3.4 测试及结论-重要
  • 生产者发送消息,同一个组中的多个消费者只能有一个消费者接收消息
  • 生产者发送消息,如果有多个组,每个组中只能有一个消费者接收消息,如果想要实现广播的效果,可以让每个消费者单独有一个组即可,这样每个消费者都可以接收到消息
1.3.5 相关概念再介绍

在kafka概述里介绍了概念包括:topic、producer、consumer、broker,这些是最基本的一些概念,想要更深入理解kafka还要知道它的一些其他概念定义:

  • 消息Message

Kafka 中的数据单元被称为消息message,也被称为记录records,可以把它看作数据库表中某一行的记录。

  • topic

Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

  • 批次

为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

  • 分区Partition-难点

主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现kafka 的伸缩性。topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个文件进行存储。partition中的数据是有序的,partition之间的数据是没有顺序的。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

  • broker

一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

  • Broker 集群

broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

  • 副本Replica

Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica);Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica);当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表(保持同步的副本列表)中删除,重新创建一个Follower。

  • Zookeeper

kafka对与zookeeper是强依赖的,是以zookeeper作为基础的,即使不做集群,也需要zk的支持。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行重平衡。

  • 消费者群组Consumer Group

生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

  • 偏移量Consumer Offset

偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

  • 重平衡Rebalance

消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

  • 分组的作用

分不同的组就可以广播让所有分组消费者消费到,简单说就是让一个消息多个消费

1.3.5 生产者详解-理解

(1)发送消息的工作原理

(2)发送类型

  • 发送并忘记(fire-and-forget)

把消息发送给服务器,并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,使用这种方式有时候会丢失一些信息

  • 同步发送

使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

 //发送消息
  try {
      RecordMetadata recordMetadata = producer.send(record).get();
      System.out.println(recordMetadata.offset());//获取偏移量
  }catch (Exception e){
      e.printStackTrace();
  }

如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量.

  • 异步发送

调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码

//发送消息
  try {
      producer.send(record, new Callback() {
          @Override
          public void onCompletion(RecordMetadata recordMetadata, Exception e) {
              if(e!=null){
                  e.printStackTrace();
              }
              System.out.println(recordMetadata.offset());
          }
      });
  }catch (Exception e){
      e.printStackTrace();
  }

如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。

(3)参数详解

到目前为止,我们只介绍了生产者的几个必要参数(bootstrap.servers、序列化器等)

生产者还有很多可配置的参数,在kafka官方文档中都有说明,大部分都有合理的默认值,所以没有必要去修改它们,不过有几个参数在内存使用,性能和可靠性方法对生产者有影响

  • acks

指的是producer的消息发送确认机制

  • acks=0
    生产者在成功写入消息之前不会等待任何来自服务器的响应,也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。
  • 不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  • acks=1

  • 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应,如果消息无法到达首领节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。
  • acks=all
  • 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过他的延迟比acks=1时更高。
  • retries

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms


目录
相关文章
|
6月前
|
消息中间件 存储 Java
快速入门 Kafka 和 Java 搭配使用
快速入门 Kafka 和 Java 搭配使用
209 0
|
消息中间件 缓存 大数据
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
|
消息中间件 JSON Java
kafka快速入门2
kafka快速入门2
109 0
|
消息中间件 存储 传感器
macOS 系统 安装 Kafka 快速入门
macOS 系统 安装 Kafka 快速入门
381 0
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
|
消息中间件 中间件 Kafka
测试中间件 - Kafka Tool 快速入门(一)
测试中间件 - Kafka Tool 快速入门(一)
472 11
测试中间件 - Kafka Tool 快速入门(一)
|
消息中间件 存储 Java
Kafka快速入门
Kafka快速入门
|
消息中间件 Java Kafka
Kafka快速入门
本文将带您快速的入门Kafka,体验Kafka的基本功能。 安装环境为centos7 jdk1.8 参考官网:http://kafka.apache.org/quickstart
231 57
|
消息中间件 NoSQL 中间件
测试中间件 - Kafka Tool 快速入门(二)
测试中间件 - Kafka Tool 快速入门(二)
233 8
测试中间件 - Kafka Tool 快速入门(二)
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)

热门文章

最新文章