从 ES Kafka Mongodb Restful ... 取到 json 之后

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 Tair(兼容Redis),内存型 2GB
简介: JSON 是一种广泛使用的数据交换格式,但其计算和处理能力有限。esProc SPL 是一款强大的开源计算引擎,能够高效解析 JSON 数据,并支持复杂的过滤、分组、连接等操作。它不仅兼容多种数据源,如 RESTful、ElasticSearch、MongoDB 和 Kafka,还提供了游标对象处理大数据流,支持与 Java 应用无缝集成,实现灵活的业务逻辑处理。

json 是个好东西,它可以使用公共的文本形式承载了丰富的结构化数据的信息。现代很多技术都在喜欢使用 json 作为数据传输格式,比如 Elastic Search,Restful,Kafka 等,Mongodb 这类对性能较在意的技术则使用了二进制化的 json。

结构化的数据常常是批量的,也常常是需要再计算的。
但是,json 相关的类库却没什么方便用来做计算的。jsonpath 解析 json 没问题,却没什么计算能力,简单的过滤聚合还可以,稍复杂到分组汇总就不灵了,基本上是靠自己硬编码完成了。
写进数据库来算?这也太沉重了。何况,json 经常是多层的结构化数据,写进关系数据库要建几个关联的表,入库的成本远高于计算本身了。

esProc SPL 来帮你。
esProc SPL 是纯 Java 开发的开源计算引擎,可前往乾学院了解更多!

esProc SPL 对 json 库进行了封装,一句话就可以把 json 文本解析成可计算的 SPL 序表(SPL 的内存结构化数据对象):
QQ_1732170185196.png

SPL 序表天然多层结构,即字段取值可以是另一个序表,这和 json 天然契合:

一旦转换成 SPL 序表之后,计算本身就是 esProc 的强项了。过滤、分组、连接都不在话下,大部分计算目标都可以一句话完成:

Filter:T.select(Amount>1000 && Amount<=3000 && like(Client,"*s*"))
Sort:T.sort(Client,-Amount)
Distinct:T.id(Client)
Group:T.groups(year(OrderDate);sum(Amount))
Joinjoin(T1:O,SellerId; T2:E,EId)
TopN:T.top(-3;Amount)
TopN in group:T.groups(Client;top(3,Amount))
AI 代码解读

这些内容很多,这里就不展开了,感兴趣的小伙伴可以到 esProc SPL 的官网去参考相关资料。

esProc SPL 已经封装了很多常见的 json 数据源的访问接口。
Restful:纯文本式的 json,计算完了还可以反向生成 json 文本
QQ_1732170343869.png

Elastic Search:可以直接在 SPL 代码中写 json 常数后参与传输和计算
QQ_1732170604405.png

Mongodb:二进制化的 json 也没问题
QQ_1732170644028.png

Kafka:SPL 也封装了向这些数据源写出的接口,形成 IO 闭环
QQ_1732170686266.png

对于 Mongodb,Kafka 这类可能返回大数据量的数据源,esProc SPL 还提供游标对象和方法,可以逐步读取,边读边处理。这里就不详细举例了,小伙伴也可以去官网查阅资料。

通常 json 数据不会单独存在,还会和其它数据源交换数据以及混合计算。esProc SPL 当然也不只专门为了对付 json 而发明的,它是专业的计算引擎,能支持的数据源非常丰富:

a4778238f6bea06894a4cf6b04fe54cf_a961325c8f484e61913e5ed41a45bf7a_clipboard.png

这些数据源都有被 SPL 读成序表和游标,再实现混合计算以及交换数据就非常容易了。

那么,esProc SPL 写出来的代码如何集成到应用程序中呢?
很简单,esProc 提供了标准的 JDBC 驱动,被 Java 程序引入后,就可以使用 SPL 语句了,和调用数据库 SQL 一样。

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
Statement statement = conn.createStatement();
ResultSet result = statement.executeQuery("=json(file(\"Orders.csv\")).select(Amount>1000 && like(Client,\"*s*\")
AI 代码解读

较复杂的 SPL 脚本可以存成文件,然后就像调用存储过程一样:

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement statement = conn.prepareCall("call queryOrders()");
statement.execute();
AI 代码解读

作为纯 Java 开发的软件,esProc SPL 可以完全无缝地集成进 Java 应用中,就和应用程序员自己写的代码一样,一起享受成熟 Java 框架的优势。SPL 本身有完善的流程控制语句,像 for 循环,if 分支都不在话下,还支持子程序调用。只用 SPL 就能实现非常复杂的业务逻辑,直接构成完整的业务单元,不需要上层 Java 代码来配合,主程序只要简单地调用 SPL 脚本就可以了。

将 SPL 脚本存储成文件,置于主应用程序之外,代码修改可以独立进行且立即生效,不像 Java 代码在修改代码后还要重新编译,整个应用都要停机重启。这样可以做到业务逻辑的热切换,特别适合支持变化频繁的业务,而这也是 json 广泛应用的地方。

目录
打赏
0
1
1
0
73
分享
相关文章
实时计算 Flink版产品使用合集之要将收集到的 MongoDB 数据映射成 JSON 对象而非按字段分割,该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
109 1
|
10月前
|
【文档数据库】ES和MongoDB的对比
【文档数据库】ES和MongoDB的对比
589 1
实时计算 Flink版产品使用合集之读取 Kafka 和 MongoDB 的 Managed Memory 使用情况不同是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
DataWorks产品使用合集之DataWorks将 MongoDB 中的数组类型写入到 DataWorks 的单个字段时,表示为字符串格式而非 JSON 格式如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
102 3
MaxCompute产品使用合集之如何将JSON格式数据同步到MongoDB
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
MongoDB的文档存储格式BSON和JSON的区别
MongoDB的文档存储格式BSON和JSON的区别
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
265 1
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
155 1
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
511 9

数据库

+关注
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等