kafka生产者和消费者的javaAPI的示例代码

简介: kafka生产者和消费者的javaAPI的示例代码

这篇文章主要介绍了kafka生产者和消费者的javaAPI的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

写了个kafka的java demo 顺便记录下,仅供参考

1.创建maven项目

目录如下:

图片.png

2.pom文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Kafka-Maven</groupId>
  <artifactId>Kafka-Maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

3.kafka生产者KafkaProduce:

package com.lijie.producer;
 
import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class KafkaProduce {
  private static Properties properties;
 
  static {
    properties = new Properties();
    String path = KafkaProducer.class.getResource("/").getFile().toString()
        + "kafka.properties";
    try {
      FileInputStream fis = new FileInputStream(new File(path));
      properties.load(fis);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
 
  /**
   * 发送消息
   * 
   * @param topic
   * @param key
   * @param value
   */
  public void sendMsg(String topic, byte[] key, byte[] value) {
 
    // 实例化produce
    KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(
        properties);
 
    // 消息封装
    ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(
        topic, key, value);
 
    // 发送数据
    kp.send(pr, new Callback() {
      // 回调函数
      @Override
      public void onCompletion(RecordMetadata metadata,
          Exception exception) {
        if (null != exception) {
          System.out.println("记录的offset在:" + metadata.offset());
          System.out.println(exception.getMessage() + exception);
        }
      }
    });
 
    // 关闭produce
    kp.close();
  }
}

4.kafka消费者KafkaConsume:

package com.lijie.consumer;
 
import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
 
import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;
 
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
 
public class KafkaConsume {
 
  private final static String TOPIC = "lijietest";
 
  private static Properties properties;
 
  static {
    properties = new Properties();
    String path = KafkaConsume.class.getResource("/").getFile().toString()
        + "kafka.properties";
    try {
      FileInputStream fis = new FileInputStream(new File(path));
      properties.load(fis);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
 
  /**
   * 获取消息
   * 
   * @throws Exception
   */
  public void getMsg() throws Exception {
    ConsumerConfig config = new ConsumerConfig(properties);
 
    ConsumerConnector consumer = kafka.consumer.Consumer
        .createJavaConsumerConnector(config);
 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 
    topicCountMap.put(TOPIC, new Integer(1));
 
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
 
    StringDecoder valueDecoder = new StringDecoder(
        new VerifiableProperties());
 
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
        .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
 
    KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
 
    ConsumerIterator<String, String> it = stream.iterator();
 
    while (it.hasNext()) {
      String json = it.next().message();
      User user = (User) JsonUtils.JsonToObj(json, User.class);
      System.out.println(user);
    }
  }
}

5.kafka.properties文件

##produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer
 
##consume
zookeeper.connect=192.168.80.123:2181 
group.id=lijiegroup 
zookeeper.session.timeout.ms=4000 
zookeeper.sync.time.ms=200 
auto.commit.interval.ms=1000 
auto.offset.reset=smallest 
serializer.class=kafka.serializer.StringEncoder

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持"java开发全栈"。

相关文章
|
3月前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
141 2
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
99 2
|
3月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
45 2
|
3月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
57 1
|
3月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
56 1
|
3月前
|
消息中间件 大数据 Java
大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients
大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients
43 2
|
4月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
5月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
5月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
135 0
|
5月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
121 0