Step By Step
消费端程序
1、参数配置
参考:Kafka消息发送的三种模式 连接参数配置部分。
2、消费端:Code Sample
import com.base.JavaKafkaConfigurer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
//加载kafka.properties
public static Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
public static Properties initConfig()
{
Properties props = new Properties();
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//设置接入点,即控制台的实例详情页显示的“默认接入点”
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//两次poll之间的最大允许间隔
//请不要改得太大,服务器会掐掉空闲连接,不要超过30000
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
//每次poll的最大数量
//注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//当前消费实例所属的消费组,请在控制台申请之后填写
//属于同一个组的消费实例,会负载消费消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
return props;
}
public static void main(String[] args) {
Properties properties = initConfig();
int consumerThreadNum = 6; // 可以设置和Topic的分区数据一致,这样一个分区就可以分配一个线程来消费消息。
String topic = kafkaProperties.getProperty("topic");
for (int i = 0; i < consumerThreadNum; i++) {
new KafkaConsumerThread(properties, topic).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaConsumerThread(Properties props, String topic)
{
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
public void run()
{
try {
while (true)
{
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record: records) {
System.out.println("Thread num: " + this.getName());
System.out.println(String.format("Consume partition:%d offset:%d the message body:%s", record.partition(), record.offset(),record.value()));
}
}
}
catch (Exception e)
{
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
}
3、消费端情况
4、服务端控制台查看