Kafka实战-简单示例

简介:

1.概述

  上一篇博客《Kafka实战-Kafka Cluster》中,为大家介绍了Kafka集群的安装部署,以及对Kafka集群Producer/Consumer、HA等做了相关测试,今天我们来开发一个Kafka示例,练习如何在Kafka中进行编程,下面是今天的分享的目录结构:

  • 开发环境
  • ConfigureAPI
  • Consumer
  • Producer
  • 截图预览

  下面开始今天的内容分享。

2.开发环境

  在开发Kafka相关应用之前,我们得将Kafka得开发环境搭建完成,这里我所使用得开发环境如下所示:

基础软件 工具名称
IDE JBoss Studio 8
JDK 1.7

  关于基础软件的下载及相关配置,大家可参考我写的《高可用Hadoop平台-启航》一文的相关赘述,这里就不多做介绍了。在安装好相关基础软件后,我们开始项目工程的创建,这里我们所使用的工程结构是Maven,关于Maven环境的相关配置信息,可参考我在《Hadoop2源码分析-准备篇》一文对Maven环境配置的赘述。

  在准备完成相关基础软件以及Maven环境后,我们大家创建的工程,在pom.xml文件中,添加Kafka的依赖包,添加代码如下所示:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>        

  下面开始编写今天的代码示例。

3.ConfigureAPI

  首先是一个配置结构类文件,配置Kafka的相关参数,代码如下所示:

复制代码
package cn.hadoop.hdfs.conf;

/**
 * @Date Apr 28, 2015
 *
 * @Author dengjie
 *
 * @Note Set param path
 */
public class ConfigureAPI {

    public interface KafkaProperties {
        public final static String ZK = "10.211.55.15:2181,10.211.55.17:2181,10.211.55.18:2181";
        public final static String GROUP_ID = "test_group1";
        public final static String TOPIC = "test2";
        public final static String BROKER_LIST = "10.211.55.15:9092,10.211.55.17:9092,10.211.55.18:9092";
        public final static int BUFFER_SIZE = 64 * 1024;
        public final static int TIMEOUT = 20000;
        public final static int INTERVAL = 10000;
    }

}
复制代码

4.Consumer

  然后是一个消费程序,用于消费Kafka的消息,代码如下所示:

  • JConsumer

复制代码
package cn.hadoop.hdfs.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note Kafka Consumer
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 1000 * 3;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Receive->[" + new String(it.next().message()) + "]");
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

}
复制代码

5.Producer

  接着是Kafka的生产消息程序,用于产生Kafka的消息供Consumer去消费,具体代码如下所示:

  • JProducer

复制代码
package cn.hadoop.hdfs.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note Kafka JProducer
 */
public class JProducer extends Thread {

    private Producer<Integer, String> producer;
    private String topic;
    private Properties props = new Properties();
    private final int SLEEP = 1000 * 3;

    public JProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.211.55.18:9092");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int offsetNo = 1;
        while (true) {
            String msg = new String("Message_" + offsetNo);
            System.out.println("Send->[" + msg + "]");
            producer.send(new KeyedMessage<Integer, String>(topic, msg));
            offsetNo++;
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

}
复制代码

6.截图预览

  在开发完Consumer和Producer的代码后,我们来测试相关应用,下面给大家编写了一个Client去测试Consumer和Producer,具体代码如下所示:

  • KafkaClient

复制代码
package cn.hadoop.hdfs.kafka.client;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import cn.hadoop.hdfs.kafka.JConsumer;
import cn.hadoop.hdfs.kafka.JProducer;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note To run Kafka Code
 */
public class KafkaClient {

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();

        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }

}
复制代码

  运行截图如下所示:

7.总结

  大家在开发Kafka的应用时,需要注意相关事项。若是使用Maven项目工程,在添加相关Kafka依赖JAR包时,有可能依赖JAR会下载失败,若出现这种情况,可手动将Kafka的依赖JAR包添加到Maven仓库即可,在编写Consumer和Producer程序,这里只是给出一个示例让大家先熟悉Kafka的代码如何去编写,后面会给大家更加详细复杂的代码模块案例。

8.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

 

  

 

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关文章
|
2月前
|
消息中间件 缓存 Java
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
✈️【Kafka技术专题】「开发实战篇」深入实战探索Kafka的生产者的开发实现及实战指南
47 0
|
2月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
53 1
|
2月前
|
消息中间件 监控 Java
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
✈️【Kafka技术专题】「核心原理篇」深入实战探索Kafka的Broker的原理及可靠性机制分析
42 0
|
2月前
|
消息中间件 JSON Kafka
【十九】初学Kafka并实战整合SpringCloudStream进行使用
【十九】初学Kafka并实战整合SpringCloudStream进行使用
36 1
【十九】初学Kafka并实战整合SpringCloudStream进行使用
|
4月前
|
消息中间件 存储 Kafka
KafKa C++实战
KafKa C++实战
183 0
|
4月前
|
消息中间件 分布式计算 监控
腾讯技术官手撸笔记,全新演绎“Kafka部署实战”,已开源
我们知道,当下流行的MQ非常多,不过很多公司在技术选型上还是选择使用Kafka。与其他主流MQ进行对比,我们会发现Kafka最大的优点就是吞吐量高。实际上Kafka是高吞吐低延迟的高并发、高性能的消息中间件,配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。
|
5月前
|
消息中间件 网络协议 Kafka
docker安装zk和kafka实战笔记
docker安装zk和kafka实战笔记
docker安装zk和kafka实战笔记
|
5月前
|
消息中间件 分布式计算 大数据
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
【大数据技术】Spark+Flume+Kafka实现商品实时交易数据统计分析实战(附源码)
98 0
|
6月前
|
消息中间件 关系型数据库 MySQL
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
57 0
|
7月前
|
消息中间件 JSON 关系型数据库
[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋
[实战系列]SelectDB Cloud Kafka Connect 最佳实践张家锋
97 1

热门文章

最新文章