[@项籍][¥20]大数据怎么与spring结合-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

[@项籍][¥20]大数据怎么与spring结合

雨辰-梦 2018-11-15 19:25:16 1262

怎么在大数据编程里面结合spring容器的功能?
比如数据库链接,sparksql等等怎么进行整合?

Java 大数据 数据库 Spring 容器
分享到
取消 提交回答
全部回答(1)
  • hiekay
    2019-07-17 23:14:49

    第一步:MAVEN配置


       
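    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.0</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.0</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.0</version>
    </dependency>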

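    第二步:Spring配置

    (原文中的 XML 标签在页面转载时丢失,以下配置按上下文还原;bean 的 id 与 AppName 的取值仅为示例。)

    <bean id="sparkConf" class="org.apache.spark.SparkConf">
        <property name="AppName" value="SparkForSpring" />
        <property name="Master" value="${spark.master}" />
    </bean>

    <bean id="sc" class="org.apache.spark.api.java.JavaSparkContext">
        <constructor-arg ref="sparkConf" />
    </bean>

    <bean id="sqlContext" class="org.apache.spark.sql.SQLContext">
        <constructor-arg ref="sc" />
    </bean>

    <bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:jdbc.properties</value>
                <value>classpath:spark.properties</value>
            </list>
        </property>
    </bean>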

    第三步:新增属性文件  spark.properties

    spark.master=local
    spark.url=jdbc:mysql://192.168.0.202:3306/spark?useUnicode=true&characterEncoding=UTF-8
    spark.table=testtable
    spark.username=root
    spark.password=mysql

    第四步:写代码

    /**
     * 
     */
    package com.harleycorp.service.impl;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import javax.annotation.Resource;

    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;

    import com.harleycorp.pojo.SparkUser;
    import com.harleycorp.service.ISparkUpperService;

    /**
     * @author kevin
     *
     */
    @Service
    public class SparkUpperServiceImpl implements ISparkUpperService {

    private Logger logger =Logger.getLogger(SparkUpperServiceImpl.class);

    @Value("${spark.master}")
    public String master ; // = "local"

    @Value("${spark.url}")
    public String url ;//= "jdbc:mysql://192.168.0.202:3306/spark?useUnicode=true&characterEncoding=UTF-8";

    @Value("${spark.table}")
    public String table ; //= "testtable"

    @Value("${spark.username}")
    public String username ;// = "root";

    //@Value("${spark.password}")
    public String password = "mysql";

    @Resource
    public SQLContext sqlContext;

    @Resource
    public JavaSparkContext sc;

    public Properties getConnectionProperties(){
    Properties connectionProperties = new Properties();
    connectionProperties.setProperty("dbtable",table);
    connectionProperties.setProperty("user",username);//数据库用户
    connectionProperties.setProperty("password",password); //数据库用户密码
    return connectionProperties;
    }

    public String query() {
    logger.info("=======================this url:"+this.url);
    logger.info("=======================this table:"+this.table);
    logger.info("=======================this master:"+this.master);
    logger.info("=======================this username:"+this.username);
    logger.info("=======================this password:"+this.password);

    DataFrame df = null;
    //以下数据库连接内容请使用实际配置地址代替
    df = sqlContext.read().jdbc(url,table, getConnectionProperties());
    df.registerTempTable(table);
    String result = sqlContext.sql("select * from testtable").javaRDD().collect().toString();
    logger.info("=====================spark mysql:"+result);
    return result;
    }

    public String queryByCon(){
    logger.info("=======================this url:"+this.url);
    logger.info("=======================this table:"+this.table);
    logger.info("=======================this master:"+this.master);
    logger.info("=======================this username:"+this.username);
    logger.info("=======================this password:"+this.password);

    DataFrame df = sqlContext.read().jdbc(url, table, new String[]{"password=000000"}, getConnectionProperties());
    String result = df.collectAsList().toString();
    logger.info("=====================spark mysql:"+result);
    return null;
    }

    public void add(){
    List list = new ArrayList();
    SparkUser us = new SparkUser();
    us.setUsername("kevin");
    us.setPassword("000000");
    list.add(us);
    SparkUser us2 = new SparkUser();
    us2.setUsername("Lisa");
    us2.setPassword("666666");
    list.add(us2);

    JavaRDD personsRDD = sc.parallelize(list);
    DataFrame userDf = sqlContext.createDataFrame(personsRDD, SparkUser.class);
    userDf.write().mode(SaveMode.Append).jdbc(url, table, getConnectionProperties());
    }

    }

    第五步:junit调用

    package com.harleycorp.testmybatis;

    import javax.annotation.Resource;

    import org.apache.log4j.Logger;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    import com.harleycorp.service.ISparkUpperService;

    @RunWith(SpringJUnit4ClassRunner.class) //表示继承了SpringJUnit4ClassRunner类
    @ContextConfiguration(locations = {"classpath:spring-mybatis.xml"})

    public class TestSpark {

    private static Logger logger=Logger.getLogger(TestSpark.class);
    @Resource 
    private  ISparkUpperService sparkUpperService = null;

    @Test
    public void test1(){
    sparkUpperService.query();
    }

    @Test
    public void test2(){  
    sparkUpperService.add();   
    }

    @Test
    public void test3(){
    sparkUpperService.queryByCon();
    }
    }

    第六步:运行

    0 0
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

推荐文章
相似问题
推荐课程