bboss kafka组件使用介绍

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: bboss kafka组件使用介绍 本文使用的实例对应的gradle源码工程git访问地址: http://git.oschina.net/bboss/bestpractice testkafka子工程地址 http://git.
bboss kafka组件使用介绍
本文使用的实例对应的gradle源码工程git访问地址:
http://git.oschina.net/bboss/bestpractice
testkafka子工程地址
http://git.oschina.net/bboss/bestpractice/tree/master/testkafka
bboss kafka组件作用
  • 快速配置kafka客户端和消费者
  • 发送数据到kafka
  • 从kafka接收和处理数据(支持批量消息处理和按条处理)

1.导入bboss kafka组件
maven坐标
<dependency>
    <groupId>com.bbossgroups.plugins</groupId>
    <artifactId>bboss-plugin-kafka</artifactId>
    <version>5.0.6.2</version>
</dependency>

gradle坐标
compile 'com.bbossgroups.plugins:bboss-plugin-kafka:5.0.6.2'



2.使用kafka producer,发送消息
2.1 kafka producer配置
编写kafka.xml配置文件,放到classpath跟路径下面
<properties>
	<property name="productorPropes">
		<propes>
			
			<property name="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer">
				<description> <![CDATA[ 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] ]]></description>
			</property>
			<property name="key.serializer" value="org.apache.kafka.common.serialization.LongSerializer">
				<description> <![CDATA[ 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] ]]></description>
			</property>
					
			<property name="compression.type" value="gzip">
				<description> <![CDATA[ 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定]]></description>
			</property>
			<property name="bootstrap.servers" value="hadoop85:9092,hadoop86:9092,hadoop88:9092">
				<description> <![CDATA[ 指定kafka节点列表,用于获取metadata(元数据),不必全部指定]]></description>
			</property>
		</propes>
	</property>
        <property name="workerThreadSize" value="100"/>
        <property name="workerThreadQueueSize" value="10240"/>
	<property name="kafkaproductor" 
		class="org.frameworkset.plugin.kafka.KafkaProductor"
		init-method="init"
		f:sendDatatoKafka="true"
f:sendAsyn="true"
		f:productorPropes="attr:productorPropes"/>		 
		
</properties>


相关配置说明:

bootstrap.servers kafka服务器地址配置
value.serializer kafka消息序列化插件配置
key.serializer kafka消息key序列化插件配置
f:sendDatatoKafka="true" 是否启动消息发送功能,false 禁用,true 启用
f:sendAsyn="true" 控制组件是否异步发送消息,默认为true
workerThreadSize 异步发送消息线程池,默认100
workerThreadQueueSize 异步发送消息队列,默认10240

2.2 发送kafka消息

发送kafka消息相关组件:
org.frameworkset.plugin.kafka.KafkaUtil
org.frameworkset.plugin.kafka.KafkaProductor

KafkaUtil组件加载配置文件并获取KafkaProductor ,通过KafkaProductor 发送kafka消息
KafkaProductor productor = KafkaUtil.getKafkaProductor("kafkaproductor");
		productor.send("blackcat",//kafka topic
				1l, //message key
				"aaa");//message
		productor.send("blackcat", //kafka topic
				"bbb"); //message


异步方式发送消息

<property name="workerThreadSize" value="100"/>
<property name="workerThreadQueueSize" value="10240"/>

<property name="kafkaproductor"
class="org.frameworkset.plugin.kafka.KafkaProductor"
init-method="init"
f:sendDatatoKafka="true"
f:sendAsyn="true"
f:productorPropes="attr:productorPropes"/>

通过api控制是否异步发送消息:

//异步方式发送消息
productor.send("blackcat",3l,"aaa",true);
productor.send("blackcat",4l,"bbb",true);

//同步方式发送消息
productor.send("blackcat",5l,"aaa",false);
productor.send("blackcat",6l,"bbb",false);

3.接收和处理kafka消息
3.1 kafka consumer配置
新建kafkaconsumer.xml文件,放到classpath根路径下面
<properties>
	<property name="consumerPropes">
		<propes>


			<property name="group.id" value="test">
				<description> <![CDATA[ 指定kafka group id]]></description>
			</property>
			<property name="zookeeper.session.timeout.ms" value="30000">
				<description> <![CDATA[ 指定kafkazk会话超时时间]]></description>
			</property>
			

			<property name="auto.commit.interval.ms" value="3000">
				<description> <![CDATA[ 指定kafka自动提交时间间隔]]></description>
			</property>

			<property name="auto.offset.reset" value="smallest">
				<description> <![CDATA[ ]]></description>
			</property>
			<property name="zookeeper.connect" value="hadoop85:2181,hadoop86:2181,hadoop88:2181">
				<description> <![CDATA[ 指定kafka节点列表,用于获取metadata(元数据),不必全部指定]]></description>
			</property>

		</propes>
	</property>
	<property name="kafkaconsumer"
		class="org.frameworkset.plugin.kafka.KafkaBatchConsumer" init-method="init"
f:batchsize="-1"
		f:checkinterval="10000"
		f:productorPropes="attr:consumerPropes" f:topic="blackcat"
		f:storeService="attr:storeService" f:partitions="4" />
	<property name="storeService"
		 class="org.frameworkset.plugin.kafka.StoreServiceTest" />	

</properties>

配置说明:
storeService 配置消息处理组件
zookeeper.connect 配置管理kafka服务器和消息的zookeeper集群地址
f:topic="blackcat" 消费的kafka topic
f:partitions="4" topic对应的分区数,决定并行处理消息的工作线程
f:batchsize="-1" 批处理消息条数,-1禁用批处理,>0时按照批处理方式按批次提交消息给storeservice组件
f:checkinterval="10000" 指定批处理消息接收最大等待时间,单位毫秒。按照批处理方式时,如果超过checkinterval指定的时间,到达的消息没有到达batchsize,则强制提交处理当前批次的数据到storeservice组件

3.2 接收和处理消息
接收和处理消息相关组件:
org.frameworkset.plugin.kafka.KafkaConsumer
org.frameworkset.plugin.kafka.StoreService

编写消息处理组件,处理组件需要实现接口
org.frameworkset.plugin.kafka.StoreService
//按条处理数据
public void store(MessageAndMetadata<byte[], byte[]> message)  throws Exception ;
public void closeService();
//按批处理消息
public void store(List<MessageAndMetadata<byte[], byte[]>> messages) throws Exception
StoreServiceTest实现:
package org.frameworkset.plugin.kafka;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import kafka.message.MessageAndMetadata;

public class StoreServiceTest extends BaseStoreService {
	StringDeserializer sd = new StringDeserializer();
	LongDeserializer ld = new LongDeserializer();
	@Override
	public void store(List<MessageAndMetadata<byte[], byte[]>> messages) throws Exception {
		for(MessageAndMetadata<byte[], byte[]> message:messages){
			String data = sd.deserialize(null,message.message());
			long key = ld.deserialize(null, message.key());
			System.out.println("key="+key+",data="+data);
		}
	}

	@Override
	public void closeService() {
		sd.close();
		ld.close();
	}

	@Override
	public void store(MessageAndMetadata<byte[], byte[]> message) throws Exception {
		String data = sd.deserialize(null,message.message());
		long key = ld.deserialize(null, message.key());
		System.out.println("key="+key+",data="+data);
	}

}


3.3 加载kafka consumer配置并启动消息接收线程
BaseApplicationContext context = DefaultApplicationContext.getApplicationContext("kafkaconfumer.xml");
		KafkaListener consumer = context.getTBeanObject("kafkaconsumer", KafkaListener.class);
		Thread t = new Thread(consumer);
		t.start();



目录
相关文章
|
6月前
|
消息中间件 存储 Java
【Kafka】Kafka 组件分析
【4月更文挑战第5天】【Kafka】Kafka 组件分析
|
6月前
|
消息中间件 存储 运维
王者归位:Kafka控制器组件解析
王者归位:Kafka控制器组件解析
71 0
|
1月前
|
消息中间件 监控 Kafka
Apache Kafka 成为实时数据流处理的关键组件
【10月更文挑战第8天】随着大数据技术的发展,Apache Kafka 成为实时数据流处理的关键组件。Kafka Manager 提供了一个简洁易用的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件修改、启动服务、创建和管理 Topic 等操作,帮助你快速上手。
50 3
|
22天前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
41 0
|
6月前
|
消息中间件 存储 监控
扒开kafka内部组件,咱瞅一瞅都有啥?
以上是 V 哥整理的关于 Kafka 核心组件的介绍,掌握 Kafka 中间件,应用在大型分布式项目中,这对于人个的项目经验积累是浓墨重彩的笔,换句话说,只要是有用到Kafka 的项目,必然是小不了,否则架构师脑袋长泡了。
|
消息中间件 Kafka
114 Kafka核心组件
114 Kafka核心组件
39 0
|
消息中间件 存储 运维
消息队列Kafka「检索组件」重磅上线!
本文对消息队列 Kafka「检索组件」进行详细介绍,首先通过对消息队列使用过程中的痛点问题进行介绍,然后针对痛点问题提出相应的解决办法,并对关键技术技术进行解读,旨在帮助大家对消息队列 Kafka「检索组件」的特点及使用方式更加熟悉,以期可以帮助大家更有效的解决在消息排查过程中遇到的痛点问题。
560 0
消息队列Kafka「检索组件」重磅上线!
|
消息中间件 存储 缓存
Kafka概念及组件介绍
1、分布式消息队列系统,先入先出,同时提供数据分布式缓存功能 2、消息持久化:数据读取速度可以达到O(1)——预读,后写(按顺序,ABCDE,正读A,预读B;尾部追加写)对磁盘的顺序访问比内存访问还快)
438 1
Kafka概念及组件介绍
|
消息中间件 存储 缓存
|
消息中间件 运维 安全
基于开源组件打造 Kafka 自治集群
基于开源组件打造 Kafka 自治集群
142 0
基于开源组件打造 Kafka 自治集群
下一篇
无影云桌面