Kafka快速入门

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文将带您快速的入门Kafka,体验Kafka的基本功能。安装环境为centos7 jdk1.8参考官网:http://kafka.apache.org/quickstart

一、下载Kafka



官网下载地址  http://kafka.apache.org/downloads

截至2019年7月8日  最新版本为 2.3.0   2.12为编译的scala版本   2.3.0为kafka版本

  • Scala 2.12  - kafka_2.12-2.3.0.tgz (asc, sha512)
解压
> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

二、启动服务


要先启动zookeeper   kafka内置了一个   也可以不用

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

三、创建topic


replication-factor为1   partitions为1
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看topic
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

也可以不创建topic  设置自动创建  当publish的时候


四、发送消息


用command line client 进行测试  一行就是一条消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

五、消费者


command line consumer  可以接收消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message


六、设置多broker集群


单broker没有意思  我们可以设置三个broker

首先为每个broker 复制配置文件

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

然后编辑

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id是唯一的  cluster中每一个node的名字  我们在same machine上  所有要设置listeners和log.dirs  以防冲突

建一个topic 一个partitions  三个replication-factor

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
用describe看看都是什么情况
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
   Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0


  • 有几个概念 :
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

刚才那个topic

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
   Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

发送 接收

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

试一下容错 fault-tolerance

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564


看一下变化:Leader换了一个  因为1被干掉了

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
   Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

还是收到了消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

七、使用kafka  import/export data


刚才都是console 的数据,其他的sources    other systems呢  用Kafka Connect

弄一个数据

> echo -e "foo\nbar" > test.txt

启动  指定配置文件

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

验证一下

> more test.sink.txt
foo
bar

消费者端

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

可以继续写入

> echo Another line>> test.txt


八、使用Kafka Streams


http://kafka.apache.org/22/documentation/streams/quickstart

WordCountDemo

https://github.com/apache/kafka/blob/2.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

代码片段

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
   Consumed.with(stringSerde, stringSerde);
KTable<String, Long> wordCounts = textLines
   // Split each text line, by whitespace, into words.
   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
   // Group the text words as message keys
   .groupBy((key, value) -> value)
   // Count the occurrences of each word (message key).
   .count()
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

建一个 Kafka producer  指定input topic    output topic

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

启动WordCount demo application

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

启动一个生产者写数据

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

启动一个消费者接数据

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
   --topic streams-wordcount-output \
   --from-beginning \
   --formatter kafka.tools.DefaultMessageFormatter \
   --property print.key=true \
   --property print.value=true \
   --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
   --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
kafka   1
相关文章
|
消息中间件 存储 安全
kafka快速入门1
kafka快速入门1
122 0
|
5月前
|
消息中间件 存储 Java
快速入门 Kafka 和 Java 搭配使用
快速入门 Kafka 和 Java 搭配使用
185 0
|
消息中间件 缓存 大数据
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
|
消息中间件 JSON Java
kafka快速入门2
kafka快速入门2
103 0
|
消息中间件 存储 传感器
macOS 系统 安装 Kafka 快速入门
macOS 系统 安装 Kafka 快速入门
352 0
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(上)
|
消息中间件 存储 Java
Kafka快速入门
Kafka快速入门
|
消息中间件 中间件 Kafka
测试中间件 - Kafka Tool 快速入门(一)
测试中间件 - Kafka Tool 快速入门(一)
459 8
测试中间件 - Kafka Tool 快速入门(一)
|
消息中间件 存储 缓存
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本
Kafka快速入门(Kafka Broker)节点服役和退役、手动调整副本(下)
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务

热门文章

最新文章

下一篇
无影云桌面