使用工具 idea
一个项目有很多功能 它可以包含很多方面 为此 我们首先要做的就是分析需求根据需求来创建项目 但是发现一个项目中含有的东西如果放在一起的话就会变得十分乱
哪么 对于这个项目来讲 就需要将它按功能划分成模块 比如一个大数据项目 有web 部分 数据部分 整个流程 就是 从web或者关系型数据库得到数据 然后经过分析处理 然后变成可视化图形数据 那么 采集数据就需要分为一个模块 在这个模块 包括例如sqoop的连接 创建 启动 过程 也包含数据的导入导出过程 web部分展示数据也含有将数据记录到关系型数据库中
值得注意的是 再maven项目结构上 需要在总项目的pom中
<packaging>pom</packaging>
项目结构:
这个类创建了两个方法 分别是连接sqoop连接hdfs 和 sqoop 连接PostgreSql数据库
package com.zhiyou100.sqoop.link
import org.apache.sqoop.client.SqoopClient
import org.apache.sqoop.model.{MConfig, MInput, MLink}
object LinkCreator {
val url="http://master:12000/sqoop/"
val client =new SqoopClient(url)
//创建hdfslink
//创建postgresql link link
def createHdfsLink()={
val hdfsLink =client.createLink("hdfs-connector")
val linkConfig =hdfsLink.getConnectorLinkConfig()
val configs:java.util.List[MConfig] =linkConfig.getConfigs()
printLinkConfigruation(configs)
// for(i<-0 until configs.size()){
// val inputs:java.util.List[MInput[_]]=configs.get(i).getInputs
// for(j<-0 until inputs.size()){
// val input =inputs.get(j)
// println(input)
// }
// }
linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://master:9000")
linkConfig.getStringInput("linkConfig.confDir").setValue("/opt/SoftWare/Hadoop/hadoop-2.7.3/etc/hadoop")
hdfsLink.setName("btrip_hdfs")
val status =client.saveLink(hdfsLink)
getStatus(status.canProceed)
//
// if(status.canProceed){
// println("hdfs_link 创建成功")
// }else{
// println("hdfs_link 常见失败")
// }
// for(config:MConfig<-configs) {
// val inputs :java.util.List[MInput[_]]=config.getInputs()
// for(input:)
// println()
// }
}
def createPostgresqlLink()={
val pglink =client.createLink("generic-jdbc-connector")
val linkConfig =pglink.getConnectorLinkConfig()
printLinkConfigruation(linkConfig.getConfigs)
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("org.postgresql.Driver")
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:postgresql://192.168.6.168:5432/WscHMS")
linkConfig.getStringInput("linkConfig.username").setValue("postgres")
linkConfig.getStringInput("linkConfig.password").setValue("123456")
linkConfig.getStringInput("dialect.identifierEnclose").setValue("5")
pglink.setName("link_postgresql")
val status =client.saveLink(pglink)
getStatus(status.canProceed)
}
def printLinkConfigruation(configs:java.util.List[MConfig]) ={
for(i<-0 until configs.size()){
val inputs:java.util.List[MInput[_]]=configs.get(i).getInputs
for(j<-0 until inputs.size()){
val input =inputs.get(j)
println(input)
}
}
}
def getStatus(statuscanProceed:Boolean)={
if(statuscanProceed){
println("hdfs_link 创建成功")
}else{
println("hdfs_link 常见失败")
}
}
def main(args: Array[String]): Unit = {
// createHdfsLink()
createPostgresqlLink()
}
}
创建job
def createJob()={
val sql=
"""
|select *
| from ***
| ${CONDITIONS}
|
""".stripMargin
val job=client.createJob("link_postgresql","btrip_hdfs")
job.setName("postgreSql")
val fromConfig =job.getFromJobConfig()
val toConfig =job.getToJobConfig()
showFromJobConfig(fromConfig)
showToJobConfig(toConfig)
fromConfig.getStringInput("fromJobConfig.schemaName").setValue("wsc")
fromConfig.getStringInput("fromJobConfig.tableName").setValue("tb_company")
fromConfig.getStringInput("fromJobConfig.sql").setValue(sql)
fromConfig.getStringInput("fromJobConfig.boundaryQuery").setValue("false")
fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue("company_id")
// fromConfig.getStringInput("dialect.identifierEnclose").setValue("")
toConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE")
toConfig.getEnumInput("toJobConfig.compression").setValue("NONE")
toConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqoop/btrip_pg")
toConfig.getBooleanInput("toJobConfig.appendMode").setValue(true)
// job.setName("btrip_company")
val status =client.saveJob(job)
if(status.canProceed){
println("job 创建成功")
}else{
println("job 创建失败")
}
}