流式计算常见模块用法说明

简介: StreamingPro有非常多的模块可以直接在配置文件中使用,本文主要针对流式计算中涉及到的模块。

说明

Kafka Compositor
{
   "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
   "params": [{
                 "topics":"your topic",
                 "metadata.broker.list":"brokers",
                 "auto.offset.reset": "smallest|largest"
             }]
}

参数说明:
ad736470e2e494d0c7429dd9bd7368d35830ff5d

MockInputStreamCompositor

模拟数据源,主要为了方便测试。
{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [{
                      "batch-1":["1","2","3"],
                      "batch-2":["1","2","3"],
                      "batch-3":["1","2","3"],
                      "batch-4":["1","2","3"]
                  }]
}

MockInputStreamFromPathCompositor

模拟数据源,主要为了方便测试。可以接入一个外部文件作为mock数据
{
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamFromPathCompositor",
        "params": [{"path":"file:///tmp/test.txt"}]
}

SingleColumnJSONCompositor

把一条日志转化一个单列的json文件。
{
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [{
            "name": "a"
          }]
}
params.name 则是列名,方便后续的sql使用。

ScalaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
        "params": [{}]
}
可以把scala Map转化为JSon

JavaMapToJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JavaMapToJSONCompositor",
        "params": [{}]
}
可以把java Map转化为JSon

FlatJSONCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.FlatJSONCompositor",
        "params": [{"a":"$['store']['book'][0]['title']"}]
}
从JSON里抽取字段,映射到新的列名上。主要是对复杂JSON结构进行扁平化。语法参考该库JsonPath

NginxParserCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.NginxParserCompositor",
        "params": [{"time":0,"url":1}]
}
Nginx 日志解析工具,按位置给列进行命名。

SQLCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
}
29f3d4850060dc86e1c3f3c8209e3191f6de5344

SQLESOutputCompositor

将数据存储到ES中
{
        "name":"streaming.core.compositor.spark.streaming.output.SQLESOutputCompositor",
        "params":[
          {
            "es.nodes":"",
            "es.resource":"",
            "es.mapping.include":"",
            "timeFormat":"yyyyMMdd"
          }
        ]
}
bbaa3b03aedfd8da7fbdff7a85608bb8242e6a27

SQLPrintOutputCompositor(output)

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",
        "params": [{}]
}
把处理结果打印到终端控制台。主要是为了调试使用

JSONTableCompositor

{
        "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
        "params": [{
            "tableName": "test"
          }]
}
把字符串(JSON格式)的数据注册成一张表。 params.tableName可以让你指定表名。

ConsoleOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.ConsoleOutputCompositor",
        "params": [{ }]
}
控制台打印,非SQL类。

SQLCSVOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLCSVOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}
6c6e17ffa2b049a7de3a0fe6f75afac4c5bea3b4
作为CSV 输出,需要前面是一张表。

SQLParquetOutputCompositor

{
        "name": "streaming.core.compositor.spark.streaming.output.SQLParquetOutputCompositor",
        "params": [{
  "path":"",
  "mode":""
 }]
}
0446e334ab675414af70d6eff2e9fe6ff695b8d9
作为parquet 输出,需要前面是一张表。
目录
相关文章
|
5月前
|
存储 Go
Go 浅析主流日志库:从设计层学习如何集成日志轮转与切割功能
本文将探讨几个热门的 go 日志库如 logrus、zap 和官网的 slog,我将分析这些库的的关键设计元素,探讨它们是如何支持日志轮转与切割功能的配置。
271 0
Go 浅析主流日志库:从设计层学习如何集成日志轮转与切割功能
|
6天前
|
Java
ETL工具 Kettle 中 kettle循环传递变量_(最简单的方法)
本文详细介绍了如何在Kettle工具中使用循环传递变量,通过示例展示了如何将movies表数据按月插入到ods_movies表,涉及新建转换、获取变量、作业配置和执行,呈现了一个嵌套作业结构.
31 3
|
5天前
|
JSON 数据处理 Go
一文教会你如何使用 iLogtail SPL 处理日志
iLogtail 作为日志、时序数据采集器,在 2.0 版本中,全面支持了 SPL 。本文对处理插件进行了梳理,介绍了如何编写 SPL 语句,从插件处理模式迁移到 2.0 版本的 SPL 处理模式,帮助用户实现更加灵活的端上数据处理。
|
13天前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
22 0
|
13天前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
31 0
|
2月前
|
SQL 数据采集 JSON
使用对比!SLS 数据加工 SPL 与旧版 DSL 场景对照
本文讨论在不同的数据处理需求中,新版数据加工 SPL 与旧版数据加工 DSL 的使用对照。
7375 11
|
5月前
|
SQL 存储 监控
SLS 查询新范式:使用 SPL 对日志进行交互式探索
像 Unix 命令一样支持多级管道级联,像加工预览一样实时处理查询结果,更便捷的交互,更丰富的算子,更灵活的探索半结构化日志,快来试试使用 SPL 语言查询日志数据吧~
47119 10
|
5月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之采集选择增量(latest)读取模式,是否可以使用动态加载表功能
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
传感器 JSON Java
流计算中的流式图处理是什么?请解释其作用和常用操作。
流计算中的流式图处理是什么?请解释其作用和常用操作。
53 0
|
消息中间件 Java Kafka
Flink--4、DateStream API(执行环境、源算子、基本转换算子)
Flink--4、DateStream API(执行环境、源算子、基本转换算子)
Flink--4、DateStream API(执行环境、源算子、基本转换算子)