Kafka实战-数据持久化

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

1.概述

  经过前面Kafka实战系列的学习,我们通过学习《Kafka实战-入门》了解Kafka的应用场景和基本原理,《Kafka实战-Kafka Cluster》一文给大家分享了Kafka集群的搭建部署,让大家掌握了集群的搭建步骤,《Kafka实战-实时日志统计流程》一文给大家讲解一个项目(或者说是系统)的整体流程,《Kafka实战-Flume到Kafka》一文给大家介绍了Kafka的数据生产过程,《Kafka实战-Kafka到Storm》一文给大家介绍了Kafka的数据消费,通过Storm来实时计算处理。今天进入Kafka实战的最后一个环节,那就是Kafka实战的结果的数据持久化。下面是今天要分享的内容目录:

  • 结果持久化
  • 实现过程
  • 结果预览

  下面开始今天的分享内容。

2.结果持久化

  一般,我们在进行实时计算,将结果统计处理后,需要将结果进行输出,供前端工程师去展示我们统计的结果(所说的报表)。结果的存储,这里我们选择的是Redis+MySQL进行存储,下面用一张图来展示这个持久化的流程,如下图所示:

  从途中可以看出,实时计算的部分由Storm集群去完成,然后将计算的结果输出到Redis和MySQL库中进行持久化,给前端展示提供数据源。接下来,我给大家介绍如何实现这部分流程。

3.实现过程

  首先,我们去实现Storm的计算结果输出到Redis库中,代码如下所示:

复制代码
package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import redis.clients.jedis.Jedis;
import cn.hadoop.hdfs.util.JedisFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * @Date Jun 10, 2015
 *
 * @Author dengjie
 *
 * @Note Calc WordsCount eg.
 */
public class WordsCounterBlots implements IRichBolt {

    /**
     * 
     */
    private static final long serialVersionUID = -619395076356762569L;

    OutputCollector collector;
    Map<String, Integer> counter;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counter = new HashMap<String, Integer>();
    }

    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer integer = this.counter.get(word);
        if (integer != null) {
            integer += 1;
            this.counter.put(word, integer);
        } else {
            this.counter.put(word, 1);
        }
        for (Entry<String, Integer> entry : this.counter.entrySet()) {
           // write result to redis
            Jedis jedis = JedisFactory.getJedisInstance("real-time");
            jedis.set(entry.getKey(), entry.getValue().toString());
            
            // write result to mysql
            // ...
        }
this.collector.ack(input);
    }

    public void cleanup() {
        // TODO Auto-generated method stub
        
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

}
复制代码

  注:这里关于输出到MySQL就不赘述了,大家可以按需处理即可。

4.结果预览

  在实现持久化到Redis的代码实现后,接下来,我们通过提交Storm作业,来观察是否将计算后的结果持久化到了Redis集群中。结果如下图所示:

  通过Redis的Client来浏览存储的Key值,可以观察统计的结果持久化到来Redis中。

5.总结

  我们在提交作业到Storm集群的时候需要观察作业运行状况,有可能会出现异常,我们可以通过Storm UI界面来观察,会有提示异常信息的详细描述。若是出错,大家可以通过Storm UI的错误信息和Log日志打印的错误信息来定位出原因,从而找到对应的解决办法。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

 

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关文章
|
1月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
91 5
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
51 3
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
61 1
|
4月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
170 0
|
4月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
|
4月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
354 9