StreamingPro添加Scala script 模块支持-阿里云开发者社区

开发者社区> 祝威廉> 正文

StreamingPro添加Scala script 模块支持

简介: SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。
+关注继续查看
SQL 在解析字符串方面,能力还是有限,因为支持的算子譬如substring,split等有限,且不具备复杂的流程表达能力。我们内部有个通过JSON描述的DSL引擎方便配置化解析,然而也有一定的学习时间成本。

我们当然可以通过SQL的 UDF函数等来完成字符串解析,在streamingpro中也很简单,只要注册下你的UDF函数库即可:

"udf_register": {
    "desc": "测试",
    "strategy": "....SparkStreamingRefStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "...SQLUDFCompositor",
        "params": [
          {
            "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
          }
        ]
      }
    ]
  }

这样你就可以在SQL中使用MLfunctions里面所有的udf函数了。然而为此专门提供一个jar包也是略显麻烦。

这个时候如果能直接写脚本解析就好了,最好是能支持各种脚本,比如groovy,javascript,python,scala,java等。任何一个会编程的人都可以实现一个比较复杂的解析逻辑。

核心是ScriptCompositor模块:

{
        "name": "...ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": [
              "val Array(a,b)=rawLine.split(\"\t\");",
              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      }
如果我想在代码里直接处理所有的列,则如下:
{
        "name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
        "params": [
          {
            "inputTableName": "test2",
            "outputTableName": "test3",
            "useDocMap": true
          },
          {
            "anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)"
          }
        ]
}
通过添加useDocMap为true,则你在代码里可以通过doc(doc是个Map[String,Any]) 来获取你想要的任何字段,然后形成一个新的Map。

如果你只要新生成Map里的字段,忽略掉旧的,则设置ignoreOldColumns=true 即可。

你可以把代码放到一个文件里,如下:
{
        "name": "....ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }
通过inputTableName指定输入的表,outputTableName作为输出结果表。 raw代表inputTableName中你需要解析的字段,然后通过你的scala脚本进行解析。在脚本中 rawLine 是固定的,对应raw字段(其他字段也是一样)的值。脚本只有一个要求,最后的返回结果暂时需要是个Map[String,Any]。

这里,你只是提供了一个map作为返回值,作为一行,然后以outputTableName指定的名字输出,作为下一条SQL的输入,所以StreamingPro需要推测出你的Schema。 数据量大到一定程度,推测Schema的效率就得不到保证,这个时候,你可以通过配置schema来提升性能:
{
        "name": "....ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3",
            "schema": "file:///tmp/schema.scala",
            "useDocMap": true
          },
          {
            "raw": "file:///tmp/raw_process.scala"
          }
        ]
      }
schema.scala的内容大致如下:
Some(
StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
)
后续roadmap是:
  1. 支持外部脚本,比如放在hdfs或者http服务器上。
  2. 支持java 脚本
  3. 支持javascript脚本
  4. 支持 python 脚本
  5. 支持 ruby脚本
  6. 支持 groovy 脚本

举个案例,从HDFS读取一个文件,并且映射为只有一个raw字段的表,接着通过ScriptCompositor配置的scala代码解析raw字段,展开成a,b两个字段,然后继续用SQL继续处理,最后输出。

{
  "convert_data_parquet": {
    "desc": "测试",
    "strategy": "...SparkStreamingStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "...SQLSourceCompositor",
        "params": [
          {
            "path": "file:///tmp/hdfsfile",
            "format": "org.apache.spark.sql.execution.datasources.hdfs",
            "fieldName": "raw"
          }
        ]
      },
      {
        "name": "...JSONTableCompositor",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "...ScriptCompositor",
        "params": [
          {
            "inputTableName": "test",
            "outputTableName": "test3"
          },
          {
            "raw": [
              "val Array(a,b)=rawLine.split(\"\t\");",
              "Map(\"a\"->a,\"b\"->b)"
            ]
          }
        ]
      },
      {
        "name": "...transformation.SQLCompositor",
        "params": [
          {
            "sql": "select a,b  from test3 "
          }
        ]
      },
      {
        "name": "...streaming.core.compositor.spark.output.SQLUnitTestCompositor",
        "params": [
          {
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
体验地址: https://github.com/allwefantasy/streamingpro/blob/master/README.md#downloads

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【ECS】好消息,弹性网卡支持添加辅助私网IP啦
背景 阿里云的ECS服务器,很早就支持了多网卡的挂载,卸载。近日我们又推出了新功能,每块弹性网卡又支持分配和回收多辅助私网IP地址啦。 实例列表页以及详情页支持管理主网卡的辅助私网IP 在实例列表页,在每一个实例的操作栏里点击 更多 -> 网络和安全组 -> 管理辅助私网IP,可看到,弹出了相应的...
4397 0
[教程]在CentOS7上配置 FTP服务器 Proftpd 支持 MySQL 虚拟用户加密认证以及磁盘限额(Quota)
本文软件采用 yum 安装,不需要编译,而且随时都可以跟随 CentOS 升级 Proftpd 到最新版本,以避免可能的漏洞攻击。利用 Proftpd 现成的配置以及设置好的各种模块,可以实现 sftp 和 ssh 的结合,完美的实现虚拟用户加密密码存放于数据库。
3876 0
Qt .pro文件之defineReplace函数的用法,实现lib文件名自动添加后缀“d“
Qt .pro文件之defineReplace函数的用法,实现lib文件名自动添加后缀“d“
37 0
怎么设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程
8417 0
如何给JavaScript代码添加断点调试
之前打断点都只会看他自己出现的数据,而且一段时间不用总是找不到在哪里,今天就本着好脑袋不如赖笔头的想法来记录下浏览器断点的使用。本文以谷歌浏览器为准: 首先我们使用chrome浏览器打开我们需要调试的页面并按“F12”键打开“开发者工具”,如下图: 控制台显示网页中JS的各种输出信息,包括错误信息、用户日志等;打开脚本标签页,这里面才是我们想要的内容。
4445 0
VS 2010程序添加对System.Web.Script.Serialization的引用
开发JSON相关程序要用到System.Web.Script.Serialization。如何添加? 在C盘搜索中输入:System.Web.Extensions.dll 在解决方案中引用:System.
885 0
Dubbo 在跨语言和协议穿透性方向上的探索:支持 HTTP/2 gRPC 和 Protobuf
本文整理自刘军在 Dubbo 成都 meetup 上分享的《Dubbo 在多语言和协议穿透性方向上的探索》。 本文总体上可分为基础产品简介、Dubbo 对 gRPC (HTTP/2) 和 Protobuf 的支持及示例演示三部分,在简介部分介绍了 Dubbo、HTTP/2、gRPC、Protobuf 的基本概念和特点;第二部分介绍了 Dubbo 为何要支持 gRPC (HTTP/2) 和 P
678 0
+关注
96
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载