StreamingPro 简化流式计算配置

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 前些天可以让批处理的配置变得更优雅StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。
前言
前些天可以让批处理的配置变得更优雅 StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。

另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。


配置

首先需要配置源:

{
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      }

我们配置了一个Kafka流,一个普通的CSV文件。目前StreamingPro只允许配置一个Kafka流,但是支持多个topic,按逗号分隔即可。你可以配置多个其他非流式源,比如从MySQL,Parquet,CSV同时读取数据并且映射成表。

之后你就可以写SQL进行处理了。

{
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },

我这里做了简单的join。

{
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
然后把数据追加到Mysql里去。其实你也可以配置多个输出。


完整配置

{
  "example": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
你可以在 StreamingPro-0.4.11 下载到包,然后用命令启动:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
目录
相关文章
|
15天前
|
存储 NoSQL Java
流计算需要框架吗?SPL 可能是更好的选择
流数据源的动态无界特性使得传统数据库技术难以直接处理,而Heron、Samza、Storm、Spark、Flink等计算框架在流计算领域取得了先发优势。然而,这些框架往往侧重于访问能力,计算能力不足,尤其在高级计算如流批混算、复杂计算和高性能计算方面表现欠佳。esProc SPL作为基于JVM的轻量级开源计算类库,专注于提升流计算的计算能力,支持丰富的流数据访问、灵活的集成接口和高效的内外存存储格式,具备强大的高级计算功能,能够简化业务逻辑开发并适应多样的应用场景。SPL通过专业的计算语言和结构化数据处理能力,为流计算提供了更优的解决方案。
|
6月前
|
中间件 数据处理 Apache
|
3月前
|
消息中间件 监控 Kafka
构建高效的数据流处理系统
【9月更文挑战第32天】本文将带你进入数据流处理的奇妙世界,探讨如何构建一个既高效又可靠的数据处理系统。我们将通过一个简单的例子来演示如何从概念到实现,一步步打造你的数据流处理系统。无论你是初学者还是有经验的开发者,这篇文章都将为你提供有价值的见解和技巧。
62 2
|
3月前
|
前端开发 JavaScript API
Gulp:高效构建流程中的流式处理利器
【10月更文挑战第13天】Gulp:高效构建流程中的流式处理利器
57 0
|
4月前
|
SQL 监控 大数据
通过Google Dataflow,我们能够构建一个高效、可扩展且易于维护的实时数据处理系统
【9月更文挑战第7天】随着大数据时代的到来,企业对高效数据处理的需求日益增加,特别是在实时分析和事件驱动应用中。Google Dataflow作为Google Cloud Platform的一项服务,凭借其灵活、可扩展的特点,成为实时大数据处理的首选。本文将介绍Dataflow的基本概念、优势,并通过一个电商日志分析的实际案例和示例代码,展示如何构建高效的数据处理管道。Dataflow不仅支持自动扩展和高可用性,还提供了多种编程语言支持和与GCP其他服务的紧密集成,简化了整个数据处理流程。通过Dataflow,企业可以快速响应业务需求,优化用户体验。
116 3
|
6月前
|
弹性计算 负载均衡 中间件
|
8月前
|
消息中间件 监控 安全
【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同
【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同
405 5
|
Shell
添加批处理扩展
添加批处理扩展
53 1
|
数据可视化 关系型数据库 数据挖掘
集成架构」2020年最好的15个ETL工具(第三部)
集成架构」2020年最好的15个ETL工具(第三部)
|
数据可视化 关系型数据库 数据挖掘
「集成架构」2020年最好的15个ETL工具(第三部)
「集成架构」2020年最好的15个ETL工具(第三部)