实验环境: linux centOS 6.7 vmware虚拟机

spark-1.5.1-bin-hadoop-2.1.0

apache-hive-1.2.1

eclipse 或IntelJIDea 本次使用eclipse.

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import  org.apache.spark.SparkConf;
import  org.apache.spark.api.java.JavaSparkContext;
import  org.apache.spark.sql.DataFrame;
import  org.apache.spark.sql.hive.HiveContext;
public  class  SparkOnHiveDemo {
public  static  void  main(String[] args) {
         
         // 首先还是创建SparkConf
         SparkConf conf =  new  SparkConf().setAppName( "HiveDataSource" );
         // 创建JavaSparkContext
         JavaSparkContext sc =  new  JavaSparkContext(conf);
         // 创建HiveContext,注意,这里,它接收的是SparkContext作为参数,不是JavaSparkContext
         HiveContext hiveContext =  new  HiveContext(sc.sc());
         
         //1.可以使用HiveContext 下面的sql(xxx语句)执行HiveSQL语句
         //1 .删除表,创建表
         // stars_infos ,stars_scores
         hiveContext.sql( "DROP TABLE IF EXISTS stars_infos" );
         hiveContext.sql( "CREATE TABLE IF NOT EXISTS stars_infos(name STRING,age INT) "
                 "row format delimited fields terminated by ','" );
         
         //2.向表里面导入数据
         hiveContext.sql( "LOAD DATA "
                 "LOCAL INPATH "
                 "'/root/book/stars_infos.txt' "
                 "INTO TABLE stars_infos" );
         
         hiveContext.sql( "DROP TABLE IF EXISTS stars_scores" );
         hiveContext.sql( "CREATE TABLE IF NOT EXISTS stars_scores(name STRING,score INT) "
                 "row format delimited fields terminated by ','" );
 
         hiveContext.sql( "LOAD DATA "
                 "LOCAL INPATH "
                 "'/root/book/stars_score.txt' "
                 "INTO TABLE stars_scores" );
         
         
         //3.从一张已经存在的hive表里面拿数据,转换为DF
         DataFrame superStarDataFrame = hiveContext.sql( "SELECT si.name,si.age,ss.score "
                 "FROM stars_infos si "
                 "JOIN stars_scores ss ON si.name=ss.name "
                 "WHERE ss.score>=90" );
             
         //4.把DF的数据再持久化到hive中去,千万别和registerTemtable搞混了
         hiveContext.sql( "DROP TABLE IF EXISTS superStar" );
         superStarDataFrame.saveAsTable( "superStar" );
         
         //5.直接从Hive中得到DF
         hiveContext.table( "superStar" ).show();
         
         sc.close();
     }
}

 元数据:

可以下载附件,然后上传到指定的目录下。

 把程序打包jar后上传到linux指定的目录下,写一个脚本。脚本附件见正文。具体内容修改即可。


 运行脚本就可以了。当然要保证MySQL数据库正常,hive正常。