Kafka原理解析-旧版本0.8高级Api的Demo和配置信息获取技巧

简介: 旧版本高级Api封装: package xxxxxx; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducerTest implements Runnable {

旧版本高级Api封装:

package xxxxxx;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducerTest implements Runnable {

private final kafka.javaapi.producer.Producer producer;

private final String topic;

private final Properties props = new Properties();

private final String message;

public KafkaProducerTest(String topic, String message) {

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("metadata.broker.list", "127.0.0.1:9092");

// Use random partitioner. Don't need the key type. Just set it to Integer.

// The message is of type String.

producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props));

this.topic = topic;

this.message = message;

}

@Override

public void run() {

producer.send(new KeyedMessage(topic, message));

System.out.println("发送kafka消息:"+message);

}

public static void main(String[] arg){

for(int icount=0;icount<100;icount++){

KafkaProducerTest kafkaProducer = new KafkaProducerTest("myTopic", "hello world!");

Thread thread = new Thread(kafkaProducer);

thread.start();

}

}

}

package xxxxxxxxx;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

public class KafkaConsumerTest implements Runnable{

private final ConsumerConnector consumer;

private final String topic;

public KafkaConsumerTest(String topic)

{

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

this.topic = topic;

}

private static ConsumerConfig createConsumerConfig()

{

Properties props = new Properties();

props.put("zookeeper.connect", "127.0.0.1:2181");

props.put("group.id", "myGroup");

props.put("zookeeper.session.timeout.ms", "400");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

@Override

public void run() {

Map topicCountMap = new HashMap();

topicCountMap.put(topic, new Integer(1));

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

System.out.println("开始启动.....");

try {

KafkaStream stream = consumerMap.get(topic).get(0);

ConsumerIterator it = stream.iterator();

while (it.hasNext()) {

System.out.println("消费信息:" + new String(it.next().message()));

}

}catch (Exception e){

e.printStackTrace();

}

}

public static void main(String [] arg){

KafkaConsumerTest kafkaConsumer = new KafkaConsumerTest("myTopic");

Thread thread = new Thread(kafkaConsumer);

thread.start();

}

}

注意配置信息:

1、pom.xml文件中增加plugin:

org.mortbay.jetty

jetty-maven-plugin

7.6.10.v20130312

${basedir}/webapp

stop

55855

0

8085

60000

/

global.config.path

${basedir}/src/main/resources/config

APPID

spider.mySpider

2、读取配置文件信息的工具类:

package xxxxxx;

import com.alibaba.fastjson.JSON;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.util.Properties;

public class ReadConfigUtil {

private static Properties prop;

private static String globalConfigPath = System.getProperty("global.config.path");

static {

prop = new Properties();

// 遍历所有.properties文件

File cfgFiles = new File(globalConfigPath);

if (cfgFiles.exists()) {

File[] fs = cfgFiles.listFiles();

for (File confFile : fs) {

try {

String confName = confFile.getName();

if (confName.endsWith(".properties")) {

FileInputStream fis = new FileInputStream(confFile);

prop.load(fis);

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

public static Properties getProp() {

return prop;

}

public static String getValueByKey(String key) {

return getProp().getProperty(key);

}

}

kafka版本引用:

org.apache.kafka

kafka_2.11

0.8.2.1

slf4j-log4j12

org.slf4j

目录
相关文章
|
9月前
|
XML 缓存 API
eBay 商品详情 API 深度解析:从基础信息到变体数据获取全方案
本文详解如何通过 eBay 的 GetItem 和 GetMultipleItems 接口获取商品详情数据,涵盖基础属性、价格、变体、卖家信息等,并提供可复用的 Python 代码。内容包括 API 核心参数、响应结构、代码实现、实战注意事项及扩展方向,助力跨境电商开发。
|
8月前
|
JSON API 数据格式
亚马逊:调用商品上传API实现全球多站点商品信息一键发布,降低人工操作成本
在亚马逊多站点电商运营中,手动上传商品效率低且易出错。通过调用Selling Partner API,可实现商品信息一键全球发布,大幅提升效率、降低成本。本文详解API功能、数据准备、代码实现与优化策略,助力企业轻松掌握自动化发布流程,提升全球运营能力。
355 0
|
8月前
|
JSON 监控 供应链
京东商品详情API参数构造指南:必填参数与自定义字段配置
京东商品详情API由京东开放平台提供,支持获取商品基础信息、价格库存、SKU规格等120+字段,适用于价格监控、库存管理等场景。接口采用HTTPS协议、JSON格式,数据延迟≤30秒,支持高并发。提供Python请求示例,便于快速接入。
|
9月前
|
机器学习/深度学习 人工智能 缓存
电商 API 接口:开启全平台商品信息同步新时代
在数字化浪潮下,电商平台激增,消费者跨平台购物成为常态。然而,商品信息分散导致数据不一致、库存混乱等问题。电商 API 接口应运而生,通过标准化数据交换,实现多平台商品信息实时同步,提升运营效率、降低成本、增强用户体验,成为企业数字化转型的关键引擎。
463 0
|
7月前
|
缓存 数据可视化 定位技术
快递鸟快递API技术指南:获取物流轨迹信息与轨迹地图的解决方案
在当今电商竞争激烈的环境中,物流体验已成为提升用户满意度的关键因素。研究表明,超过 75% 的消费者会因物流信息不透明而放弃下单。
1585 1
|
8月前
|
机器学习/深度学习 存储 API
唯品会:利用订单地址API校验收货信息,降低因地址错误导致的退货率
在电商中,地址错误常导致退货率升高,影响用户体验与运营效率。唯品会通过集成订单地址API,在用户下单时实时校验收货信息,有效减少因地址问题引发的退货。本文解析该方案的工作原理与实际效益,展示其如何助力平台降低退货率、节约成本并提升用户满意度。
310 0
|
8月前
|
存储 JSON 监控
淘宝/天猫:通过商品详情API实现多店铺商品信息批量同步,确保价格、库存实时更新
在电商运营中,管理多个淘宝或天猫店铺的商品信息(如价格、库存)耗时易错。本文介绍如何通过淘宝/天猫开放平台的商品详情API,实现自动化批量同步,确保信息实时更新。内容涵盖API调用、多店铺数据处理、实时更新策略及注意事项,助您高效管理多店铺商品信息。
546 0
|
9月前
|
Java API 网络架构
java调用api接口自动判断节假日信息
java调用api接口自动判断节假日信息
3418 0
|
9月前
|
监控 安全 BI
电商 API 助力,多平台物流信息无缝对接
在电商多平台运营中,物流信息割裂导致效率低下、客服压力大。通过API技术,可实现订单抓取、状态同步与数据聚合,打通电商平台、快递系统与商家ERP,提升运营效率与用户体验。
475 0

推荐镜像

更多
  • DNS