1. Deploying the Hudi environment on Linux
(1) Install Maven 3.5.4 and JDK 1.8
# Extract Maven and rename it
tar -xf apache-maven-3.5.4-bin.tar.gz -C /usr/local/
mv /usr/local/apache-maven-3.5.4 /usr/local/maven
# Extract the JDK and rename it
tar -xf jdk-8u212-linux-x64.tar.gz -C /usr/local/
mv /usr/local/jdk1.8.0_212 /usr/local/jdk
# Configure environment variables
vi /etc/profile
# Add the following:
# JAVA_HOME
JAVA_HOME=/usr/local/jdk
export JAVA_HOME
CLASSPATH=.:$JAVA_HOME/lib
export CLASSPATH
PATH=$PATH:$JAVA_HOME/bin:$CLASSPATH
export PATH
# MAVEN_HOME
MAVEN_HOME=/usr/local/maven
export MAVEN_HOME
PATH=$PATH:$MAVEN_HOME/bin
export PATH
# Reload the profile
source /etc/profile
# Verify the installation
java -version
mvn -version
(2) Download the Hudi source package
wget https://archive.apache.org/dist/hudi/0.9.0/hudi-0.9.0.src.tgz
(3) Configure Maven mirrors in the settings.xml file under the conf directory of the Maven installation
<mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>aliyunmaven</id>
    <mirrorOf>*</mirrorOf>
    <name>Aliyun Spring plugin repository</name>
    <url>https://maven.aliyun.com/repository/spring-plugin</url>
</mirror>
<mirror>
    <id>repo2</id>
    <name>Mirror from Maven Repo2</name>
    <url>https://repo.spring.io/plugins-release/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>UK</id>
    <name>UK Central</name>
    <url>http://uk.maven.org/maven2</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>jboss-public-repository-group</id>
    <name>JBoss Public Repository Group</name>
    <url>http://repository.jboss.org/nexus/content/groups/public</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>CN</id>
    <name>OSChina Central</name>
    <url>http://maven.oschina.net/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>google-maven-central</id>
    <name>GCS Maven Central mirror Asia Pacific</name>
    <url>https://maven-central-asia.storage-download.googleapis.com/maven2/</url>
    <mirrorOf>central</mirrorOf>
</mirror>
<mirror>
    <id>confluent</id>
    <name>confluent maven</name>
    <url>http://packages.confluent.io/maven/</url>
    <mirrorOf>confluent</mirrorOf>
</mirror>
(4) Build the Hudi source package
# Extract the downloaded Hudi source
tar -xf hudi-0.9.0.src.tgz -C /usr/local/
cd /usr/local/hudi-0.9.0
# Run the build
mvn clean install -DskipTests -DskipITs -Dscala-2.12 -Dspark3
(5) After the build succeeds, go into the hudi-cli directory and run ./hudi-cli.sh to test it
cd hudi-cli
./hudi-cli.sh
(6) Install HDFS
# Extract the Hadoop package
tar -zxf hadoop-2.7.3.tar.gz -C /usr/local/
cd /usr/local/
# Create a symlink
ln -s hadoop-2.7.3 hadoop
# Configure environment variables
vi /etc/profile
# HADOOP HOME
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

source /etc/profile

# Set the JDK and Hadoop install directories in the Hadoop environment script
vi /usr/local/hadoop/etc/hadoop/hadoop-env.sh
# Add the following:
export JAVA_HOME=/usr/local/jdk
export HADOOP_HOME=/usr/local/hadoop

# Configure the Hadoop Common properties in core-site.xml
<property>
    <name>fs.defaultFS</name>
    <!-- Use your own IP address -->
    <value>hdfs://192.168.139.100:8020</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/hadoop/datas</value>
</property>
<property>
    <name>hadoop.http.staticuser.user</name>
    <value>root</value>
</property>

# Configure the HDFS properties in hdfs-site.xml
<property>
    <name>dfs.namenode.name.dir</name>
    <value>/hadoop/datas/dfs/nn</value>
</property>
<property>
    <name>dfs.datanode.data.dir</name>
    <value>/hadoop/datas/dfs/dn</value>
</property>
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>
<property>
    <name>dfs.permissions.enabled</name>
    <value>false</value>
</property>
<property>
    <name>dfs.datanode.data.dir.perm</name>
    <value>750</value>
</property>
# Create the directories HDFS needs
mkdir -p /hadoop/datas/dfs/nn
mkdir -p /hadoop/datas/dfs/dn
mkdir -p /hadoop/datas
# List the machines that run the DataNode processes
# (Hadoop 2.x uses the slaves file; it was renamed to workers in Hadoop 3.x)
vi /usr/local/hadoop/etc/hadoop/slaves
# Add:
192.168.139.100
# Format HDFS
hdfs namenode -format
# Start the HDFS daemons
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Open the HDFS web UI
http://192.168.139.100:50070/
(7) Install Spark 3.x
# Extract the package
tar -zxf /usr/local/software/spark-3.0.0-bin-hadoop2.7.tgz -C /usr/local/
cd /usr/local/
# Create a symlink
ln -s /usr/local/spark-3.0.0-bin-hadoop2.7 /usr/local/spark
# Install Scala
tar -zxf /usr/local/softwares/scala-2.12.10.tgz -C /usr/local/
ln -s /usr/local/scala-2.12.10 /usr/local/scala
# Set environment variables
vi /etc/profile
# SCALA_HOME
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin

source /etc/profile
# Go to the Spark configuration directory
cd /usr/local/spark/conf
# Back up and rename the config template
cp -p spark-env.sh.template spark-env.sh.template.bak
mv spark-env.sh.template spark-env.sh
# Edit the file
vi spark-env.sh
# Add the following settings
JAVA_HOME=/usr/local/jdk
SCALA_HOME=/usr/local/scala
HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
# Start spark-shell in local mode
cd /usr/local/spark
bin/spark-shell --master local[2]
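Once the shell comes up, a trivial Scala snippet can be pasted in to confirm the session works. This is only a local smoke test and does not touch Hudi yet:

// Paste into spark-shell: confirm the session is alive and can run a small job
spark.version
spark.range(0, 5).count()   // expected result: 5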
2. Integrating Hudi with Java
(1) Create a Maven project and add the dependencies
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-java-client</artifactId>
    <version>0.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-examples</artifactId>
    <version>0.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-examples-common</artifactId>
    <version>0.11.1</version>
</dependency>
<!-- JSON -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.10.1</version>
</dependency>
(2) Wrap a HoodieClient class that provides insert, update, and delete operations on Hudi
// Imports (package locations as of Hudi 0.11.x)
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @description Hudi Java client wrapper: insert, update, delete
 * @author lixiang
 */
public class HoodieClient {

    private HoodieJavaWriteClient<HoodieAvroPayload> client;

    private String tableFormat;

    /**
     * HDFS address
     */
    private final static String DEFAULT_HDFS_PATH = "hdfs://192.168.139.100:8020";

    /**
     * Default directory on HDFS for table data
     */
    private final static String DEFAULT_HDFS_DIR = "usr/hudi/warehouse";

    // ============================== constructors begin ==============================
    public HoodieClient(String hdfsPath, String hdfsDir, String tableName, String tableFormat, HoodieTableType tableType) {
        this.tableFormat = tableFormat;
        initHuDiClient(hdfsPath, hdfsDir, tableName, tableFormat, tableType);
    }

    // specify hdfsDir, tableName, tableFormat and table type; default HDFS address
    public HoodieClient(String hdfsDir, String tableName, String tableFormat, HoodieTableType tableType) {
        this(DEFAULT_HDFS_PATH, hdfsDir, tableName, tableFormat, tableType);
    }

    // specify hdfsDir, tableName and tableFormat; default HDFS address, COPY_ON_WRITE table
    public HoodieClient(String hdfsDir, String tableName, String tableFormat) {
        this(DEFAULT_HDFS_PATH, hdfsDir, tableName, tableFormat, HoodieTableType.COPY_ON_WRITE);
    }

    // specify tableName, tableFormat and table type; default HDFS address and directory
    public HoodieClient(String tableName, String tableFormat, HoodieTableType tableType) {
        this(DEFAULT_HDFS_PATH, DEFAULT_HDFS_DIR, tableName, tableFormat, tableType);
    }

    // specify tableName and tableFormat; default HDFS address and directory, COPY_ON_WRITE table
    public HoodieClient(String tableName, String tableFormat) {
        this(DEFAULT_HDFS_PATH, DEFAULT_HDFS_DIR, tableName, tableFormat, HoodieTableType.COPY_ON_WRITE);
    }
    // ============================== constructors end ==============================

    /**
     * Initialize the HoodieJavaWriteClient
     * @param hdfsPath    HDFS address
     * @param hdfsDir     directory on HDFS
     * @param tableName   Hudi table name
     * @param tableFormat Avro schema as a JSON string
     * @param tableType   COPY_ON_WRITE or MERGE_ON_READ
     */
    private void initHuDiClient(String hdfsPath, String hdfsDir, String tableName, String tableFormat, HoodieTableType tableType) {
        // Build the Hudi table path
        String tablePath = hdfsPath + "/" + hdfsDir + "/" + tableName;
        // Create the HDFS path
        Configuration hadoopConf = new Configuration();
        Path path = new Path(tablePath);
        FileSystem fileSystem = FSUtils.getFs(tablePath, hadoopConf);
        try {
            // Check whether the path already exists
            if (!fileSystem.exists(path)) {
                // Initialize the Hudi table: create tablePath and write the initial metadata
                HoodieTableMetaClient.withPropertyBuilder()
                        .setTableType(tableType.name())
                        .setTableName(tableName)
                        .setPayloadClassName(HoodieAvroPayload.class.getName())
                        .initTable(hadoopConf, tablePath);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to initialize Hudi table: " + tableName);
        }
        // Build the write client configuration
        HoodieWriteConfig huDiWriteConf = HoodieWriteConfig.newBuilder()
                // data schema
                .withSchema(tableFormat)
                // insert/upsert parallelism
                .withParallelism(2, 2)
                // delete parallelism
                .withDeleteParallelism(2)
                // Hudi table index type: BLOOM
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                // compaction / archival
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build())
                //.withEmbeddedTimelineServerEnabled(false)
                .withPath(tablePath)
                .forTable(tableName)
                .build();
        /* huDiWriteConf.getProps().setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "table_name");
           huDiWriteConf.getProps().setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid"); */
        // Create the Hudi write client
        this.client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), huDiWriteConf);
    }

    /**
     * Upsert a single record
     * @param insertObject record as JSON
     */
    public void upsertOne(JSONObject insertObject) {
        upsert(Arrays.asList(insertObject));
    }

    /**
     * Upsert a batch of records
     * @param insertObjects records as JSON
     */
    public void upsertBatch(List<JSONObject> insertObjects) {
        upsert(insertObjects);
    }

    public void deleteOne(String primaryKey, String tableName) {
        delete(Arrays.asList(primaryKey), tableName);
    }

    public void deleteBatch(List<String> primaryKeys, String tableName) {
        delete(primaryKeys, tableName);
    }

    /**
     * Delete logic
     * @param primaryKeys record keys (uuid)
     * @param tableName   partition path (table_name)
     */
    private void delete(List<String> primaryKeys, String tableName) {
        String newCommitTime = client.startCommit();
        List<HoodieKey> deleteKeys = primaryKeys.stream()
                .map(key -> new HoodieKey(key, tableName))
                .collect(Collectors.toList());
        client.delete(deleteKeys, newCommitTime);
    }

    /**
     * Shared insert/update logic
     * @param insertObjects records as JSON
     */
    private void upsert(List<JSONObject> insertObjects) {
        String newCommitTime = client.startCommit();
        Schema avroSchema = new Schema.Parser().parse(tableFormat);
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = insertObjects.stream().map(obj -> {
            String tableName = obj.getString("table_name");
            String uuid = obj.getString("uuid");
            GenericRecord genericRecord = new GenericData.Record(avroSchema);
            obj.forEach(genericRecord::put);
            HoodieKey hoodieKey = new HoodieKey(uuid, tableName);
            HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(genericRecord));
            return (HoodieRecord<HoodieAvroPayload>) new HoodieAvroRecord<>(hoodieKey, payload);
        }).collect(Collectors.toList());
        // Run the upsert (the returned write statuses are ignored here)
        client.upsert(hoodieRecords, newCommitTime);
    }

    /**
     * Close the client
     */
    public void close() {
        client.close();
    }
}
(3) Create the schema: the table structure described as JSON (used as the table's Avro schema)
// Adjust this to match your own table structure
private static String getTableFormat(String tableName) {
    JSONObject field1 = new JSONObject();
    field1.put("name", "uuid");
    field1.put("type", "string");
    JSONObject field2 = new JSONObject();
    field2.put("name", "table_name");
    field2.put("type", "string");
    JSONObject field3 = new JSONObject();
    field3.put("name", "date");
    field3.put("type", "string");
    JSONArray fields = new JSONArray();
    fields.add(field1);
    fields.add(field2);
    fields.add(field3);
    JSONObject schema = new JSONObject();
    schema.put("type", "record");
    schema.put("name", tableName);
    schema.put("fields", fields);
    return schema.toJSONString();
}

public static void main(String[] args) {
    String tableName = "data_raw_cow";
    // Build the table's JSON schema
    String tableFormat = getTableFormat(tableName);
    System.out.println(tableFormat);
}
Output:
{
    "name":"data_raw_cow",
    "type":"record",
    "fields":[
        {
            "name":"uuid",
            "type":"string"
        },
        {
            "name":"table_name",
            "type":"string"
        },
        {
            "name":"date",
            "type":"string"
        }
    ]
}
(4) Helper that returns a random table name (for testing)
private static List<String> tableNames;

static {
    tableNames = Arrays.asList("table_name1", "table_name2", "table_name3", "table_name4", "table_name5", "table_name6");
}

private static String getTableName() {
    Random random = new Random();
    return tableNames.get(random.nextInt(tableNames.size()));
}
(5) Test: insert 10 rows
public static void main(String[] args) {
    String tableName = "data_raw_cow";
    // Build the table's JSON schema
    String tableFormat = getTableFormat(tableName);
    System.out.println(tableFormat);
    List<JSONObject> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        JSONObject json = new JSONObject();
        json.put("uuid", UUID.randomUUID().toString());
        json.put("table_name", getTableName());
        json.put("date", String.valueOf(LocalDateTime.now()));
        list.add(json);
    }
    HoodieClient client = new HoodieClient(tableName, tableFormat);
    client.upsertBatch(list);
    client.close();
}
(6) Update a single row
/**
 * Build the update payload: for the row whose uuid is 1dd87dd5-8e14-4562-9234-51247264968d
 * and whose table_name is table_name6, change the date to xxxxxxxxxx
 * @return the update payload as JSON
 */
private static JSONObject getUpdateOneData() {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("uuid", "1dd87dd5-8e14-4562-9234-51247264968d");
    jsonObject.put("table_name", "table_name6");
    jsonObject.put("date", "xxxxxxxxxx");
    return jsonObject;
}

public static void main(String[] args) {
    String tableName = "data_raw_cow";
    // Build the table's JSON schema
    String tableFormat = getTableFormat(tableName);
    JSONObject updateOneData = getUpdateOneData();
    HoodieClient client = new HoodieClient(tableName, tableFormat);
    client.upsertOne(updateOneData);
    client.close();
}
(7) Test deleting data. Deletion works by assembling the record key and partition path into a HoodieKey and deleting by that key
public static void main(String[] args) {
    String tableName = "data_raw_cow";
    // Build the table's JSON schema
    String tableFormat = getTableFormat(tableName);
    HoodieClient client = new HoodieClient(tableName, tableFormat);
    client.deleteOne("1dd87dd5-8e14-4562-9234-51247264968d", "table_name6");
    client.close();
}
3. Integrating Hudi with Spark
The Spark integration is mainly used to query and verify the data written in the previous section. Hudi data can also be inspected directly from the Spark shell; a short sketch of that follows the note below.
- How to configure IDEA to run Scala code:
- https://www.jb51.net/article/216785.htm
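Before wiring up a full project, the table written in section 2 can be inspected straight from spark-shell. This is only a sketch: it assumes the shell was started with a Hudi Spark bundle on its classpath (for example via --jars pointing at the hudi-spark3-bundle jar built in section 1) and with spark.serializer set to KryoSerializer, and that the table sits at the Java client's default path.

// Paste into spark-shell (Hudi bundle on the classpath and Kryo serializer assumed)
val df = spark.read.format("hudi").load("/usr/hudi/warehouse/data_raw_cow")
// Hudi adds metadata columns such as _hoodie_commit_time and _hoodie_record_key to every record
df.select("_hoodie_commit_time", "uuid", "table_name", "date").show(false)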
(1) Create a Maven project using Scala and add the dependencies; the full pom.xml is shown below
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lixiang</groupId>
    <artifactId>hudi_scala</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.0</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>

    <dependencies>
        <!-- Scala language -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hudi-spark3 -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_2.12</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Compiler plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(2) Write the Scala code
import org.apache.spark.sql.{DataFrame, SparkSession}

object HuDiClientTest {

  def main(args: Array[String]): Unit = {
    // Create the SparkSession and set its properties
    val spark: SparkSession = {
      SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        // Use Kryo serialization
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    }
    val tableName: String = "data_raw_cow"
    val tablePath: String = "/usr/hudi/warehouse/" + tableName
    // Query the data as a snapshot read of the Hudi table
    queryData(spark, tablePath)
  }

  /**
   * Query Hudi data
   * @param spark     SparkSession
   * @param tablePath Hudi table path
   */
  def queryData(spark: SparkSession, tablePath: String): Unit = {
    spark.read.format("hudi").load(tablePath).createOrReplaceTempView("hudi_table")
    spark.sql("select * from hudi_table").show(false)
  }
}
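To tie the query back to the operations in section 2, a helper like the one below can be added to the same object to look up a single record by uuid, for example to confirm the update or deletion of 1dd87dd5-8e14-4562-9234-51247264968d. This is only a sketch under the same assumptions as queryData; queryByUuid is not part of the original code.

  /**
   * Look up one record by uuid, e.g. to verify an update or delete done by the Java client.
   */
  def queryByUuid(spark: SparkSession, tablePath: String, uuid: String): Unit = {
    val df = spark.read.format("hudi").load(tablePath)
    // _hoodie_commit_time is one of the metadata columns Hudi maintains for every record
    df.filter(df("uuid") === uuid)
      .select("_hoodie_commit_time", "uuid", "table_name", "date")
      .show(false)
  }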