一、kafka简介
1、kafka是一种高吞吐量的分布式发布订阅消息系统,不像activeMQ一样,它只是一个队列
它的里面基本都是topic,这样的一种结构。它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。kafka专门为大数据而生的。
主要在大数据当中结合fuame+kafka做实时日志数据分析。
2、kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
①、通过0(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)。
②、高吞吐量:即使是非常普遍的硬件kafka也可以支持每秒数百万的消息。
反而针对少量的数据,它的效率并不高,如果大的数据量的话,它的效率就最高。
kafka的6到8倍的效率:
③、支持通过kafka服务器和消费集群来区分消息
④、支持hadoop并行数据加载,因为kafka在分区里面发布消息是相互不受影响的,是完全隔离的。
问题:怎么保证kafka消费的消息是有序的呢?
回答:定义一个分区就可以了。
二、使用场景:
1、Messaging
对于一些常规的消息系统,kafka是个不错的选择,partitions(分区:主要作用保存副本,为了达到容错的效果)/replication(副本)和容错,可以使kafka具有良好的扩展性和性能优势。不过到目前位置,我们应该很清除的认识到,kafka并没有提供JMS中的"事务性"(可能会发生数据丢失),"消息传输担保(消息确认机制)","消息分组"等企业特性;kafka只能使用作为”常规“的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)。所以在kafka中数据丢失是很正常的,所以日志用kafka来做分析。如果把重要的数据放入到kafka中也不是很可取的方案。
2、Websit activity tracking
kafka可以作为”网站活性跟踪“的最佳工具;可以将网页/用户操作等信息发送到kafka中,并实时监控,或者离线统计分析等。
3、Log Aggregation
kafka的特性决定它非常适合作为”日志收集中心";application可以将操作日子“批量”“异步”的发送到kafka集群中,而不是保存在本地或者DB中,kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能大的开支。此时consumer端可以使hadoop等其他系统化的存储和分析系统。如果大量的数据的话,可以存储到hbase中,然后由kafka拉取数据,做数据的分析。
三、kafka的架构
首先是一堆生产者,生产消息,partition代表一个分区,分区里的消息是有序的,一个生产者将一个消息发送到两个分区。而每一个分区都可以对应一个消费者,甚至是一个消费组。
kafka集群是依赖zookeeper(协调服务的)来做leader选举的。它最新的消息是在最前面。最旧的消息应该在最后面。慢慢的是由新数据变为老数据。
producers:生产者
message:消息
Topic:Topic可以按照分区来分割的,分区与分区之间可以并行的接收到消息的。
如果整个集群中只有一个分区的话,那么整个集群的消费是有顺序性的。
整个的架构模型:生产者将消息push给分区。然后由消费者进行poll拉取消息。
怎么知道拉取的是最新的数据呢,需要配置一个参数
四、kafka中的术语
在深入理解kafka之前,先介绍下kafka中的术语,下图展示了kafka的相关术语以及之间的关系:
首先生产者写数据到分区里面,一个生产者可以写两份数据到分区里面去。上面的topic总共 有三个分区,比如在写的时候,produce1它发送了两个消息到分区里面0和1,也向分区Partition2里面发送了两个消息0和1.这时只能有一个分区提供0和1给消费。这个时候就会进行一个选举,选举出一个leader出来,然后跟随着也就是follower的消息当成副本。
在kafka Broker整个Kafka集群当中,有三个备份。消费组可以两两为一组。也可以一个为一组。也可以三个为一个组。主要为了保证每个消息者都有消息可消费,尽量的去均等。
上图中一个topic配置了三个partition。Partition1有两个 offset:0和1。Partition2有 4个offset.Patition3有1个offset。副本的id和副本所在的机器的id恰好相同(也就是说副本id和brokerId是一样的)。
如果一个topic的副本数为3,那么kafka将在集群中为每个partition创建3个相同的副本。集群中每个broker存储一个或者多个partition。多个producere和consumer可同时生产和消费数据。kafka高性能的地方在于一个也能选举,自己就为leader。
3.1、broker
kafka集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据,如果某个topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有N+M个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致kafka集群数据不均衡。
总结:不管有多少个partition,都是发给broker的。
3.2、Topic
顾名思义Topics是一些主题的集合,更通俗的说Topic就像一个消息队列,生产者可以向其写入消息,消费者可以从中读取消息,一个Topic支持多个生产者或消费者同时订阅它,所以其扩展性很好。Topic又可以由一个或多个partition(分区)组成,比如下图:
其中每个partition中的消息是有序的,但相互之间的顺序就不能保证了,若Topic有多个partition,生产者的消息可以指定或者由系统根据算法分配到指定分区,若你需要所有消息都是有序的,那么你最好只用一个分区。另外partition支持消息位移读取,消息位移有消费者自身管理,比如下图:
由上图可以看出,不同消费者对同一分区的消息读取互不干扰,消费者可以通过设置消息(offset)来控制自己想要获取的数据,比如可以从头读取,最新数据读取,重读读取等功能
读取完消息是不会删掉的。
3.3、partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件(段文件)存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
3.4、Produer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
3.5、Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
3.6、Consumer Group/Consumers
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
A、Consumers
Consumers是一群消费者的集合,可以称之为消费者组,是一种更高层次的的抽象,向Topic订阅消费消息的单位是Consumers,当然它其中也可以只有一个消费者(consumer)。下面是关于consumer的两条原则:
- 假如所有消费者都在同一个消费者组中,那么它们将协同消费订阅Topic的部分消息(根据分区与消费者的数量分配),保存负载平衡;
- 假如所有消费者都在不同的消费者组中,并且订阅了同个Topic,那么它们将可以消费Topic的所有消息;
下面是一个简单的例子,帮助大家理解:四个分区上的数据有可能都是不一样的
上图中有两个Server节点,有一个Topic被分为四个分区(P0-P3)分别被分配在两个节点上,另外还有两个消费者组(GA,GB),其中GA有两个消费者实例,GB有四个消费者实例。
从图中我们可以看出,首先订阅Topic的单位是消费者组,另外我们发现Topic中的消息根据一定规则将消息推送给具体消费者,主要原则如下:下面的原则也是为什么能kafka做到高吞吐量的原因所在:尽量会均等的给每个消费者去消费。
- 若消费者数小于partition数,且消费者数为一个,那么它就消费所有消息;
- 若消费者数小于partition数,假设消费者数为N,partition数为M,那么每个消费者能消费的分区数为M/N或M/N+1;
- 若消费者数等于partition数,那么每个消费者都会均等分配到一个分区的消息;
- 若消费者数大于partition数,则将会出现部分消费者得不到消息分区,出现空闲的情况;
总的来说,Kafka会根据消费者组的情况均衡分配消息,比如有消息的实例宕机,亦或者有新的消费者加入等情况。
结论: 在进行高吞吐量的架构下面,最好保持Kafka集群中的分区数和消费组中消费者个数相同
4.7、Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
4.8、Follower (ISR):它做的就是master-slave的结构
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
Leader的选举是partition里面的选举,和zk的选举是不一样的。
★ Leader 负责读和写, follower负责数据同步和备份,follower会定时的从leader去拉取数据,到自己的里面来备份。只有同步的数据的follower才能够进入到同步的列表中来。
四、如何设置开机虚拟机,zk自动启动
1、在生产环境下,只要加上 su - root -C '/usr/local/zookeeper354/bin/zkServer.sh start'
2、安装完查看下zk
controller_epoch:代表选举的轮数,leader:代表哪一台属于leader。isr:维护的是副本id。意思就是分区为0的副本存在分区1,2,3上面,1属于leader,2,3属于follower。2,1,3代表broker的id
kafka默认的清除日志的时间是168个小时,意思说日志在机器上存储的时间是7天。
这边生产者生产消息,消费者订阅了这个主题,就能够收到消息
第一个图是生产者,第二个图是消费者
注意
1、如果消费者不指定from begining的话,会从最后一个偏移量去读取。
2、消费者不会马上读取消息,它必须等到数据同步完成之后才会读,如果没有同步完成,是不允许去读的,它会一直阻塞,等待同步完成,才会放开读和写的过程。
如果消费者写一个ip能否收到消息呢?操作如下:
生产者消费消息:
消费者消费消息:
所以这样也能消费。总结:
1、集群是在任何的一台机器上操作都是一样的效果。
2、集群中由于所有的分区里面的副本全部都是和zk关联的,这个时候去找的话它会去找到它的副本,如果找的副本不是leader,这时会从state找到它的信息,把leader找到,然后从leader这边消费,所以在进群中写一个就可以了。不管读和写找的都是leader,也是从副本里面找到leader,然后再让leader接收写的请求。这就是高可用的机制。高可用是由zookeeper来维护的。
代码如下:
package com.weizhaoyang;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
//ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
props.put("acks", "1");
//retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
props.put("retries", 0);
//producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
props.put("batch.size", 16384);
//默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
//希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
//没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
//不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
props.put("linger.ms", 1);
//buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
//通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
props.put("buffer.memory", 33554432);
//key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//设置kafka的分区数量
props.put("kafka.partitions", 12);
KafkaProducer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 50; i++){
System.out.println("key-->key"+i+" value-->vvv"+i);
producer.send(new ProducerRecord<String, String>("test-topic1", "key"+i, "vvv"+i));
Thread.sleep(1000);
}
producer.close();
}
}
启动运行结果如下:这边边生产,这边就边消费。key的作用保证同一个key路由到同一个分区上。因为它会用key执行hash算法来得到hash值去模上分区的数量最终的结果是分区其中的一个。
这时写下consumer端的代码,代码如下:可以指定消费端按指定的分区去消费
package com.weizhaoyang;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerTest {
public KafkaConsumer<String, String> getConsmer() {
Properties props = new Properties();
//设置kafka服务器
props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
//消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一个群组内的消费者只有一个能消费到消息
props.put("group.id", "test");
//true,消费者的偏移量将在后台定期提交;false关闭自动提交位移,在消息被完整处理之后再手动提交位移
props.put("enable.auto.commit", "true");
//如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
props.put("auto.commit.interval.ms", "1000");
//session.timeout.ms:在使用kafka的组管理时,用于检测消费者故障的超时
props.put("session.timeout.ms", "30000");
//key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
public static void main(String[] args) {
ConsumerTest kconsumer = new ConsumerTest();
KafkaConsumer<String, String> consumer = kconsumer.getConsmer();
//consumer.subscribe(Arrays.asList("aaa"));
TopicPartition partition=new TopicPartition("test-topic1",2);
consumer.assign(Arrays.asList(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println("offset = "+record.offset()+", key = "+record.key()+", value = "+ record.value());
}
}
}
运行的结果如下:
总结:这再一次证明了,一个分区内的消息是有序被消费了。