Kafka原理解析-旧版本0.8高级Api的Demo和配置信息获取技巧

简介: 旧版本高级Api封装: package xxxxxx; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducerTest implements Runnable {

旧版本高级Api封装:

package xxxxxx;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducerTest implements Runnable {

private final kafka.javaapi.producer.Producer producer;

private final String topic;

private final Properties props = new Properties();

private final String message;

public KafkaProducerTest(String topic, String message) {

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("metadata.broker.list", "127.0.0.1:9092");

// Use random partitioner. Don't need the key type. Just set it to Integer.

// The message is of type String.

producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));

this.topic = topic;

this.message = message;

}

@Override

public void run() {

producer.send(new KeyedMessage(topic, message));

System.out.println("发送kafka消息:"+message);

}

public static void main(String[] arg){

for(int icount=0;icount<100;icount++){

KafkaProducerTest kafkaProducer = new KafkaProducerTest("myTopic", "hello world!");

Thread thread = new Thread(kafkaProducer);

thread.start();

}

}

}

package xxxxxxxxx;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

public class KafkaConsumerTest implements Runnable{

private final ConsumerConnector consumer;

private final String topic;

public KafkaConsumerTest(String topic)

{

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

this.topic = topic;

}

private static ConsumerConfig createConsumerConfig()

{

Properties props = new Properties();

props.put("zookeeper.connect", "127.0.0.1:2181");

props.put("group.id", "myGroup");

props.put("zookeeper.session.timeout.ms", "400");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

@Override

public void run() {

Map topicCountMap = new HashMap();

topicCountMap.put(topic, new Integer(1));

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

System.out.println("开始启动.....");

try {

KafkaStream stream = consumerMap.get(topic).get(0);

ConsumerIterator it = stream.iterator();

while (it.hasNext()) {

System.out.println("消费信息:" + new String(it.next().message()));

}

}catch (Exception e){

e.printStackTrace();

}

}

public static void main(String [] arg){

KafkaConsumerTest kafkaConsumer = new KafkaConsumerTest("myTopic");

Thread thread = new Thread(kafkaConsumer);

thread.start();

}

}

注意配置信息:

1、pom.xml文件中增加plugin:

org.mortbay.jetty

jetty-maven-plugin

7.6.10.v20130312

${basedir}/webapp

stop

55855

0

8085

60000

/

global.config.path

${basedir}/src/main/resources/config

APPID

spider.mySpider

2、读取配置文件信息的工具类:

package xxxxxx;

import com.alibaba.fastjson.JSON;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.util.Properties;

public class ReadConfigUtil {

private static Properties prop;

private static String globalConfigPath = System.getProperty("global.config.path");

static {

prop = new Properties();

// 遍历所有.properties文件

File cfgFiles = new File(globalConfigPath);

if (cfgFiles.exists()) {

File[] fs = cfgFiles.listFiles();

for (File confFile : fs) {

try {

String confName = confFile.getName();

if (confName.endsWith(".properties")) {

FileInputStream fis = new FileInputStream(confFile);

prop.load(fis);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

public static Properties getProp() {

return prop;

}

public static String getValueByKey(String key) {

return getProp().getProperty(key);

}

}

kafka版本引用:

org.apache.kafka

kafka_2.11

0.8.2.1

slf4j-log4j12

org.slf4j

目录
相关文章
|
23天前
|
监控 前端开发 JavaScript
实战篇:商品API接口在跨平台销售中的有效运用与案例解析
随着电子商务的蓬勃发展,企业为了扩大市场覆盖面,经常需要在多个在线平台上展示和销售产品。然而,手工管理多个平台的库存、价格、商品描述等信息既耗时又容易出错。商品API接口在这一背景下显得尤为重要,它能够帮助企业在不同的销售平台之间实现商品信息的高效同步和管理。本文将通过具体的淘宝API接口使用案例,展示如何在跨平台销售中有效利用商品API接口,以及如何通过代码实现数据的统一管理。
|
30天前
|
API 数据库 C语言
【C/C++ 数据库 sqlite3】SQLite C语言API返回值深入解析
【C/C++ 数据库 sqlite3】SQLite C语言API返回值深入解析
169 0
|
2月前
|
API PHP 开发者
大麦网 API 接口商品详情信息 API
为了让更多用户了解到大麦网的商品详情,并能够方便地获取相关信息,大麦网推出了商品详情 API 接口。本文将介绍大麦网商品详情 API 接口的作用、使用方法和注意事项,帮助广大开发者更加方便地接入大麦网的产品。
|
1天前
|
安全 Java API
Spring工厂API与原理
Spring工厂API与原理
22 10
|
8天前
|
机器学习/深度学习 API TensorFlow
TensorFlow的高级API:tf.keras深度解析
【4月更文挑战第17天】本文深入解析了TensorFlow的高级API `tf.keras`,包括顺序模型和函数式API的模型构建,以及模型编译、训练、评估和预测的步骤。`tf.keras`结合了Keras的易用性和TensorFlow的性能,支持回调函数、模型保存与加载等高级特性,助力提升深度学习开发效率。
|
16天前
|
JavaScript API UED
Vue3.0新特性解析与实战:Composition API、Teleport与Suspense
【4月更文挑战第6天】Vue3.0引入了颠覆性的Composition API,通过函数式方法提升代码可读性和复用性,例如`setup()`、`ref`等,便于逻辑模块化。实战中,自定义的`useUser`函数可在多个组件中共享用户信息逻辑。另外,Teleport允许组件渲染到DOM特定位置,解决模态框等场景的上下文问题。再者,Suspense提供异步组件加载的延迟渲染,使用fallback内容改善用户体验。这些新特性显著优化了开发和性能,适应现代Web需求。
19 0
|
25天前
|
监控 API 开发者
邮件发送API接口配置步骤?
`邮件发送API让开发者轻松集成邮件功能。选择服务提供商如SendGrid、Mailgun或AWS SES,注册获取API密钥。配置发件人、收件人、主题和内容,调用API发送邮件。处理响应以确认发送成功,并监控性能进行优化。API简化了邮件发送,提升开发效率。`
|
1月前
|
Java API Maven
email api java编辑方法?一文教你学会配置步骤
在Java开发中,Email API是简化邮件功能的关键工具。本文指导如何配置和使用Email API Java:首先,在项目中添加javax.mail-api和javax.mail依赖;接着,配置SMTP服务器和端口;然后,创建邮件,设定收件人、发件人、主题和正文;最后,使用Transport.send()发送邮件。借助Email API Java,可为应用添加高效邮件功能。
|
1月前
|
安全 API 数据安全/隐私保护
email api接口配置教程步骤详解
Email API是用于程序化访问邮件服务的工具,让开发者能集成邮件功能到应用中。配置Email API包括选择供应商(如SendGrid、Mailgun、AokSend),注册获取API密钥,配置API参数,及测试邮件发送。使用Email API能提升邮件发送的可靠性和效率,便于邮件管理及营销活动。AokSend支持大量验证码发送,适合高效邮件运营。
|
1月前
|
JavaScript 前端开发 Java
淘宝/天猫获取sku详细信息 API接口(如何抓取别人的sku图淘宝)
淘宝/天猫平台提供了获取商品SKU(Stock Keeping Unit,库存量单位)详细信息的API接口。SKU通常代表一种具有独特属性的商品变体,如颜色、尺寸等。为了获取淘宝/天猫商品的SKU详细信息,您可以遵循以下步骤: