SparkSQL是spark家族中一个结构化或半结构化数据的处理模块。对SQL的处理跟关系型数据库SQL类似,将SQL解析成一棵树,通过规则的模式匹配,对树进行绑定、优化,得到查询结果。
SparkSQL提供了一种特殊的RDD-DataFrame,相当于关系型数据的一个表,在Java API中,由行(row)组成的数据集(DataSet)表示为一个DataFrame。
用户程序在执行过程中,下图表示了从SQL语句到DataFrame的整个执行过程。
SparkSession
在spark1.x时代,SparkSQL的入口都是通过SQLContext 或者HiveContext完成,从1.6以后,引入了SparkSession概念,替代了SQLContext,实现对数据的加载、转换、处理等工作。
可以通过SparkSession.builder来创建一个SparkSession,也可以通过stop停止
SparkSQL与MongoDB的集成
Mongo对Spark的支持可参见Mongo官方文档MongoDB
public static void main(String[] args) throws AnalysisException { logger.info("开始执行告警统计spark任务"); SparkSession spark = SparkSession.builder() .master("local") .appName("alarmService") .config("spark.mongodb.input.uri", MONGODB_INPUT_URL) .config("spark.mongodb.output.uri", MONGODB_OUTPUT_URL) .getOrCreate(); Dataset ds = MongoSpark.load(spark, EmAlarmBean.class); ds.registerTempTable("test"); ds = spark.sql(getSql()); MongoSpark.save(ds); } private static String getSql() { String sql = "select orgId,from_unixtime(createTimestamp, 'yyyyMMdd') AS statisticDate, " + "sum(if(levelDictId='4001',1,0)) level1," + "sum(if(levelDictId='4002',1,0)) level2," + "sum(if(levelDictId='4003',1,0)) level3" + " from test " + " where (deviceType=0 or deviceType=2 or deviceType=3) and createTimestamp is not null and orgId is not null " + " group by from_unixtime(createTimestamp, 'yyyyMMdd'),orgId"; return sql; }