storm 读取不到对应的kafka数据

简介: 坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖 UTF-8 1.

坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <storm.version>1.1.1</storm.version>
    <kafka.version>0.9.0.0</kafka.version>
</properties>

<dependencies>

    <!--storm-core依赖-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>log4j-over-slf4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!--storm-kafka 依赖-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
    </dependency>

    <!-- kafka 依赖-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>


    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

坑二: input.getBinaryByField(“bytes”); 里面一定要写成bytes,这是上游kafkaSpout 传递过来,源码中也可以看到。
对应位置如下图

这里写图片描述

业务代码体现:
public void execute(Tuple input) {
try {
byte[] bytes = input.getBinaryByField(“bytes”);
String value = new String(bytes);
System.out.println(“value ” + value);
this.collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
this.collector.fail(input);
}
}

坑三:本地测试是,一直接收不到kafkaSpout发送过来的消息:
1)问题是已经连接上了kafka,也读到了对应的分区
2)推断可能是上游的数据发送不过来—》 可能原因shuffleGrouping时 的参数传递错误。
3)最终发现 原来就是SPOUT_ID 获取错了
应该将下面代码中的

     String SPOUT_ID = kafkaSpout.getClass().getSimpleName()
     替换成
     String SPOUT_ID = KafkaSpout.class.getSimpleName();
     即可。



         // kafka 使用的zk hosts
        BrokerHosts hosts = new ZkHosts("hadoop000:2181");
//        指定的kafak的一个根目录,存储的是kafkaSpout读取数据的位置信息(offset)
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());

        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        String SPOUT_ID = kafkaSpout.getClass().getSimpleName();
        builder.setSpout(SPOUT_ID, kafkaSpout);

        String BOLD_ID = LogProcessBolt.class.getSimpleName();
        builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormToKafkaTopology", new Config(), builder.createTopology());

坑四: storm重复消费kafak数据:

官网解释如下:  

这里写图片描述

代码中配置为如下即可
   SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());

  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费

坑五: storm消费数据,ack,fail这些比配,如果出现问题还可以重试

这里写图片描述

相关文章
|
3月前
|
消息中间件 JSON druid
Druid:通过 Kafka 加载流数据
Druid:通过 Kafka 加载流数据
39 0
|
7天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
24 0
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
71 2
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
28 1
|
2月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
机器学习/深度学习 消息中间件 人工智能
机器学习PAI报错问题之读取kafka数据报错如何解决
人工智能平台PAI是是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务;本合集将收录PAI常见的报错信息和解决策略,帮助用户迅速定位问题并采取相应措施,确保机器学习项目的顺利推进。
|
2月前
|
SQL 消息中间件 关系型数据库
Flink CDC数据同步问题之向kafka同步数据报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
3月前
|
NoSQL Java 关系型数据库
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致
41 0
|
4月前
|
消息中间件 Kafka 流计算
Flink消费kafka数据
Flink消费kafka数据
43 0
|
4月前
|
消息中间件 Kafka Windows
使用Kafka Connect 导入导出数据
使用Kafka Connect 导入导出数据
66 0