我们知道StreamingPro 是一个完全SQL/Script化的,基于Spark平台的一套生产力工具。但是不可避免的,我们可能希望直接操作SqlContext或者使用原生的DataFrame API。 这里我们通过script 让大家支持这个功能:
在这个模块里,你可以访问任何一张已经注册的表。并且经过处理后注册一张新的表。给了大家无线的灵活性。{ "name": "batch.script.df", "params": [ { "script": "context.sql(\"select a as t from test\").registerTempTable(\"finalOutputTable\")", "source": "-" } ] }
AI 代码解读
如果source 设置为file,script 填写的是文件路径的话,那么就不用在json文件里写脚本了。
这里给大家一个完整的例子:
{ "batch-console": { "desc": "测试", "strategy": "spark", "algorithm": [], "ref": [], "compositor": [ { "name": "batch.sources", "params": [ { "path": "file:///tmp/hdfsfile/abc.txt", "format": "json", "outputTable": "test" } ] }, { "name": "batch.script.df", "params": [ { "script": "context.sql(\"select a as t from test\").registerTempTable(\"finalOutputTable\")", "source": "-" } ] }, { "name": "batch.outputs", "params": [ { "name":"jack", "format": "console", "path": "-", "inputTableName": "finalOutputTable", "mode":"Overwrite" } ] } ], "configParams": { } } }
AI 代码解读
另外,对于输出,我们也可以控制文件数目:
其中 outputFileNum 就是你最后的输出文件数。你也可以通过batch.script.df 模块控制输出的文件数。无非就是df.repartion(3).registerTable("finalOutputTable") 即可达成。{ "name": "batch.outputs", "params": [ { "name": "jack", "format": "json", "path": "file:///tmp/batch-console", "outputFileNum": "3", "inputTableName": "finalOutputTable", "mode": "Overwrite" } ] }
AI 代码解读
下载地址: StreamingPro