kafka详解三:开发Kafka应用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:
一、整体看一下Kafka
我们知道,Kafka系统有三大组件:Producer、Consumer、broker 。

producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume)。
二、开发一个Producer应用    
     Producers用来生产消息并把产生的消息推送到Kafka的Broker。Producers可以是各种应用,比如web应用,服务器端应用,代理应用以及log系统等等。当然,Producers现在有各种语言的实现比如Java、C、Python等。
     我们先看一下Producer在Kafka中的角色:
         
2.1.kafka Producer 的 API
Kafka中和producer相关的API有三个类
  • Producer:最主要的类,用来创建和推送消息
  • KeyedMessage:定义要发送的消息对象,比如定义发送给哪个topic,partition key和发送的内容等。
  • ProducerConfig:配置Producer,比如定义要连接的brokers、partition class、serializer class、partition key等
2.2下面我们就写一个最简单的Producer:产生一条消息并推送给broker
package bonree.producer;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/*******************************************************************************
 * BidPlanStructForm.java Created on 2014-7-8
 * Author: <a href=mailto:wanghouda@126.com>houda</a>
 * @Title: SimpleProducer.java
 * @Package bonree.producer
 * Description:
 * Version: 1.0
 ******************************************************************************/
public class SimpleProducer {
	private static Producer<Integer,String> producer;
	private final Properties props=new Properties();
	public SimpleProducer(){
		//定义连接的broker list
		props.put("metadata.broker.list", "192.168.4.31:9092");
		//定义序列化类(Java对象传输前要序列化)
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		producer = new Producer<Integer, String>(new ProducerConfig(props));
	}
	public static void main(String[] args) {
		SimpleProducer sp=new SimpleProducer();
		//定义topic
		String topic="mytopic";
		//定义要发送给topic的消息
		String messageStr = "send a message to broker ";
		//构建消息对象
		KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
		//推送消息到broker
		producer.send(data);
		producer.close();
	}
}
三、开发一个consumer应用
     Consumer是用来消费Producer产生的消息的,当然一个Consumer可以是各种应用,如可以是一个实时的分析系统,也可以是一个数据仓库或者是一个基于发布订阅模式的解决方案等。Consumer端同样有多种语言的实现,如Java、C、Python等。
     我们看一下Consumer在Kafka中的角色:
   
3.1.kafka Producer 的 API
     Kafka和Producer稍微有些不同,它提供了两种类型的API
  • high-level consumer API:提供了对底层API的抽象,使用起来比较简单
  • simple consumer API:允许重写底层API的实现,提供了更多的控制权,当然使用起来也复杂一些     
由于是第一个应用,我们这部分使用high-level API,它的特点每消费一个message自动移动offset值到下一个message,关于offset在后面的部分会单独介绍。与Producer类似,和Consumer相关的有三个主要的类:
  • KafkaStream:这里面返回的就是Producer生产的消息
  • ConsumerConfig:定义要连接zookeeper的一些配置信息(Kafka通过zookeeper均衡压力,具体请查阅见面几篇文章),比如定义zookeeper的URL、group id、连接zookeeper过期时间等。
  • ConsumerConnector:负责和zookeeper进行连接等工作
3.2下面我们就写一个最简单的Consumer:从broker中消费一个消息
package bonree.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/*******************************************************************************

 * Created on 2014-7-8 Author: <a
 * href=mailto:wanghouda@126.com>houda</a>
 * @Title: SimpleHLConsumer.java
 * @Package bonree.consumer Description: Version: 1.0
 ******************************************************************************/
public class SimpleHLConsumer {
	private final ConsumerConnector consumer;
	private final String topic;

	public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
		Properties props = new Properties();
		//定义连接zookeeper信息
		props.put("zookeeper.connect", zookeeper);
		//定义Consumer所有的groupID,关于groupID,后面会继续介绍
		props.put("group.id", groupId);
		props.put("zookeeper.session.timeout.ms", "500");
		props.put("zookeeper.sync.time.ms", "250");
		props.put("auto.commit.interval.ms", "1000");
		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
		this.topic = topic;
	}

	public void testConsumer() {
		Map<String, Integer> topicCount = new HashMap<String, Integer>();
		//定义订阅topic数量
		topicCount.put(topic, new Integer(1));
		//返回的是所有topic的Map
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
		//取出我们要需要的topic中的消息流
		List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
		for (final KafkaStream stream : streams) {
			ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
			while (consumerIte.hasNext())
				System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
		}
		if (consumer != null)
			consumer.shutdown();
	}

	public static void main(String[] args) {
		String topic = "mytopic";
		SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic);
		simpleHLConsumer.testConsumer();
	}

}
四、运行查看结果
先启动服务器端相关进程:
  • 运行zookeeper:[root@localhost kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
  • 运行Kafkabroker:[root@localhost kafka-0.8]# bin/kafka-server-start.sh config/server.properties
再运行我们写的应用
  • 运行刚才写的SimpleHLConsumer 类的main函数,等待生产者生产消息
  • 运行SimpleProducer的main函数,生产消息并push到broker

结果:运行完SimpleProducer后在SimpleHLConsumer的控制台即可看到生产者生产的消息:“send a message to broker”

   想更深入的了解Kafka请参阅我的另一篇文章: 《Kafka设计与原理详解》

目录
相关文章
|
9月前
|
消息中间件 缓存 NoSQL
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
137 0
|
7月前
|
消息中间件 NoSQL 关系型数据库
6年高级开发就因这道题少了5K,Kafka如何避免消息重复消费?
一个6年工作经验的小伙伴,被问到这样一个问题,说Kafka是如何避免消息重复消费的?面试完之后,这位小伙伴来找到我,希望我能给一个思路。今天,我给大家分享一下我的思路。
99 1
|
3月前
|
消息中间件 存储 Java
ZooKeeper 在 Kafka 中的应用
ZooKeeper 在 Kafka 中的应用
64 0
|
3月前
|
消息中间件 存储 Java
Kafka开发环境搭建及应用
Kafka开发环境搭建及应用
|
3月前
|
消息中间件 Java Kafka
windows下kafka的环境配置及rdkafka库的应用
windows下kafka的环境配置及rdkafka库的应用
|
3月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
174 0
|
4月前
|
消息中间件 存储 监控
Kafka Streams:深度探索实时流处理应用程序
Apache Kafka Streams 是一款强大的实时流处理库,为构建实时数据处理应用提供了灵活且高性能的解决方案。本文将深入探讨 Kafka Streams 的核心概念、详细原理,并提供更加丰富的示例代码,以帮助大家深入理解和应用这一流处理框架。
|
4月前
|
消息中间件 数据挖掘 Kafka
Kafka在微服务架构中的应用:实现高效通信与数据流动
微服务架构的兴起带来了分布式系统的复杂性,而Kafka作为一款强大的分布式消息系统,为微服务之间的通信和数据流动提供了理想的解决方案。本文将深入探讨Kafka在微服务架构中的应用,并通过丰富的示例代码,帮助大家更全面地理解和应用Kafka的强大功能。
|
4月前
|
消息中间件 Kafka Linux
Kafka【应用 01】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
Kafka【应用 01】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
181 0
|
4月前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
92 2