怎么在大数据编程里面结合spring容器的功能?
比如数据库链接,sparksql等等怎么进行整合?
第一步:MAVEN配置
<!-- Spark 与 Scala 依赖（原文 XML 标签在转载时丢失，以下按所列坐标还原） -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.11.0</version>
</dependency>
第二步:Spring配置
<!-- 属性占位符配置（原文 XML 标签丢失，按所示内容还原） -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:jdbc.properties</value>
            <value>classpath:spark.properties</value>
        </list>
    </property>
</bean>
第三步:新增属性文件 spark.properties
spark.master=local
spark.url=jdbc:mysql://192.168.0.202:3306/spark?useUnicode=true&characterEncoding=UTF-8
spark.table=testtable
spark.username=root
spark.password=mysql
第四步:写代码
/**
*
*/
package com.harleycorp.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.annotation.Resource;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.harleycorp.pojo.SparkUser;
import com.harleycorp.service.ISparkUpperService;
/**
 * Spark + Spring integration service: reads and writes a MySQL table through a
 * Spring-managed {@link SQLContext} / {@link JavaSparkContext} using Spark's
 * JDBC DataFrame API.
 *
 * @author kevin
 */
@Service
public class SparkUpperServiceImpl implements ISparkUpperService {

    private Logger logger = Logger.getLogger(SparkUpperServiceImpl.class);

    /** Spark master URL (e.g. "local"), injected from spark.properties. */
    @Value("${spark.master}")
    public String master;

    /** JDBC URL of the target MySQL database. */
    @Value("${spark.url}")
    public String url;

    /** Name of the table to read from / append to. */
    @Value("${spark.table}")
    public String table;

    /** Database user name. */
    @Value("${spark.username}")
    public String username;

    // NOTE(review): hard-coded credential; the @Value("${spark.password}")
    // injection was commented out in the original. Prefer restoring it once
    // spark.password is guaranteed to exist in spark.properties.
    public String password = "mysql";

    /** Spring-managed Spark SQL context (must be declared as a bean elsewhere). */
    @Resource
    public SQLContext sqlContext;

    /** Spring-managed Java Spark context (must be declared as a bean elsewhere). */
    @Resource
    public JavaSparkContext sc;

    /**
     * Builds the JDBC connection properties handed to Spark's jdbc()
     * reader/writer (user, password, and the redundant-but-harmless dbtable).
     *
     * @return properties for DataFrameReader/Writer.jdbc(...)
     */
    public Properties getConnectionProperties() {
        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("dbtable", table); // table to operate on
        connectionProperties.setProperty("user", username); // database user
        connectionProperties.setProperty("password", password); // database password
        return connectionProperties;
    }

    /**
     * Loads the configured table into a DataFrame, registers it as a temporary
     * table and returns all of its rows rendered as a string.
     *
     * @return string form of every row in the configured table
     */
    public String query() {
        logger.info("=======================this url:" + this.url);
        logger.info("=======================this table:" + this.table);
        logger.info("=======================this master:" + this.master);
        logger.info("=======================this username:" + this.username);
        // Security: never log the raw password — only whether one is configured.
        logger.info("=======================password configured:" + (this.password != null));
        DataFrame df = sqlContext.read().jdbc(url, table, getConnectionProperties());
        df.registerTempTable(table);
        // FIX: query the configured table name; the original hard-coded
        // "testtable", which broke as soon as spark.table was changed.
        String result = sqlContext.sql("select * from " + table).javaRDD().collect().toString();
        logger.info("=====================spark mysql:" + result);
        return result;
    }

    /**
     * Queries the table with a partition predicate ("password=000000") and
     * returns the matching rows rendered as a string.
     *
     * @return string form of the rows selected by the predicate
     */
    public String queryByCon() {
        logger.info("=======================this url:" + this.url);
        logger.info("=======================this table:" + this.table);
        logger.info("=======================this master:" + this.master);
        logger.info("=======================this username:" + this.username);
        // Security: never log the raw password — only whether one is configured.
        logger.info("=======================password configured:" + (this.password != null));
        // NOTE(review): the predicate compares an (apparently) string column
        // without quotes; MySQL will coerce types — confirm this is intended.
        DataFrame df = sqlContext.read().jdbc(url, table, new String[]{"password=000000"}, getConnectionProperties());
        String result = df.collectAsList().toString();
        logger.info("=====================spark mysql:" + result);
        // FIX: the original returned null here, silently discarding the
        // result that was just computed and logged.
        return result;
    }

    /**
     * Appends two sample users to the configured table via the JDBC writer.
     */
    public void add() {
        // Use parameterized types instead of the original raw List/JavaRDD.
        List<SparkUser> list = new ArrayList<SparkUser>();
        SparkUser us = new SparkUser();
        us.setUsername("kevin");
        us.setPassword("000000");
        list.add(us);
        SparkUser us2 = new SparkUser();
        us2.setUsername("Lisa");
        us2.setPassword("666666");
        list.add(us2);
        JavaRDD<SparkUser> personsRDD = sc.parallelize(list);
        DataFrame userDf = sqlContext.createDataFrame(personsRDD, SparkUser.class);
        userDf.write().mode(SaveMode.Append).jdbc(url, table, getConnectionProperties());
    }
}
第五步:junit调用
package com.harleycorp.testmybatis;
import javax.annotation.Resource;
import org.apache.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.harleycorp.service.ISparkUpperService;
@RunWith(SpringJUnit4ClassRunner.class) // run inside Spring's JUnit4 test runner
@ContextConfiguration(locations = {"classpath:spring-mybatis.xml"})
public class TestSpark {

    private static final Logger logger = Logger.getLogger(TestSpark.class);

    /** Service under test, injected from the Spring context. */
    @Resource
    private ISparkUpperService sparkUpperService;

    /** Reads all rows of the configured table. */
    @Test
    public void test1() {
        sparkUpperService.query();
    }

    /** Appends the two sample users. */
    @Test
    public void test2() {
        sparkUpperService.add();
    }

    /** Reads rows matching the partition predicate. */
    @Test
    public void test3() {
        sparkUpperService.queryByCon();
    }
}
第六步:运行
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。