在Spark中,这个json是在数据帧(DF)中,现在我们必须导航到表(在基于cust的json中),我们必须读取第一个表块并且必须准备sql查询。例如: SELECT CUST_NAME FROM CUST WHERE CUST_ID =112
我们必须在Database&store中执行此查询,结果是json文件。
{
"cust": "Retails",
"tables": [
{
"Name":"customer",
"table_NAME":"cust",
"param1":"cust_id",
"val":"112",
"op":"cust_name"
},
{
"Name":"sales",
"table_NAME":"sale",
"param1":"country",
"val":"ind",
"op":"monthly_sale"
}]
}
root |-- cust: string (nullable = true)
|-- tables: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
| | |-- op: string (nullable = true)
| | |-- param1: string (nullable = true)
| | |-- table_NAME: string (nullable = true)
| | |-- val: string (nullable = true)
第二块表格相同。例如: SELECT MONTHLY_SALE FROM SALE WHERE COUNTRY = 'IND'
必须在DB中执行此查询并且必须将此结果存储在上面的json文件中。
这样做的最佳方法是什么?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
这是我实现这一目标的方式。对于这整个解决方案,我使用了spark-shell。这些是一些先决条件:
从json-serde下载这个jar
将zip文件解压缩到任何位置
现在使用此命令运行spark-shell
spark-shell --jars path/to/jars/json-serde-cdh5-shim-1.3.7.3.jar,path/to/jars/json-serde-1.3.7.3.jar,path/to/jars/json-1.3.7.3.jar
你的Json文件:
{
"cust": "Retails",
"tables": [
{
"Name":"customer",
"table_NAME":"cust",
"param1":"cust_id",
"val":"112",
"op":"cust_name"
},
{
"Name":"sales",
"table_NAME":"sale",
"param1":"country",
"val":"ind",
"op":"monthly_sale"
}]
}
折叠版:
{"cust": "Retails","tables":[{"Name":"customer","table_NAME":"cust","param1":"cust_id","val":"112","op":"cust_name"},{"Name":"sales","table_NAME":"sale","param1":"country","val":"ind","op":"monthly_sale"}]}
我把这个json放在这个/tmp/sample.json中
现在去spark-sql部分:
基于json模式创建表
sql("CREATE TABLE json_table(cust string,tables array>) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'")
现在将json数据加载到表中
sql("LOAD DATA LOCAL INPATH '/tmp/sample.json' OVERWRITE INTO TABLE json_table")
现在我将使用蜂巢侧视图概念横向视图
val ans=sql("SELECT myCol FROM json_table LATERAL VIEW explode(tables) myTable as myCol").collect
返回结果的架构:
ans.printSchema
root
|-- table: struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- table_NAME: string (nullable = true)
| |-- param1: string (nullable = true)
| |-- val: string (nullable = true)
| |-- op: string (nullable = true)
ans.show的结果
ans.show
+--------------------+
| table|
+--------------------+
|[customer,cust,cu...|
|[sales,sale,count...|
+--------------------+
现在我假设可以有两种类型的数据,例如cust_id是Number类型,country是String类型。我正在添加一种方法来根据它的值来识别数据类型。例如
def isAllDigits(x: String) = x forall Character.isDigit
注意:您可以使用自己的方式识别此信息
7.现在基于json数据创建查询
ans.foreach(f=>{
val splitted_string=f.toString.split(",")
val op=splitted_string(4).substring(0,splitted_string(4).size-2)
val table_NAME=splitted_string(1)
val param1 = splitted_string(2)
val value = splitted_string(3)
if(isAllDigits(value)){
println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"="+value)
}else{
println("SELECT " +op+" FROM "+ table_NAME+" WHERE "+param1+"='"+value+"'")
}
})
这是我得到的结果:
SELECT cust_name FROM cust WHERE cust_id=112
SELECT monthly_sale FROM sale WHERE country='ind'