kafka producer实例及原理分析

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,182元/月
云原生网关 MSE Higress,422元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介:

1.前言

首先,描述下应用场景:

假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大。将日志数据插入数据库然后再进行分析,已经满足不了。最好的办法是存日志,然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。

步骤如下:

  • 搭建KAFKA系统运行环境


如果你还没有搭建起来,可以参考我的博客:

http://zhangfengzhe.blog.51cto.com/8855103/1556650


  • 设计数据存储格式



  • Producer端获取数据,并对数据按上述设计的格式进行编码


  • Producer将已经编码的数据发送到broker上,在broker上进行存储


  • Consumer端从broker中获取数据,分析计算。




2.实现过程


为了快速实现,我们简化日志消息格式。

在eclipse新建JAVA PROJECT,将kafka/libs下*.jar配置到项目build path即可。


Step 1 : 简单的POJO对象(MobileGameLog)

1
2
3
4
private  String actionType;
private  String appKey;
private  String guid;
private  String time;


说明:

actionType 代表行为类型

appKey     代表游戏ID

guid       代表角色

time       代表时间


提供getter/setter方法,并override toString()



Step 2 : 提供serializer


需要注意的是,POJO对象需要序列化转化成KAFKA识别的消息存储格式--byte[]



1
2
3
4
5
6
7
8
public  class  MobileGameKafkaMessage  implements  kafka.serializer.Encoder<MobileGameLog>{
@Override
public  byte [] toBytes(MobileGameLog mobileGameLog) {
return  mobileGameLog.toString().getBytes();
}
public  MobileGameKafkaMessage(VerifiableProperties props){
}
}




Step 3 : 提供Partitioner


我们可以提供Partitioner,这样可以使得数据按照我们的策略来存储在brokers中。


wKioL1QzrbnROV9UAAKDYIN8EWw025.jpg

这里,我根据appKey来进行分区。



Step 4 : 提供Producer



  • 提供配置


wKioL1QzrsiiG8CyAAGOsHs0upE284.jpg


  • 运行kafka环境


启动zookeeper:

1
2
[root@localhost kafka_2.9.2-0.8.1.1] # bin/zookeeper-server-start.sh  
config /zookeeper .properties &


启动kafka broker(id=0):

1
2
[root@localhost kafka_2.9.2-0.8.1.1] # bin/kafka-server-start.sh 
config /server .properties &


启动kafka broker(id=1)

1
2
[root@localhost kafka_2.9.2-0.8.1.1] # bin/kafka-server-start.sh  
config /server-1 .properties &


上述过程,在我的博客【搭建kafka运行环境】里面都有详细记录,大家可以参考。



创建一个topic:

1
2
[root@localhost kafka_2.9.2-0.8.1.1] # bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic log_1 --replication-factor 2 --partitions 3

注意topic:log_1有3个分区,2个复制。




  • 制造数据并发送


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Producer<key , value>
// V: type of the message
// K: type of the optional key associated with the message
kafka.javaapi.producer.Producer<MobileGameLog, MobileGameLog> producer 
new  Producer<MobileGameLog, MobileGameLog>(
config);
 
List<KeyedMessage<MobileGameLog, MobileGameLog>> list 
new  ArrayList<KeyedMessage<MobileGameLog, MobileGameLog>>();
 
// 5条tlbb数据
for  ( int  i =  1 ; i <=  5 ; i++) {
MobileGameLog log =  new  MobileGameLog();
log.setActionType( "YuanBaoShop" );
log.setAppKey( "tlbb" );
log.setGuid( "xxx_"  + i);
log.setTime( "2014-10-01 10:00:20" );
KeyedMessage<MobileGameLog, MobileGameLog> keyedMessage 
new  KeyedMessage<MobileGameLog, MobileGameLog>(
"log_1" , log, log);
list.add(keyedMessage);
}
 
// 8条ldj数据
for  ( int  i =  1 ; i <=  8 ; i++) {
MobileGameLog log =  new  MobileGameLog();
log.setActionType( "BlackMarket" );
log.setAppKey( "ldj" );
log.setGuid( "yyy_"  + i);
log.setTime( "2014-10-02 10:00:20" );
KeyedMessage<MobileGameLog, MobileGameLog> keyedMessage 
new  KeyedMessage<MobileGameLog, MobileGameLog>(
"log_1" , log, log);
list.add(keyedMessage);
}
 
producer.send(list);
producer.close();


说明:


a.producer既可以send 一个keyedMessage,可以是一个keyedMessage list.


b.注意producer实例化时的泛型。value是消息对象,即POJO,key是这个pojo的标示,这个是要用来进行分区的。


c.producer向broker发送的是KeyedMessage,注意实例化时的泛型,KEY/VALUE的意义同b.


d.KeyedMessage需要指明topic name.



  • eclipse 运行结果如下:


-------start info

运行至MobileGameKafkaPartition

VerifiableProperties : {metadata.broker.list=192.168.152.2:9092,192.168.152.2:9093, 

zk.connectiontimeout.ms=6000, request.required.acks=1, 

partitioner.class=com.sohu.game.kafka.day2.MobileGameKafkaPartition, 

serializer.class=com.sohu.game.kafka.day2.MobileGameKafkaMessage}

-------end info

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : tlbb

存储的分区为:0

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info

-------start info

运行至MobileGameKafkaPartition的partition方法,分区大小为:3

分区key : ldj

存储的分区为:2

-------end info



  • kafka consumer console 结果如下:


wKioL1Qzs_by4VoYAAP86rC1nao716.jpg





3.原理分析


查看topic:log_1详细信息:

1
2
3
4
5
6
[root@localhost kafka_2.9.2-0.8.1.1] # bin/kafka-topics.sh --zookeeper localhost:2181 
--describe --topic log_1
Topic: log_1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: log_1 Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: log_1 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: log_1 Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0,1

log_1有2个broker进行储存,每一个broker上有3个分区,并且每一个分区的leader都是broker(id=0)


查看broker(id=0)上的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[root@localhost tmp] # ll
total 52
drwxr-xr-x  2 root root 4096 Oct  7 01:23 hsperfdata_root
drwxr-xr-x 10 root root 4096 Oct  7 02:40 kafka-logs
drwxr-xr-x  8 root root 4096 Oct  7 02:40 kafka-logs-1
srwxr-xr-x  1 root root    0 Sep 20 18:15 mapping-root
drwxrwxrwt  2 root root 4096 Oct  6 00:34 VMwareDnD
drwx------  2 root root 4096 Oct  6 18:05 vmware-root
drwxr-xr-x  3 root root 4096 Sep 20 19:58 zookeeper
[root@localhost tmp]
[root@localhost tmp]
[root@localhost tmp]
[root@localhost tmp] # cd kafka-logs
[root@localhost kafka-logs] # pwd
/tmp/kafka-logs
[root@localhost kafka-logs] # ll
total 80
drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-0
drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-1
drwxr-xr-x 2 root root 4096 Oct  7 01:02 log_1-2
drwxr-xr-x 2 root root 4096 Oct  6 01:01 my_first_topic-0
-rw-r--r-- 1 root root  100 Oct  7 02:40 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  100 Oct  7 02:40 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Oct  6 01:01  test -0
drwxr-xr-x 2 root root 4096 Oct  6 01:01 topic_1-0
drwxr-xr-x 2 root root 4096 Sep 21 00:21 topic_2-0
drwxr-xr-x 2 root root 4096 Sep 21 00:22 topic_3-0
[root@localhost kafka-logs] # cd log_1-0/
[root@localhost log_1-0] # ll
total 12
-rw-r--r-- 1 root root 10485760 Oct  7 01:16 00000000000000000000.index
-rw-r--r-- 1 root root     1020 Oct  7 01:18 00000000000000000000.log
[root@localhost log_1-0] # cat -A 00000000000000000000.log 
^@^@^@^@^@^@^@^@^@^@^@M-@M-r^L2M-V^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1,  time =2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_1,  time =2014-10-01 10:00:20]^@^@^@^@^@^@^@^A^@^@^@M-@^^M-46M-h^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2,  time =2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_2, 
time =2014-10-01 10:00:20]^@^@^@^@^@^@^@^B^@^@^@M-@M-sM-s7=^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3,  time =2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_3, 
time =2014-10-01 10:00:20]^@^@^@^@^@^@^@^C^@^@^@M-@^\M-58M-U^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4,  time =2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_4, 
time =2014-10-01 10:00:20]^@^@^@^@^@^@^@^D^@^@^@M-@M-qM-r9^@^@^@^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5,  time =2014-10-01 10:00:20]^@^@^@YMobileGameLog [actionType=YuanBaoShop, appKey=tlbb, guid=xxx_5, 
time =2014-10-01 10:00:20][root@localhost log_1-0] #

注意kafka broker(id=0)的日志信息显示:

有log_1-0,log_1-1,log_1-2三个目录,对应于0,1,2三个分区。

说明,topic在broker上是以partition为单位进行储存的。

上面的0分区的日志信息显示,tlbb的5条数据都被储存了2遍,并且可以发现在分区内,都是有序的。

我们在创建log_1时指定复制2份,所以数据在分区内被储存了2遍。


同理,我们继续分析broker(id=0)上的1,2分区的内容,有:

分区1无数据,分区2上8条ldj的数据被储存了2遍。

由于我们只制造了2种appkey的数据,根据分区函数,只会返回2个partition number,所以导致有一个分区没有数据。


同上的,继续分析broker(id=1)上的0,1,2分区的内容,有:

分区0,tlbb的5条数据被储存2遍

分区1,没有数据

分区2,ldj的8条数据被储存2遍


可见,broker(id=0),broker(id=1)他们的分区数据完全一致,这也就是为什么kafka的高可用性,某些broker挂了,其他的broker还可以继续提供服务和数据。


本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1561021,如需转载请自行联系原作者


相关文章
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
327 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
158 11
|
4月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
10月前
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
10月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
487 5
|
8月前
|
消息中间件 存储 缓存
一文带你秒懂 Kafka工作原理!
Apache Kafka 是一个高吞吐量、低延迟的分布式消息系统,广泛应用于实时数据处理、日志收集和消息队列等领域。它最初由LinkedIn开发,2011年成为Apache项目。Kafka支持消息的发布与订阅,具备高效的消息持久化能力,适用于TB级数据的处理。
|
6月前
|
消息中间件 Kafka API
原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息
原理剖析| Kafka Exactly Once 语义实现原理:幂等性与事务消息
158 0
|
11月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
205 4
|
11月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
137 2