大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(已更完)

ClickHouse(已更完)

Kudu(已更完)

Druid(正在更新…)

章节内容

上节我们完成了如下的内容:


通过两篇来完成 集群模式配置、集群模式启动

基本介绍

Apache Druid 从 Kafka 中获取数据并进行分析的流程通常分为以下几个步骤:


Kafka 数据流的接入: Druid 通过 Kafka Indexing Service 直接从 Kafka 中摄取实时流数据。Kafka 是一个高吞吐量的消息队列,适合处理大量实时数据。Druid 会订阅 Kafka 的 topic,每当新数据到达时,它会自动从 Kafka 中读取数据。


数据解析与转换: 数据从 Kafka 进入 Druid 后,首先会进行数据解析,通常采用 JSON、Avro 或 CSV 格式。解析的过程中,Druid 可以根据预定义的 schema 进行字段映射、过滤和数据转换,比如将字符串转为数值类型、提取时间戳等。这一步允许对数据进行初步处理,比如数据清洗或格式化。


实时数据摄取与索引: Druid 将解析后的数据放入一个实时索引中,同时也将数据存储在内存中。Druid 的一个核心特点是,它会为每条记录生成倒排索引和 bitmap 索引,这样可以大大加快查询速度。实时摄取的数据在内存中保存一段时间,直到满足一定条件(比如时间或数据量),然后会以段的形式写入深度存储(如 HDFS 或 S3)。


批处理与历史数据合并: Druid 支持实时和批处理的混合模式。当实时摄取的数据段被持久化到深度存储后,Druid 可以自动将这些段与批处理数据合并。这种设计确保了在数据分析时,既能查询到最新的实时数据,也能访问历史数据。批处理数据可以通过 Hadoop 或 Spark 等框架预先批量加载到 Druid 中。


数据分片与副本管理: Druid 支持水平扩展,通过分片将数据分布在多个节点上。每个分片可以有多个副本,这样可以保证系统的高可用性和容错性。通过负载均衡,Druid 可以有效处理大规模查询请求,尤其是在数据量非常大的情况下。


查询与分析: Druid 的查询系统基于 HTTP/JSON API,支持多种类型的查询,如时间序列查询、分组聚合查询、过滤查询等。Druid 的查询引擎设计非常高效,可以处理大规模的 OLAP(在线分析处理)查询。由于 Kafka 中的数据是实时流式的,Druid 的查询结果通常可以反映出最新的业务指标和分析结果。


可视化与监控: Druid 的数据可以与 BI 工具(如 Superset、Tableau)集成,生成实时的报表和仪表盘。用户可以通过这些可视化工具,实时监控业务指标,做出数据驱动的决策。


整个流程中,Druid 负责将 Kafka 中的数据转化为高效的、可查询的 OLAP 格式,并且通过索引和分布式架构实现高效查询。这个系统可以被广泛应用于实时监控、用户行为分析、金融交易分析等场景。


从Kafka中加载数据

典型架构

日志业务中,我们不会在Druid中处理复杂的数据转换清晰工作

案例测试

假设有以下网络流量数据:


ts:时间戳

srcip:发送端IP地址

srcport:发送端端口号

dstip:接收端IP地址

dstport:接收端端口号

protocol:协议

packets:传输包

bytes:传输的字节数

cost: 传输耗费的时间

数据是JSON格式,通过Kafka传输

每行数据包含:


时间戳

维度列

指标列

需要计算的指标:


记录的条数:count

packets:max

bytes:min

cost:sum

数据汇总粒度:分钟


测试数据

{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}

{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}

{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}

{"ts":"2020-10-01T00:01:38Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":4, "bytes":4000, "cost": 0.1}

{"ts":"2020-10-01T00:02:08Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":5, "bytes":5000, "cost": 0.2}

{"ts":"2020-10-01T00:02:09Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":6, "bytes":6000, "cost": 0.2}

{"ts":"2020-10-01T00:02:10Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":7, "bytes":7000, "cost": 0.2}

{"ts":"2020-10-01T00:02:11Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":8, "bytes":8000, "cost": 0.2}

{"ts":"2020-10-01T00:02:12Z","srcip":"1.1.1.1", "dstip":"2.2.2.2", "srcport":6666,"dstPort":8888, "protocol": "udp", "packets":9, "bytes":9000, "cost": 0.2}

写入的数据如下所示:

启动Kafka

这里由于资源比较紧张,我就只启动一台Kafka了:

我在 h121 节点上启动

kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties

创建 Topic

kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic druid1

推送消息

kafka-console-producer.sh --broker-list h121.wzk.icu:9092 --topic druid1

输出我们刚才的数据,一行一行的写入输入进行(后续要用)。

提取数据

浏览器打开我们之前启动的Druid服务

http://h121.wzk.icu:8888/

LoadData

点击控制台中的 LoadData 模块:

Streaming

选择 Streaming:

Kafka

继续选择Kafka,点击 ConnectData,在右侧输入对应的信息,点级Apply:

  • h121.wzk.icu:9092
  • druid1

ParserData

此时可以看到右下角有:Next: Parse Data:

接下篇:https://developer.aliyun.com/article/1623065

目录
相关文章
|
5天前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
23 2
|
13天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
8天前
|
存储 监控 数据挖掘
【Clikhouse 探秘】ClickHouse 物化视图:加速大数据分析的新利器
ClickHouse 的物化视图是一种特殊表,通过预先计算并存储查询结果,显著提高查询性能,减少资源消耗,适用于实时报表、日志分析、用户行为分析、金融数据分析和物联网数据分析等场景。物化视图的创建、数据插入、更新和一致性保证通过事务机制实现。
46 14
|
14天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
13天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
45 2
|
14天前
|
数据采集 机器学习/深度学习 搜索推荐
大数据与社交媒体:用户行为分析
【10月更文挑战第31天】在数字化时代,社交媒体成为人们生活的重要部分,大数据技术的发展使其用户行为分析成为企业理解用户需求、优化产品设计和提升用户体验的关键手段。本文探讨了大数据在社交媒体用户行为分析中的应用,包括用户画像构建、情感分析、行为路径分析和社交网络分析,以及面临的挑战与机遇。
|
14天前
|
机器学习/深度学习 搜索推荐 大数据
大数据与教育:学生表现分析的工具
【10月更文挑战第31天】在数字化时代,大数据成为改善教育质量的重要工具。本文探讨了大数据在学生表现分析中的应用,介绍学习管理系统、智能评估系统、情感分析技术和学习路径优化等工具,帮助教育者更好地理解学生需求,制定个性化教学策略,提升教学效果。尽管面临数据隐私等挑战,大数据仍为教育创新带来巨大机遇。
|
17天前
|
人工智能 供应链 搜索推荐
大数据分析:解锁商业智能的秘密武器
【10月更文挑战第31天】在信息爆炸时代,大数据分析成为企业解锁商业智能的关键工具。本文探讨了大数据分析在客户洞察、风险管理、供应链优化、产品开发和决策支持等方面的应用,强调了明确分析目标、选择合适工具、培养专业人才和持续优化的重要性,并展望了未来的发展趋势。
|
20天前
|
数据采集 分布式计算 OLAP
最佳实践:AnalyticDB在企业级大数据分析中的应用案例
【10月更文挑战第22天】在数字化转型的大潮中,企业对数据的依赖程度越来越高。如何高效地处理和分析海量数据,从中提取有价值的洞察,成为企业竞争力的关键。作为阿里云推出的一款实时OLAP数据库服务,AnalyticDB(ADB)凭借其强大的数据处理能力和亚秒级的查询响应时间,已经在多个行业和业务场景中得到了广泛应用。本文将从个人的角度出发,分享多个成功案例,展示AnalyticDB如何助力企业在广告投放效果分析、用户行为追踪、财务报表生成等领域实现高效的数据处理与洞察发现。
48 0
|
28天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
575 13
Apache Flink 2.0-preview released

推荐镜像

更多