开发者社区> cloudcoder> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

spark sql简单示例

简介:
+关注继续查看

运行环境

集群环境:CDH5.3.0

具体JAR版本如下:

spark版本:1.2.0-cdh5.3.0

hive版本:0.13.1-cdh5.3.0

hadoop版本:2.5.0-cdh5.3.0

spark sql的JAVA版简单示例

  1. spark sql直接查询JSON格式的数据

  2. spark sql的自定义函数

  3. spark sql查询hive上面的表

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;


/**
 * 注意:
 * 使用JavaHiveContext时
 * 1:需要在classpath下面增加三个配置文件:hive-site.xml,core-site.xml,hdfs-site.xml
 * 2:需要增加postgresql或mysql驱动包的依赖
 * 3:需要增加hive-jdbc,hive-exec的依赖
 *
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
//        testQueryJson(sqlCtx);
//        testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    //测试spark sql直接查询JSON格式的数据
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the input schema RDD
        rdd.registerTempTable("account");

        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email,id,name FROM account ORDER BY id LIMIT 10");
        List<Rowresult = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getInt(2) + ","
                    + row.getString(3));
        }

        JavaRDD<String> names = accs.map(new Function<RowString>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }


    //测试spark sql的自定义函数
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create a account and turn it into a Schema RDD
        ArrayList<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1"lily""lily@163.com""gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);

        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);

        rdd.registerTempTable("acc");

        // 编写自定义函数UDF
        sqlCtx.registerFunction("strlength", new UDF1<StringInteger>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // 数据查询
        List<Rowresult = sqlCtx.sql("SELECT strlength('name'),name,address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    //测试spark sql查询hive上面的表
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Rowresult = hiveCtx.sql("SELECT foo,bar,name from pokes2 limit 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
sqlServer存储过程
1、创建存储过程报错:     'CREATE/ALTER PROCEDURE' 必须是查询批次中的第一个语句。 解决方法: use databaseName 后面要加上一句: GO ...
835 0
SQL Server基础之<存储过程>
原文:SQL Server基础之   简单来说,存储过程就是一条或者多条sql语句的集合,可视为批处理文件,但是其作用不仅限于批处理。本篇主要介绍变量的使用,存储过程和存储函数的创建,调用,查看,修改以及删除操作。
1487 0
SQL Server 死锁检查
原文:SQL Server 死锁检查 示例代码: select spid, blocked, status, hostname, program_name, hostprocess, cmd from sysprocesses where dbid = db_id('xxx') and b...
477 0
SQL Server 已提交读快照 测试
原文:SQL Server 已提交读快照 测试 1. 打开数据库 已提交读快照 选项   2. 数据库 已提交读快照 模式下的测试   a) 测试表 Test   b) 开启事务1,更新数据C2 = '200'(未提交) BEGIN TRAN UPDATE Test...
925 0
SQLSERVER存储过程语法详解
SQL SERVER存储过程语法: Create PROC [ EDURE ] procedure_name [ ; number ]     [ { @parameter data_type }         [ VARYING ] [ = default ] [ OUTPUT ]     ] [ ,...n ]   [ WITH     { RECOMPILE | ENCRY
1667 0
+关注
cloudcoder
热衷于大数据处理技术研究、使用 关注中间件技术
文章
问答
文章排行榜
最热
最新
相关电子书
更多
Comparison of Spark SQL with Hive
立即下载
spark替代HIVE实现ETL作业
立即下载
SQL Server 2017
立即下载