友德
2018-07-11
3683浏览量
Kafka系统架构;
Kafka开发;
Kafka参数调优;
Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
Kafka集群包含一个或多个服务器,这种服务器被称为broker;
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,实际开发中通过称为队列, topic名称,即叫做Kafka队名称。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处);
Partition是物理上的概念,每个Topic包含一个或多个Partition;
partition物理上由多个segment组成;
负责发布消息到Kafka broker;
消息消费者,向Kafka broker读取消息的客户端;
每个Consumer属于一个特定的Consumer Group可为每个Consumer指定group name,若不指定group name则属于默认的group, 例如 在电商中发货中心、交易中心分别是两个Kafka consumer group。
数据传输的事务定义通常有以下三种级别
最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输;
最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输;
精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的,但是很难做到;
当发布消息时,Kafka有一个committed的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失,但是如果producer发布消息时发生了网络错误, Kafka现在也没一个完美的解决方案;
如果consumer崩溃了,会有另外一个consumer接着消费消息,它需要从一个合适的 offset 继续处理。这种情况下可以有以下选择:
consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”;
“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起;
broker/ids/[0…N]
broker/topics/[topic]
broker id 为3的broker为某个topic提供了2个分区进行消息的存储
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
/consumers/[group_id]/ids/[consumer_id]
/consumers/[group_id]/ids
/brokers/ids/[0…N]
Kafka所有日志文件均存储在server.properties文件配置参数log.dirs下,假设 log.dirs配置为/export/kafka/log/, 同一个topic下有多个不同partition,每个partition为一个目录,目录的命名规范为: topic名称+有序序号,第一个partition序号从0开始,序号最大值为partitions数量减1,例如: 一个名为trade为的topic, 有4个partition, 则在/export/kafka/log/目录下有下面的信息:
trade-0 trade-1 trade-2 trade-3
partition中文件存储方式:每个partion目录相当于一个巨型文件被平均分配到多个大小相等segment数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率, 下面的partition中文件存储方式:
第一步查找segment file;
第二步通过segment file查找message;
示例: 查找为offset=368776的message, 如下图所示;
定位segment file: 其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.
第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.第三个文件
00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起
始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。当
offset=368776时定位到00000000000000368769.index|log
在segment file中查找:当offset=368776时,依次定位到00000000000000368769.index的元数据物理
位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序
查找直到offset=368776为止;
四层的负载均衡,不常用, 使用zookeeper进行负载均衡, 常 用, 其中partitioner.class 的配置决定了Kafka生产者的负载均衡, 有三种情况:
partition和consumer 对应好, 平均
1: 5个partition, 5个consumer, (1, 1, 1, 1, 1)
2: 8个 partition, 5个consumer, (2, 2, 2, 1, 1)
3: 5个 partition, 8个consumer, (1, 1, 1, 1, 1, 0, 0, 0)
1.一般情况下,根据并发量,一个Kafka集群有3-5台broker机器,阿里是一条业务线7-8台物理机,内存是192G,为了减少消息的堆积,一个topic下128+个Partition, 默认是消费集群机器数量的2-3倍, 例如线上消费集群是64台,partition建议设置为128 – 192台之间, Note however that there cannot be more consumer instances in a consumer group than partitions.
2.一主多备部署:一个备的broker在和主的broker在同一个机房,另一个备的broker部署在同城的另一个机房, 进一步增加高可靠性.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
public class KafkaProducerTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);
private static Properties properties = null;
// kafka连续配置 项
static {
properties = new Properties();
properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
properties.put("producer.type", "sync");
properties.put("request.required.acks", "1");
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
}
public void produce() {
KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);
ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(
"test", "kkk".getBytes(), "vvv".getBytes());
kafkaProducer.send(kafkaRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(null != e) {
LOG.info("the offset of the send record is {}", metadata.offset());
LOG.error(e.getMessage(), e);
}
LOG.info("complete!");
}
});
kafkaProducer.close();
}
public static void main(String[] args) {
KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
for (int i = 0; i < 10; i++) {
kafkaProducerTest.produce();
}
}
}
public class ConsumerSample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("groupid", "test_group");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 4);
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
ExecutorService executor = Executors.newFixedThreadPool(4);
for (final KafkaStream<Message> stream : streams) {
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata msgAndMetadata : stream) {
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}
});
}
}
}
局部保序模式(hash映射)
全部保序模式 (只有一个Partition)
不保序模式(轮训模式)
异步,解耦、削峰
通过agent定时收集每台主机的syslog信息
重新设置消费的partition offset
网络不通
jar包冲突(netty, slf4j)
RocketMq
JMQ
notify
ActiveMQ(Apache)
Hornet(Jboss)
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
构建可靠、高效、易扩展的技术基石