流式计算常见模块用法说明

简介: StreamingPro有非常多的模块可以直接在配置文件中使用,本文主要针对流式计算中涉及到的模块。

说明

Kafka Compositor
{
   "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
   "params": [{
                 "topics":"your topic",
                 "metadata.broker.list":"brokers",
                 "auto.offset.reset": "smallest|largest"
             }]
}

参数说明:
ad736470e2e494d0c7429dd9bd7368d35830ff5d

MockInputStreamCompositor

模拟数据源,主要为了方便测试。
{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [{
                      "batch-1":["1","2","3"],
                      "batch-2":["1","2","3"],
                      "batch-3":["1","2","3"],
                      "batch-4":["1","2","3"]
                  }]
}

MockInputStreamFromPathCompositor

模拟数据源,主要为了方便测试。可以接入一个外部文件作为mock数据
{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamFromPathCompositor",
        "params": [{"path":"file:///tmp/test.txt"}]
}

SingleColumnJSONCompositor

把一条日志转化一个单列的json文件。
{
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [{
            "name": "a"
          }]
}
params.name 则是列名,方便后续的sql使用。

ScalaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
        "params": [{}]
}
可以把scala Map转化为JSon

JavaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JavaMapToJSONCompositor",
        "params": [{}]
}
可以把java Map转化为JSon

FlatJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.FlatJSONCompositor",
        "params": [{"a":"$['store']['book'][0]['title']"}]
}
从JSON里抽取字段,映射到新的列名上。主要是对复杂JSON结构进行扁平化。语法参考该库JsonPath

NginxParserCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.NginxParserCompositor",
        "params": [{"time":0,"url":1}]
}
Nginx 日志解析工具,按位置给列进行命名。

SQLCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
}
29f3d4850060dc86e1c3f3c8209e3191f6de5344

SQLESOutputCompositor

将数据存储到ES中
{
        "name":"streaming.core.compositor.spark.streaming.output.SQLESOutputCompositor",
        "params":[
          {
            "es.nodes":"",
            "es.resource":"",
            "es.mapping.include":"",
            "timeFormat":"yyyyMMdd"
          }
        ]
}
bbaa3b03aedfd8da7fbdff7a85608bb8242e6a27

SQLPrintOutputCompositor(output)

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",
        "params": [{}]
}
把处理结果打印到终端控制台。主要是为了调试使用

JSONTableCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
        "params": [{
            "tableName": "test"
          }]
}
把字符串(JSON格式)的数据注册成一张表。 params.tableName可以让你指定表名。

ConsoleOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.ConsoleOutputCompositor",
        "params": [{ }]
}
控制台打印,非SQL类。

SQLCSVOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLCSVOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}
6c6e17ffa2b049a7de3a0fe6f75afac4c5bea3b4
作为CSV 输出,需要前面是一张表。

SQLParquetOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLParquetOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}
0446e334ab675414af70d6eff2e9fe6ff695b8d9
作为parquet 输出,需要前面是一张表。
目录
相关文章
|
存储 缓存 监控
缓存击穿、缓存穿透、缓存雪崩 3大问题,如何彻底解决?
【10月更文挑战第8天】在分布式系统中,缓存的使用极大地提高了系统的性能和响应速度。然而,缓存击穿、缓存穿透和缓存雪崩是三个常见的缓存相关问题,它们可能导致系统性能下降,甚至引发系统崩溃。本文将深入探讨这三个问题的成因、影响以及彻底的解决方案。
2142 1
|
4月前
|
人工智能 安全 Java
Spring AI 核心架构解析:构建企业级 AI 应用的 Java 新范式
Spring AI 为 Java 开发者提供企业级 AI 应用新范式,通过分层架构、统一抽象(如 ChatClient、PromptTemplate)与 Spring 生态深度集成,支持 RAG、函数调用、流式响应等核心功能,实现安全、可观测、可维护的智能系统构建。
1180 8
|
存储 人工智能 Serverless
人生剧本模拟器?一键穿越,动画片里也有你!
通过AI技术,只需一张图片和几个关键词,即可生成以自己为主角的剧本并制作成动画片。这项技术简化了创作流程,降低了技术门槛,使每个人都能成为创作者。无论是创意故事、珍贵瞬间还是专属动画,所有人物和故事都由你定义。借助函数计算FC构建Web服务,结合百炼模型服务和ComfyUI工具,实现从剧本撰写到视频合成的一站式自动化流程。点击“阅读原文”参与活动,免费生成你的专属动画,还有礼物和互动奖励等你来拿!
456 11
|
XML Java 数据库连接
Mybatis实现RBAC权限模型查询
通过对RBAC权限模型的理解和MyBatis的灵活使用,我们可以高效地实现复杂的权限管理功能,为应用程序的安全性和可维护性提供有力支持。
425 5
数电模电(三) 时序电路触发器 基本RS触发器 同步RS触发器 主从RS触发器 JK触发器 主从D触发器
数电模电(三) 时序电路触发器 基本RS触发器 同步RS触发器 主从RS触发器 JK触发器 主从D触发器
641 0
|
传感器 前端开发 JavaScript
前端开发者必备的VS Code插件推荐
前端开发者必备的VS Code插件推荐
|
Android开发
Android Mediatek USB 核心驱动中增加设备 PID/VID 检查
Android Mediatek USB 核心驱动中增加设备 PID/VID 检查
543 0
|
算法 C++ UED
QML布局:如何恰当设置间隙与合理布局 (QML Layout: Proper Spacing and Alignment)
QML布局:如何恰当设置间隙与合理布局 (QML Layout: Proper Spacing and Alignment)
2682 0
|
存储 搜索推荐 算法
Flink / Scala - DataStream Broadcast State 模式示例详解
一.引言 上一篇文章Flink / Scala - DataSet 应用 Broadcast Variables介绍了 DataSet 场景下 Broadcast 的使用,这一边
768 0
Flink / Scala - DataStream Broadcast State 模式示例详解
|
数据采集 Java Python
Java调用Python爬虫
用java调用python的爬虫程序,是一件很有意思的事情, 但解决方法大多不靠谱,作者花了两天的时间,动手实践,最终完全解决了问题 java-python Java调用Python爬虫需要解决的问题: 参数传递问题 由python脚本通过, sys.
1744 0

热门文章

最新文章