Kafka Consumer接口

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介:

对于kafka的consumer接口,提供两种版本,

 

high-level

一种high-level版本,比较简单不用关心offset, 会自动的读zookeeper中该Consumer group的last offset 
参考,https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

不过要注意一些注意事项,对于多个partition和多个consumer 
1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 
    最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 
5. High-level接口中获取不到数据的时候是会block的

简单版,

简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置 
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset

复制代码
        Properties props = new Properties();
        props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据
         props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "pv");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        
        ConsumerConfig conf = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
        String topic = "page_visits";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        
        KafkaStream<byte[], byte[]> stream = streams.get(0); 
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()){
            System.out.println("message: " + new String(it.next().message()));
        }
        
        if (consumer != null) consumer.shutdown();   //其实执行不到,因为上面的hasNext会block
复制代码

在用high-level的consumer时,两个给力的工具,

1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

可以看到当前group offset的状况,比如这里看pv的状况,3个partition

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   21              21              0               none 
pv              page_visits                    1   19              19              0               none 
pv              page_visits                    2   20              20              0               none

关键就是offset,logSize和Lag 
这里以前读完了,所以offset=logSize,并且Lag=0

2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties  page_visits

3个参数, 
[earliest | latest],表示将offset置到哪里 
consumer.properties ,这里是配置文件的路径 
topic,topic名,这里是page_visits

我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下,

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   0               21              21              none 
pv              page_visits                    1   0               19              19              none 
pv              page_visits                    2   0               20              20              none

可以看到offset已经被清0,Lag=logSize

 

底下给出原文中多线程consumer的完整代码

复制代码
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }
 
    public void run(int a_numThreads) { // 创建并发的consumers
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
    }
}
复制代码

 

SimpleConsumer

另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口

参考,https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

什么时候用这个接口?

  1. Read a message multiple times
  2. Consume only a subset of the partitions in a topic in a process
  3. Manage transactions to make sure a message is processed once and only once

 

当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦 
所以不是一定要用,最好别用

  1. You must keep track of the offsets in your application to know where you left off consuming.
  2. You must figure out which Broker is the lead Broker for a topic and partition
  3. You must handle Broker leader changes

使用SimpleConsumer的步骤:

  1. Find an active Broker and find out which Broker is the leader for your topic and partition
  2. Determine who the replica Brokers are for your topic and partition
  3. Build the request defining what data you are interested in
  4. Fetch the data
  5. Identify and recover from leader changes

首先,你必须知道读哪个topic的哪个partition 
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker 
再者,自己去写request并fetch数据 
最终,还要注意需要识别和处理broker leader的改变

 

逐步来看,

Finding the Lead Broker for a Topic and Partition

思路就是,遍历每个broker,取出该topic的metadata,然后再遍历其中的每个partition metadata,如果找到我们要找的partition就返回 
根据返回的PartitionMetadata.leader().host()找到leader broker

复制代码
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) { //遍历每个broker 
            SimpleConsumer consumer = null;
            try {
                //创建Simple Consumer,
//class SimpleConsumer(val host: String,val port: Int,val soTimeout: Int
// ,val bufferSize: Int,val clientId: String)
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); // kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); //发送TopicMetadata Request请求 List<TopicMetadata> metaData = resp.topicsMetadata(); //取到Topic的Metadata for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) {//遍历每个partition的metadata if (part.partitionId() == a_partition) { //确认是否是我们要找的partition returnMetaData = part; break loop; //找到就返回 } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } return returnMetaData; }
复制代码

 

Finding Starting Offset for Reads

request主要的信息就是Map<TopicAndPartition, PartitionOffsetRequestInfo>

TopicAndPartition就是对topic和partition信息的封装 
PartitionOffsetRequestInfo的定义 
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int) 
其中参数time,表示where to start reading data,两个取值 
kafka.api.OffsetRequest.EarliestTime(),the beginning of the data in the logs 
kafka.api.OffsetRequest.LatestTime(),will only stream new messages

不要认为起始的offset一定是0,因为messages会过期,被删除

另外一个参数不清楚什么含义,代码中取的是1

复制代码
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); //build offset fetch request info
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response
= consumer.getOffsetsBefore(request); //取到offsets if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); //取到的一组offset return offsets[0]; //取第一个开始读 }
复制代码

 

Reading the Data

首先在FetchRequest上加上Fetch,指明topic,partition,开始的offset,读取的大小 
如果producer在写入很大的message时,也许这里指定的1000000是不够的,会返回an empty message set,这时需要增加这个值,直到得到一个非空的message set。

复制代码
// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000) // 1000000bytes
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
 
if (fetchResponse.hasError()) {
        // See Error Handling
}
numErrors = 0;
 
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) { // 必要判断,因为对于compressed message,会返回整个block,所以可能包含old的message
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset(); // 获取下一个readOffset
    ByteBuffer payload = messageAndOffset.message().payload();
 
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
}
 
if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
复制代码

 

Error Handling
复制代码
if (fetchResponse.hasError()) {
     numErrors++;
     // Something went wrong!
     short code = fetchResponse.errorCode(a_topic, a_partition);
     System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
     if (numErrors > 5) break;
 
     if (code == ErrorMapping.OffsetOutOfRangeCode())  { // 处理offset非法的问题,用最新的offset
         // We asked for an invalid offset. For simple case ask for the last element to reset
         readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
         continue;
     }
     consumer.close();
     consumer = null;
     leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); // 更新leader broker
     continue;
 }
复制代码

没有特别的逻辑,只是重新调用findLeader获取leader broker 
并且防止在切换过程中,取不到leader信息,加上sleep逻辑

复制代码
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
       for (int i = 0; i < 3; i++) {
           boolean goToSleep = false;
           PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
           if (metadata == null) {
               goToSleep = true;
           } else if (metadata.leader() == null) {
               goToSleep = true;
           } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
               // first time through if the leader hasn't changed give ZooKeeper a second to recover
               // second time, assume the broker did recover before failover, or it was a non-Broker issue
               //
               goToSleep = true;
           } else {
               return metadata.leader().host();
           }
           if (goToSleep) {
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException ie) {
               }
           }
       }
       System.out.println("Unable to find new leader after Broker failure. Exiting");
       throw new Exception("Unable to find new leader after Broker failure. Exiting");
   }
复制代码
 
Full Source Code
复制代码
package com.test.simple;
 
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
 
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
             e.printStackTrace();
        }
    }
 
    private List<String> m_replicaBrokers = new ArrayList<String>();
 
    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }
 
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
 
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
 
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
 
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
 
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
 
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
 
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }
 
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
 
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
 
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }
 
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}



本文章摘自博客园,原文发布日期:2014-06-18 
目录
相关文章
|
8月前
|
消息中间件 Kafka API
kafka Consumer high-level api 之白名单
kafka Consumer high-level api 之白名单
|
2月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
3月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
102 4
|
4月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
415 4
|
5月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
172 4
|
5月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
137 2
|
5月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
104 8
|
5月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
126 7
|
5月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
216 4

热门文章

最新文章