StreamingPro 简化流式计算配置

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 前些天可以让批处理的配置变得更优雅StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。
前言
前些天可以让批处理的配置变得更优雅 StreamingPro 支持多输入,多输出配置,现在流式计算也支持相同的配置方式了。

另外未来等另外一个项目稳定,会释放出来配合StreamingPro使用,它可以让你很方便的读写HBase,比如可以为HBase 表 添加mapping,类似ES的做法,也可以不用mapping,系统会自动为你创建列(familly:column作为列名),或者将所有列合并成一个字段让你做处理。


配置

首先需要配置源:

{
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      }

我们配置了一个Kafka流,一个普通的CSV文件。目前StreamingPro只允许配置一个Kafka流,但是支持多个topic,按逗号分隔即可。你可以配置多个其他非流式源,比如从MySQL,Parquet,CSV同时读取数据并且映射成表。

之后你就可以写SQL进行处理了。

{
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },

我这里做了简单的join。

{
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
然后把数据追加到Mysql里去。其实你也可以配置多个输出。


完整配置

{
  "example": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"

          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
你可以在 StreamingPro-0.4.11 下载到包,然后用命令启动:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
目录
相关文章
|
定位技术
基于Leaflet的轨迹模拟回放
在gis场景中,轨迹回放是一个很常见的场景。比如在导航软件中,在行程结束后根据运动轨迹生成运动图,在keep软件中,会自动记录用户的锻炼轨迹,在结束后可以看到自己的运动轨迹等等。这些需求都是设备在运行中自动记录相应的点,在结合gis模拟即可。
1547 0
基于Leaflet的轨迹模拟回放
|
机器学习/深度学习 虚拟化 数据中心
NVIDIA T4和A10:不同应用场景下的GPU加速器选择
在数据中心和云计算领域,GPU加速器扮演着至关重要的角色。NVIDIA T4和A10是两款适用于不同应用场景的GPU加速器。本文将比较它们的性能和适用场景,帮助读者更好地选择适合自己需求的GPU实例。
6635 0
|
10月前
|
搜索推荐 API 开发者
京东商品视频数据接口(JD.item_video)丨京东 API 接口指南
京东商品视频数据接口(JD.item_video)是京东开放平台提供的API,开发者可通过指定商品ID(num_iid)获取商品视频资源,用于丰富电商平台展示、提升用户体验。该接口适用于电商平台建设、商品推荐系统、市场研究与竞品分析及价格监测平台等场景,帮助用户更直观了解商品,提高购买转化率。示例代码展示了如何使用Python调用此接口并解析返回的JSON数据。
482 16
|
10月前
|
搜索推荐 C++
【C++数据结构——内排序】快速排序(头歌实践教学平台习题)【合集】
快速排序是一种高效的排序算法,基于分治策略。它的主要思想是通过选择一个基准元素(pivot),将数组划分成两部分。一部分的元素都小于等于基准元素,另一部分的元素都大于等于基准元素。然后对这两部分分别进行排序,最终使整个数组有序。(第一行是元素个数,第二行是待排序的原始关键字数据。本关任务:实现快速排序算法。开始你的任务吧,祝你成功!
259 7
|
前端开发 Docker 容器
主机host服务器和Docker容器之间的文件互传方法汇总
Docker 成为前端工具,可实现跨设备兼容。本文介绍主机与 Docker 容器/镜像间文件传输的三种方法:1. 构建镜像时使用 `COPY` 或 `ADD` 指令;2. 启动容器时使用 `-v` 挂载卷;3. 运行时使用 `docker cp` 命令。每种方法适用于不同场景,如静态文件打包、开发时文件同步及临时文件传输。注意权限问题、容器停止后的文件传输及性能影响。
3237 1
|
数据采集 小程序 JavaScript
大家买的阿里云服务器干什么用?能为大家带来什么作用
现在到处看到各种云服务,云计算,感觉进入了云时代 最近腾讯和阿里云各种优惠活动,一年只需要86块钱,很多人都买了自己的服务器 他们买服务器干什么 服务器又能干些什么 不知道服务器买了可以干啥,怕跟着买了却白花了钱,放到那里吃灰 所以我花时间给大家整理凑出十大用途 :
19370 1
|
人工智能 运维 Serverless
阿里云函数计算 GPU 宣布降价,最高幅度达 93%,阶梯计费越用越便宜!
自2023年9月1日0时起,阿里云函数计算的 GPU 使用量和函数调用次数宣布降价。本次降价幅度达到10%~93%,并且通过阶梯计费模式帮助客户在使用函数计算时,越用越便宜。
|
数据采集 存储 监控
ChatGPT工作提效之生成开发需求和报价单并转为Excel格式
ChatGPT工作提效之生成开发需求和报价单并转为Excel格式
604 0
|
分布式计算 Spark 容器
Spark Operator浅析
Spark Operator浅析 本文介绍Spark Operator的设计和实现相关的内容. Spark运行时架构 经过近几年的高速发展,分布式计算框架的架构逐渐趋同. 资源管理模块作为其中最通用的模块逐渐与框架解耦,独立成通用的组件.
|
存储 安全 新能源
IGBT工作原理和特性介绍
IGBT,Insulated Gate Bipolar Transistor,是由BJT(双极晶体管)和IGFET(Insulated Gate Field Effect Transistor)组成的复合全控电压驱动功率半导体器件。它兼有 MOSFET 的高输入阻抗和 GTR 的低导通压降的优点。
2978 0
IGBT工作原理和特性介绍