Kafka的基本概念与安装指南(单机+集群同步)

简介:

最近在搞spark streaming,很自然的前端对接的就是kafka。不过在kafka的使用中还是遇到一些问题,比如mirrormaker莫名其妙的丢失数据[原因稍后再说],消费数据offset错乱[之后介绍spark streaming的时候再解释]

总之,还是遇到了不少的问题。本篇就从下面几个方面介绍一下kafka:

  1. 基本介绍
  2. 安装与helloworld
  3. producer
  4. consumer
  5. mirror maker跨集群同步
  6. 控制台

基本介绍

Kafka是一款分布式的消息队列框架,它由三个重要的部分组成:

  1. Producer 消息的生产者,负责生产消息
  2. Broker 消息的存储,负责消息的持久化与高可用
  3. Consumer 消息的消费者,负责消费消息

大致的结构如下:

消息则是通过topic进行标识,每个topic可以有多个partition分区组成。每一个parition内部消息是按照顺序写入的,所有的partition加起来才是全部的数据,也就是说kafka并不能保证全局有序,只能保证在某一个partition内部是有序的。

消费者消费数据的时候是根据一个叫做offset的游标来记录消费的位置,可以通俗的把它理解成递增的id。

消费者可以由多个组成一个消费者组,同一个消费者组内的数据不会重复消费。不过消费者的数量跟partition的数量是有关系的,如果只有一个partition,那么即便是由10个消费者,同一时间也只能由一个消费者进行消费。

另外,broker是负责消息的持久化,前面提到过消息是通过partition组织在一起的,物理上则是通过一个log文件来记录。如果有一条消息写入,就会追加到log文件的末尾,当大小超过一定的阈值后,就新建一个log文件。如果log文件的修改时间超过一定的阈值,kafka还会清理掉该文件。

原理的东西就简单说这么多,下面来看看安装与体验吧!

安装与hello world

按照官方文档的步骤,是最快的入门方式:

下载安装包

官方下载地址下载安装包,并参照对应的版本的文档即可,下载后执行下面的命令:

> tar -xzf kafka_2.11-0.9.0.0.tgz
> cd kafka_2.11-0.9.0.0

启动zookeeper

如果方便的话,最好自己额外安装zookeeper,或者与其他的组建公用一个zk,否则单独为了kafka运行一个zk还是挺浪费资源的。

> bin/zookeeper-server-start.sh config/zookeeper.properties

最好不要随意修改zk的地址,2181是默认的端口号,如果修改,后面启动kafka会很麻烦,修改的地方会很多。

启动kafka-broker

bin/kafka-server-start.sh config/server.properties

创建主题并查看

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

启动producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

至此,单机版的kafka就搭建完成了!如果要创建kafka的集群,可以直接

producer例子

import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaProducer {
    private static final String TOPIC = "test"; //kafka创建的topic
    private static final String CONTENT = "This is a single message"; //要发送的内容
    private static final String BROKER_LIST = "xxxx:9092"; //broker的地址和端口
    private static final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder"; // 序列化类

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", SERIALIZER_CLASS);
        props.put("metadata.broker.list", BROKER_LIST);


        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        //Send one message.
        KeyedMessage<String, String> message =
                new KeyedMessage<String, String>(TOPIC, CONTENT);
        producer.send(message);

        //Send multiple messages.
        List<KeyedMessage<String,String>> messages =
                new ArrayList<KeyedMessage<String, String>>();
        for (int i = 0; i < 100; i++) {
            messages.add(new KeyedMessage<String, String>
                    (TOPIC, i+"Multiple message at a time. " + i));
        }
        producer.send(messages);
        producer.close();
    }
}

执行后,如果有一个consumer启动,就可以看到消息输出。

consumer例子

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "xxxx:2181");
        props.put("group.id", "t1");

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("xxx-topic", 1);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> msgStreamList = msgStreams.get("test");

        for(KafkaStream stream : msgStreamList){
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while(iterator.hasNext()) {
                String message = new String(iterator.next().message());
                if(message.contains("xxxx")){
                    System.out.println(message);
                }
            }
        }
    }
}

跨集群同步——mirror maker

如果公司有云环境,可能还涉及到多个集群环境数据的同步。那么官方提供了一个mirrormaker的工具,它其实就是封装了一个consumer和一个producer,把一个集群的数据,直接消费到另一个集群。

代码可以参考github:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala

文档可以参考:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

我这里介绍一下它的用法,首先启动的脚本,官方已经封装到kafka解压后的bin目录下。

主要用到了kafka-run-class.sh,kafka-mirror-maker.sh脚本其实就是对它的一层封装:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker $@

然后需要创建两个配置文件,分别是consumer的配置文件和producer的配置文件:

consumer.properties

zookeeper.connect=xxxx:2181 
group.id=test-mirror

zookeeper.connect是想要消费的集群的zk地址,group.id是消费者组的id,一定别跟其他的mirrormaker搞到一起哈![这就是我开篇遇到的问题原因]。

producer.properties

zk.connect=localhost:2181
bootstrap.servers=localhost:9092

zk.connect是消息即将存储的zk地址, bootstrap.servers是消息即将存储的broker地址。(我试过没有bootstrap.servers的话,会报错)

然后执行下面的命令,启动脚本即可:

./kafka-run-class.sh kafka.tools.MirrorMaker --consumerrties --producer.config producer.properties --whitelist test --num.streams 2

num.streams控制了消费者的个数,必须要设置的。

这样就开启了mirrormaker服务,可以看到第一个集群的所有消息,都同步到了第二个集群。

控制台主要功能介绍

控制台可以安装kafka-manager进行监控与管理,安装的教程可以参考:
http://blog.csdn.net/lsshlsw/article/details/47300145

集群概况

主题

broker

消费者

参考

本文转自博客园xingoo的博客,原文链接:Kafka的基本概念与安装指南(单机+集群同步),如需转载请自行联系原博主。
相关文章
|
8月前
|
消息中间件 Kafka Linux
Linux下安装Kafka 3.9.1
本文介绍Kafka 3.9.1版本的安装与配置,包括通过ZooKeeper或KRaft模式启动Kafka。涵盖环境变量设置、日志路径修改、集群UUID生成、存储格式化及服务启停操作,适用于Linux环境下的部署实践。
1156 0
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
585 4
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
585 2
|
消息中间件 运维 Java
搭建Zookeeper、Kafka集群
本文详细介绍了Zookeeper和Kafka集群的搭建过程,涵盖系统环境配置、IP设置、主机名设定、防火墙与Selinux关闭、JDK安装等基础步骤。随后深入讲解了Zookeeper集群的安装与配置,包括数据目录创建、节点信息设置、SASL认证配置及服务启动管理。接着描述了Kafka集群的安装,涉及配置文件修改、安全认证设置、生产消费认证以及服务启停操作。最后通过创建Topic、发送与查看消息等测试验证集群功能。全网可搜《小陈运维》获取更多信息。
1083 1
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
3607 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
消息中间件 Kafka Docker
docker compose 安装 kafka
通过本文的步骤,您可以快速在本地使用 Docker Compose 安装并配置 Kafka 和 Zookeeper。Docker Compose 简化了多容器应用的管理,方便快速搭建和测试分布式系统。
2233 2
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
711 2
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
618 1
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
399 6

热门文章

最新文章