StreamingPro 简化流式计算配置

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 前些天可以让批处理的配置变得更优雅StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。
前言
前些天可以让批处理的配置变得更优雅 StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。

另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。


配置

首先需要配置源:

{
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      }
AI 代码解读

我们配置了一个Kafka流,一个普通的CSV文件。目前StreamingPro只允许配置一个Kafka流,但是支持多个topic,按逗号分隔即可。你可以配置多个其他非流式源,比如从MySQL,Parquet,CSV同时读取数据并且映射成表。

之后你就可以写SQL进行处理了。

{
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
AI 代码解读

我这里做了简单的join。

{
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
AI 代码解读
然后把数据追加到Mysql里去。其实你也可以配置多个输出。


完整配置

{
  "example": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
AI 代码解读
你可以在 StreamingPro-0.4.11 下载到包,然后用命令启动:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
AI 代码解读
目录
打赏
0
0
0
0
110
分享
相关文章
构建高效的数据流处理系统
【9月更文挑战第32天】本文将带你进入数据流处理的奇妙世界,探讨如何构建一个既高效又可靠的数据处理系统。我们将通过一个简单的例子来演示如何从概念到实现,一步步打造你的数据流处理系统。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和技巧。
76 2
Gulp:高效构建流程中的流式处理利器
【10月更文挑战第13天】Gulp:高效构建流程中的流式处理利器
73 0
通过Google Dataflow,我们能够构建一个高效、可扩展且易于维护的实时数据处理系统
【9月更文挑战第7天】随着大数据时代的到来,企业对高效数据处理的需求日益增加,特别是在实时分析和事件驱动应用中。Google Dataflow作为Google Cloud Platform的一项服务,凭借其灵活、可扩展的特点,成为实时大数据处理的首选。本文将介绍Dataflow的基本概念、优势,并通过一个电商日志分析的实际案例和示例代码,展示如何构建高效的数据处理管道。Dataflow不仅支持自动扩展和高可用性,还提供了多种编程语言支持和与GCP其他服务的紧密集成,简化了整个数据处理流程。通过Dataflow,企业可以快速响应业务需求,优化用户体验。
172 3
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同
【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同
461 5
Mysql基础第十四天,使用数据处理函数
Mysql基础第十四天,使用数据处理函数
87 0
构建与应用大数据环境:从搭建到开发与组件使用的全面指南
构建与应用大数据环境:从搭建到开发与组件使用的全面指南
399 0