StreamingPro 支持多输入,多输出配置

简介: 最近正好有个需求,就是从不同的数据库以及表里拉出数据,经过一定的处理放到ES里供查询,最好还能放个到parquet里,这样可以支持更复杂的SQL。之前StreamingPro是只能配置一个数据源的,所以做了些改造,方便配置多个数据源,以及多个写出。
前言
最近正好有个需求,就是从不同的数据库以及表里拉出数据,经过一定的处理放到ES里供查询,最好还能放个到parquet里,这样可以支持更复杂的SQL。之前StreamingPro是只能配置一个数据源的,所以做了些改造,方便配置多个数据源,以及多个写出。

最新的下载地址: https://pan.baidu.com/s/1eRO5Wga 依然的,比较大,因为现在他还能支持Thrift JDBC /Rest SQL: 使用StreamingPro 快速构建Spark SQL on CarbonData


输入配置

{
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
以前用的是 batch.source, 如果你有多个输入源,则需要使用batch.sources 组件。每个源需要配置一个outputTable,也就是说这个源取个名字,方便后面使用。

如果是数据库,则可以这么写:
{
        "name": "batch.sources",
        "params": [
          {
             url:"jdbc:mysql://localhost/test?user=fred&password=secret",
            "dbtable":"table1",
            "driver":"com.mysql...",
            "path": "-",
            "format": "jdbc",
            "outputTable": "test",

          },
          {
            "path": "-",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },


输出

{
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }
我这里同时输出为json以及parquet格式。


一个简单但是涉及点比较多的例子

{
  "convert-multi-csv-to-json": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select city as tp  from test limit 100",
            "outputTableName": "sqlTable"
          }
        ]
      },
      {
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "sqlTable",
            "outputTableName": "scriptTable",
            "useDocMap": true
          },
          {
            "-": "val count = doc(\"tp\").toString.length;Map(\"count\"->count)"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select scriptTable.tp,scriptTable.count,test2.city,test2.name  from scriptTable,test2 limit 100",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
在 batch.sql 里你可以引用任何一个源的表,或者之前已经在batch.sql里申明的outputTable, 同理batch.script。 而在batch.outputs里,你则可以将任何一张表写入到MySQL,ES,HDFS等文件存储系统中。

将配置文件保存一下,然后就可以启动了:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
目录
相关文章
|
弹性计算 Linux Shell
云服务器ECS 云盘缩容教程步骤来啦!
由于目前云服务器ECS不支持系统盘或者数据盘缩容,如果您有云盘缩容的需求,可用通过阿里云迁云工具达成目的。
3764 0
云服务器ECS 云盘缩容教程步骤来啦!
|
8天前
|
人工智能 JSON 自然语言处理
让教学更智慧:用阿里云百炼工作流,自动生成中小学教材内容#小有可为#有温度的AI
通过可视化工作流编排,将大模型推理能力转化为标准化的教学内容生成引擎。教师只需输入教材标题和适用学段,即可自动获得结构完整、符合课程标准的章节内容,大幅降低备课门槛,助力教育资源均衡化。
480 124
|
17天前
|
Linux 程序员 数据格式
【2026最新】Notepad++下载、安装和使用一篇搞定(附中文版安装包)
Notepad++ 是一款免费开源、轻量高效的 Windows 文本编辑器,支持 C/Python/HTML 等 80+ 语言语法高亮、代码折叠、正则替换、编码转换及插件扩展,专为程序员与文本处理用户打造,完美替代系统记事本。(239字)
|
4天前
|
人工智能 安全 Cloud Native
Higress 新发布:AI Gateway 能力增强,Gateway API 及其推理扩展持续打磨
增强 AI 网关能力,持续打磨 Gateway API 及其推理扩展。
315 124
|
12天前
|
机器学习/深度学习 人工智能 调度
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
HappyHorse 1.1 是新一代视频生成大模型,全面升级动态表现力、角色一致性、指令遵循、视觉质感与音画协同能力。支持I2V/T2V/R2V三类生成,适配短剧、电商广告、品牌营销等场景,提供高质、流畅、可控的AI视频生产力。
799 5
🐴 HappyHorse 1.1 现已上线阿里云百炼!快来查收模型使用指南,现在调用享 6 折~
|
10天前
|
人工智能 定位技术 SEO
我学 GEO 第 15 天:终于知道AI GEO该如何做?
我是暴走的莉莉酱,边旅行边研究AI GEO的数字游民。专注普通人如何提升“AI可见度”——让AI在回答用户问题时准确识别、理解并推荐你。不讲玄学,只做可测、可调、可持续的GEO实践。
455 127
|
4天前
|
消息中间件 存储 Kafka
Kafka 原生消息入湖能力上线!一键打通实时流与数据湖
阿里云消息队列 Kafka 版正式上线原生消息入湖能力。
267 121