流式计算常见模块用法说明

简介: StreamingPro有非常多的模块可以直接在配置文件中使用,本文主要针对流式计算中涉及到的模块。

说明

Kafka Compositor
{
   "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
   "params": [{
                 "topics":"your topic",
                 "metadata.broker.list":"brokers",
                 "auto.offset.reset": "smallest|largest"
             }]
}

参数说明:
ad736470e2e494d0c7429dd9bd7368d35830ff5d

MockInputStreamCompositor

模拟数据源,主要为了方便测试。
{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [{
                      "batch-1":["1","2","3"],
                      "batch-2":["1","2","3"],
                      "batch-3":["1","2","3"],
                      "batch-4":["1","2","3"]
                  }]
}

MockInputStreamFromPathCompositor

模拟数据源,主要为了方便测试。可以接入一个外部文件作为mock数据
{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamFromPathCompositor",
        "params": [{"path":"file:///tmp/test.txt"}]
}

SingleColumnJSONCompositor

把一条日志转化一个单列的json文件。
{
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [{
            "name": "a"
          }]
}
params.name 则是列名,方便后续的sql使用。

ScalaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
        "params": [{}]
}
可以把scala Map转化为JSon

JavaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JavaMapToJSONCompositor",
        "params": [{}]
}
可以把java Map转化为JSon

FlatJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.FlatJSONCompositor",
        "params": [{"a":"$['store']['book'][0]['title']"}]
}
从JSON里抽取字段,映射到新的列名上。主要是对复杂JSON结构进行扁平化。语法参考该库JsonPath

NginxParserCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.NginxParserCompositor",
        "params": [{"time":0,"url":1}]
}
Nginx 日志解析工具,按位置给列进行命名。

SQLCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
}
29f3d4850060dc86e1c3f3c8209e3191f6de5344

SQLESOutputCompositor

将数据存储到ES中
{
        "name":"streaming.core.compositor.spark.streaming.output.SQLESOutputCompositor",
        "params":[
          {
            "es.nodes":"",
            "es.resource":"",
            "es.mapping.include":"",
            "timeFormat":"yyyyMMdd"
          }
        ]
}
bbaa3b03aedfd8da7fbdff7a85608bb8242e6a27

SQLPrintOutputCompositor(output)

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",
        "params": [{}]
}
把处理结果打印到终端控制台。主要是为了调试使用

JSONTableCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
        "params": [{
            "tableName": "test"
          }]
}
把字符串(JSON格式)的数据注册成一张表。 params.tableName可以让你指定表名。

ConsoleOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.ConsoleOutputCompositor",
        "params": [{ }]
}
控制台打印,非SQL类。

SQLCSVOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLCSVOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}
6c6e17ffa2b049a7de3a0fe6f75afac4c5bea3b4
作为CSV 输出,需要前面是一张表。

SQLParquetOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLParquetOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}
0446e334ab675414af70d6eff2e9fe6ff695b8d9
作为parquet 输出,需要前面是一张表。
目录
相关文章
|
1月前
|
Java
ETL工具 Kettle 中 kettle循环传递变量_(最简单的方法)
本文详细介绍了如何在Kettle工具中使用循环传递变量,通过示例展示了如何将movies表数据按月插入到ods_movies表,涉及新建转换、获取变量、作业配置和执行,呈现了一个嵌套作业结构.
76 3
|
4月前
|
SQL 运维 监控
SLS 数据加工全面升级,集成 SPL 语法
在系统开发、运维过程中,日志是最重要的信息之一,其最大的优点是简单直接。SLS 数据加工功能旨在解决非结构化的日志数据处理,当前全面升级,集成 SPL 语言、更强的数据处理性能、更优的使用成本。
18174 138
|
1月前
|
SQL 关系型数据库 MySQL
ETL工具 Kettle 中怎么通过变量传参
ETL工具 Kettle 中怎么通过变量传参
24 0
|
3月前
|
SQL 数据采集 JSON
使用对比!SLS 数据加工 SPL 与旧版 DSL 场景对照
本文讨论在不同的数据处理需求中,新版数据加工 SPL 与旧版数据加工 DSL 的使用对照。
7417 11
|
3月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 网络协议 大数据
[flink 实时流基础]源算子和转换算子
[flink 实时流基础]源算子和转换算子
|
6月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之采集选择增量(latest)读取模式,是否可以使用动态加载表功能
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如果窗口中没有数据,但是想要在UDAF中输出一个默认值,该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
传感器 JSON Java
流计算中的流式图处理是什么?请解释其作用和常用操作。
流计算中的流式图处理是什么?请解释其作用和常用操作。
59 0
|
6月前
|
消息中间件 分布式计算 Java
流计算与批处理的区别是什么?请举例说明。
流计算与批处理的区别是什么?请举例说明。
108 0