浅谈kafka 一

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 浅谈kafka 一

Kafka官方给出的定义是:Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. (Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。)

大数据领域中我们常用kakfa来构建流处理数据管道,与Spark或者Flink对接。

搭建Kafka集群,我们选用的kafka版本是kafka_2.12-2.4.1,Zookeeper版本为3.6.3。

zookeeper配置

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.0=10.60.**.**:2888:3888
server.1=10.60.**.**:2888:3888
server.2=10.60.**.**:2888:3888


默认情况下,Linux系统中没有/tmp/zookeeper/data和/tmp/zookeeper/log这两个目录,所以接下来还要创建这两个目录。

第四步,在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。

启动

./zkServer.sh start


查看zookeeper服务状态

./zkServer.sh status


三个节点成功部署的话,jps后会有一个Leader和两个follower。

Kafka配置

修改 server.properties

# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/usr/local/kafka/data
# 配置zk的三个节点
zookeeper.connect=10.60.**.**:2181,10.60.**.**:2181,10.60.**.**:2181


将安装好的kafka复制到另外两台服务器,修改另外两个节点的broker.id分别为1和2

配置KAFKA_HOME环境变量

export KAFKA_HOME=/usr/local/kafka
export PATH=:$PATH:${KAFKA_HOME}


启动服务器

nohup bin/kafka-server-start.sh config/server.properties &

由于集群的服务器可能很多,手动启动比较麻烦,可以写一个一键启动和关闭的shell脚本:

在kafkalist文件中写入需要启动的服务器ip

cat /root/myshell/kafkalist | while read line
do
{
 echo $line
 ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done

示例demo

我们创建一个topic,用java代码创建一个kafka producer向topic中发送数据。

./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --create --topic topic-demo --replication-factor 3 --partitions 4


查看该topic

./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --describe --topic topic-demo

Java Producer:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 *
 * 1. 创建用于连接Kafka的Properties配置
 * 2. 创建一个生产者对象KafkaProducer
 * 3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
 * 4. 再调用一个Future.get()方法等待响应
 * 5. 关闭生产者
 */
public class KafkaProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.50.**.**:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        // 3. 发送1-100的消息到指定的topic中
        for(int i = 0; i < 10000000; ++i) {
            // 一、使用同步等待的方式发送消息
            // // 构建一条消息,直接new ProducerRecord
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
            // Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // // 调用Future的get方法等待响应
            // future.get();
            // System.out.println("第" + i + "条消息写入成功!");
            // 二、使用异步回调的方式发送消息
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-demo", null, i + "");
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // 1. 判断发送消息是否成功
                    if(exception == null) {
                        // 发送成功
                        // 主题
                        String topic = metadata.topic();
                        // 分区id
                        int partition = metadata.partition();
                        // 偏移量
                        long offset = metadata.offset();
                        System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
                    }
                    else {
                        // 发送出现错误
                        System.out.println("生产消息出现异常!");
                        // 打印异常消息
                        System.out.println(exception.getMessage());
                        // 打印调用栈
                        System.out.println(exception.getStackTrace());
                    }
                }
            });
        }
        // 4.关闭生产者
        kafkaProducer.close();
    }
}

消费数据

[root@master kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic-demo --from-beginning
441665
441666
441667
441668
441669
441670
441671
441672
441673
441674
441675
441676
441677
441678
441679
441680
441681
441682
441683


可以看到数据已经写入。

相关文章
|
4月前
|
消息中间件 分布式计算 Java
|
2月前
|
消息中间件 Java Kafka
Kafka
Kafka
13 1
|
2月前
|
消息中间件 存储 分布式计算
|
3月前
|
消息中间件 存储 Java
玩转Kafka—初步使用
玩转Kafka—初步使用
30 0
|
6月前
|
消息中间件 缓存 算法
Kafka为什么这么快?
Kafka 是一个基于发布-订阅模式的消息系统,它可以在多个生产者和消费者之间传递大量的数据。Kafka 的一个显著特点是它的高吞吐率,即每秒可以处理百万级别的消息。那么 Kafka 是如何实现这样高得性能呢?本文将从七个方面来分析 Kafka 的速度优势。
41 1
|
6月前
|
消息中间件 开发框架 Java
113 Kafka介绍
113 Kafka介绍
40 0
|
9月前
|
消息中间件 缓存 Java
Kafka介绍
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。 Kafka是一种高吞吐量的分布式发布订阅消息系统,作为消息中间件来说都起到了系统间解耦、异步、削峰等作用,同时又提供了Kafka streaming插件包在应用端实现实时在线流处理,它可以收集并处理用户在网站中的所有动作流数据以及物联网设备的采样信息
133 0
|
消息中间件 存储 负载均衡
初识Kafka
通过阅读本篇文字,你可以了解到 Kafka 中的概念:消息、主题、分区、消费者群组、broker 等。
267 0
初识Kafka
|
消息中间件 存储 Kafka
kafka-初识kafka
- kafka是一个具有高吞吐,可水平扩展,可持久化的流式数据处理平台。 - kafka主要包括:消息系统、日志系统、流式处理平台、zookeeper 四大重要组件。 消息系统的重要概念:生产者(producer),消费者(customer),服务节点(broker)。消息系统中一个重要的原理:通过连通器原理实现了保持数据的一致性。
79 0
kafka-初识kafka
|
消息中间件 缓存 Java
kafka为什么这么快
kafka为什么这么快
786 0
kafka为什么这么快