1. Producing Test Data
1.1 Data Sources
1.1.1 Hudi's official test data
https://github.com/apache/hudi/tree/master/docker/demo/data
1.1.2 Data derived from Hive's TPC-DS dataset
The data is derived from the TPC-DS dataset; see GitLab for details:
https://code.bonc.com.cn/bdcut/hive-testbench/snippets/10
A sample of the data:
[hadoop@hadoop01 ~]$ head tpc-ds.json
{"customer_id":"AAAAAAAAOKKNEKBA","shop_date":"1999/09/25","sum_cost":"140.33000069856644"}
{"customer_id":"AAAAAAAAPNAJLMAA","shop_date":"2002/12/26","sum_cost":"29.320000052452087"}
{"customer_id":"AAAAAAAAHNPHCLBA","shop_date":"1999/07/18","sum_cost":"45.949999272823334"}
{"customer_id":"AAAAAAAAANHGIPAA","shop_date":"1998/10/04","sum_cost":"21.369999915361404"}
{"customer_id":"AAAAAAAAPAHKHIBA","shop_date":"2001/12/13","sum_cost":"58.009999826550484"}
{"customer_id":"AAAAAAAABNJBBABA","shop_date":"1998/11/06","sum_cost":"205.01999327540398"}
{"customer_id":"AAAAAAAAILDDDKBA","shop_date":"1998/02/15","sum_cost":"107.06000108271837"}
{"customer_id":"AAAAAAAAHFJMPEAA","shop_date":"1999/09/29","sum_cost":"43.04000025987625"}
{"customer_id":"AAAAAAAAIBOOIHAA","shop_date":"2000/10/16","sum_cost":"122.53999684005976"}
{"customer_id":"AAAAAAAAGEHOPKBA","shop_date":"2001/09/13","sum_cost":"73.4099994301796"}
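The GitLab snippet is not reproduced here; conceptually it aggregates TPC-DS store_sales per customer and shopping day. A hedged HiveQL sketch of that aggregation (standard TPC-DS table and column names; the actual snippet and its date formatting may differ):
-- Sum each customer's spend per day from the TPC-DS tables.
-- date_format is used only to illustrate the yyyy/MM/dd values seen above.
SELECT c.c_customer_id                     AS customer_id,
       date_format(d.d_date, 'yyyy/MM/dd') AS shop_date,
       SUM(ss.ss_net_paid)                 AS sum_cost
FROM   store_sales ss
JOIN   customer  c ON ss.ss_customer_sk  = c.c_customer_sk
JOIN   date_dim  d ON ss.ss_sold_date_sk = d.d_date_sk
GROUP BY c.c_customer_id, d.d_date;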
1.2 Loading the Test Data into Kafka
1.2.1 Using kafkacat
1.2.1.1 Introduction
Project page: https://github.com/edenhill/kafkacat
kafkacat can be thought of as the netcat of the Kafka toolbox: a generic, non-JVM producer and consumer for Apache Kafka >= 0.8.
1.2.1.2 Non-Kerberized Kafka
1. Produce data
cat batch_1.json | kafkacat -b 172.16.13.116:9092 -t stock_ticks -P
2. List the topic
kafkacat -L -b 172.16.13.116:9092 -t stock_ticks
3. View topic metadata as JSON
kafkacat -b 172.16.13.116:9092 -L -J | jq
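For verification, messages can also be read back with kafkacat's consumer mode; a minimal sketch against the same broker and topic:
kafkacat -C -b 172.16.13.116:9092 -t stock_ticks -o beginning -e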
1.2.1.3 Kerberized Kafka
1. Create the topic
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
2. Edit the kafkacat.conf file. It is passed with -F; run kafkacat -X list to see the configurable properties.
bootstrap.servers=bdev001.bonc.com:9092,bdev002.bonc.com:9092
## Optional:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafka@BONC.COM
sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab
3. Produce data
cat batch.txt | kafkacat \
  -F kafkacat.conf \
  -b hadoop02.bonc.com:9092 \
  -t stock-ticks \
  -P
or
cat batch.txt | kafkacat \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanism=GSSAPI \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal=kafka@BONC.COM \
  -X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
  -b hadoop02.bonc.com:9092 \
  -t stock-ticks \
  -P
4. List the topic
kafkacat -L \
  -b hadoop02.bonc.com:9092 \
  -F kafkacat.conf \
  -t stock-ticks
5. View topic metadata
kafkacat \
  -b hadoop02.bonc.com:9092 \
  -F kafkacat.conf \
  -L -J | jq
1.2.2 Using Kafka's bundled performance test tool
Kafka ships with a producer performance test tool, kafka-producer-perf-test.sh, which can also be used to load the data.
### Create the topics
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic store-sales
### Produce Hudi's bundled data
kafka-producer-perf-test.sh --topic stock-ticks --throughput 1000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file batch_1.json --num-records 3482
### Produce the TPC-DS-derived data
kafka-producer-perf-test.sh --topic store-sales --throughput 100000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file store_sales.json --num-records 479456
Notes:
--payload-delimiter: delimiter between payloads in the payload file; defaults to a newline (\n).
--throughput: required; throttles throughput to roughly this many messages per second. If the value is set very high, the actual rate may never reach it.
--num-records: the number of messages to produce; it should of course be no larger than the number of records in the payload file.
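For example, if a payload file used a custom record separator instead of newlines, the delimiter could be passed explicitly (a sketch; the files in this test keep the default newline):
kafka-producer-perf-test.sh --topic stock-ticks --throughput 1000 --producer.config /opt/beh/core/kafka/config/producer.properties --payload-file batch_1.json --payload-delimiter '|' --num-records 3482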
1.3 Writing to HDFS
1.3.1 stock_ticks
1. Edit kafka-source.properties
include=base.properties

# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date

# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc

# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=stock-ticks

# Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";
2. base.properties
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
3. schema.avsc
{"type":"record","name":"stock_ticks","fields":[{"name": "volume","type": "long"}, {"name": "ts", "type": "string"}, {"name": "symbol", "type": "string"},{"name": "year", "type": "int"},{"name": "month", "type": "string"},{"name": "high", "type": "double"},{"name": "low", "type": "double"},{"name": "key", "type": "string"},{"name": "date", "type":"string"}, {"name": "close", "type": "double"}, {"name": "open", "type": "double"}, {"name": "day", "type":"string"}]}
4. Write to HDFS: Copy On Write table
### Local mode
spark-submit \
  --master local[2] \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

### Spark on YARN mode
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
5. Write to HDFS: Merge On Read table
### Local mode
spark-submit \
  --master local[2] \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction

### Spark on YARN mode
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
1.3.2 store_sales
1. Edit kafka-source.properties
include=base.properties

# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date

# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc

# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-sales-1

# Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";
2. base.properties
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
3. schema.avsc
{"type":"record","name":"store_sales","fields":[{"name": "customer_id","type": "string"}, {"name": "shop_date", "type": "string"}, {"name": "sum_cost", "type": "double"}]}
1.4 Syncing Hudi Data to the Hive Metastore
1. Prerequisite: a working Hive environment.
2. Take run_sync_tool.sh from the hudi-hive module of the Hudi build:
${Hoodie-Build}/hudi-hive/run_sync_tool.sh
3. Adjust run_sync_tool.sh
[hadoop@hadoop02 tools]$ git diff run_sync_tool.sh.template run_sync_tool.sh
diff --git a/run_sync_tool.sh.template b/run_sync_tool.sh
index 42d2b9a..66c8180 100755
--- a/run_sync_tool.sh.template
+++ b/run_sync_tool.sh
@@ -47,9 +47,11 @@ if [ -z "${HIVE_JDBC}" ]; then
   HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
 fi
 HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
-HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+# HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+HIVE_JARS=`ls ${HIVE_HOME}/lib/*.jar | tr '\n' ':'`

-HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/
+# HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/li
+HADOOP_HIVE_JARS=${HIVE_JARS}:${HIVE_HOME}/conf:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*

 echo "Running Command : java -cp ${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:$HUDI_HIVE_UBER_JAR org.apache.hudi.hive.HiveSyncTool $@"
 java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR} org.apache.hudi.hive.HiveSyncTool "$@"
4. Place hudi-utilities-*.jar under ${HIVE_HOME}/lib and restart the Hive services.
5. Initialize a Kerberos ticket and create the hudi_stock database
$ kinit hadoop
$ beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM"
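Creating the database is not shown above; from that beeline session it is a single statement, for example:
create database if not exists hudi_stock;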
6. Sync the stock_ticks_cow table written to HDFS in section 1.3
./run_sync_tool.sh \
  --jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
  --user hadoop \
  --pass hadoop \
  --partitioned-by dt \
  --base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --database hudi_stock \
  --table stock_ticks_cow
Notes:
1. Running the tool requires a Kerberos ticket cached via kinit.
2. Even when the ticket cache is used, --user and --pass must still be supplied, because the script requires them.
7. Sync the stock_ticks_mor table written to HDFS in section 1.3
./run_sync_tool.sh \
  --jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
  --user hadoop \
  --pass hadoop \
  --partitioned-by dt \
  --base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --database hudi_stock \
  --table stock_ticks_mor
8. Check the sync result
0: jdbc:hive2://hadoop02.bonc.com:10000/> show tables from hudi_stock;
+---------------------+
|      tab_name       |
+---------------------+
| stock_ticks_cow     |
| stock_ticks_mor_ro  |
| stock_ticks_mor_rt  |
+---------------------+
Notes:
The table stock_ticks_cow is created by step 6 and supports snapshot and incremental queries via HoodieParquetInputFormat.
The tables stock_ticks_mor_rt and stock_ticks_mor_ro are created by step 7.
stock_ticks_mor_rt supports snapshot and incremental queries via HoodieParquetRealtimeInputFormat, while stock_ticks_mor_ro supports read optimized queries via HoodieParquetInputFormat.
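To confirm which InputFormat each table was registered with, the table definitions can be inspected from beeline, for example:
show create table stock_ticks_cow;
show create table stock_ticks_mor_rt;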
1.5 Query
1.5.1 Run Hive Queries
Prerequisites: the following issues need to be handled first.
1. Adjust hive-site.xml and restart the Hive services
### Change this setting; the default is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
<property>
  <name>hive.input.format</name>
  <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
</property>
2. Set hive.fetch.task.conversion=none; in the query session.
3. Copy hudi-utilities-*.jar into ${HIVE_HOME}/lib.
4. Also distribute hudi-hadoop-mr-*.jar to ${HADOOP_HOME}/share/hadoop/common/ and ${HIVE_HOME}/lib/, as sketched below.
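A sketch of steps 3 and 4 (jar names and destination paths follow this cluster's layout):
cp hudi-utilities-*.jar ${HIVE_HOME}/lib/
cp hudi-hadoop-mr-*.jar ${HADOOP_HOME}/share/hadoop/common/
cp hudi-hadoop-mr-*.jar ${HIVE_HOME}/lib/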
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

show tables;
show partitions stock_ticks_mor_rt;

### Copy_On_Write Queries
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read Queries
# Read Optimized Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
# Snapshot Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
# Projections
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
Note: the queries above essentially run two kinds of queries against each of the three table types:
First: find the latest timestamp per symbol;
Second: project a subset of columns from the current data.
1.5.2 Run Spark-SQL Queries
To run Spark-SQL queries, copy hudi-spark-*.jar into ${SPARK_HOME}/jars. The queries also indirectly depend on the hudi-hadoop-mr-*.jar described in the Hive query section (placed in the cluster's hadoop/hive installations).
hudi-hadoop-mr-*.jar provides the RecordReader implementations.
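A sketch of the copy step (jar name and destination assume this environment):
cp hudi-spark-bundle_2.11-0.5.2-incubating.jar ${SPARK_HOME}/jars/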
./bin/spark-sql \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

SQL:
show databases;
use hudi_stock;
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1

SQL:
spark.sql("show databases").show(100, false)
spark.sql("show tables from hudi_stock").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_cow where symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
:quit
1.5.3 Run Presto Queries (succeeded after downgrading the Presto version)
1. Distribute hudi-presto-*.jar to ${PRESTO_HOME}/plugin/hive-hadoop2/ and restart Presto.
2. Adjust ${PRESTO_HOME}/etc/catalog/hive.properties to include the yarn-site configuration file
hive.config.resources=/opt/beh/core/hadoop/etc/hadoop/core-site.xml,/opt/beh/core/hadoop/etc/hadoop/hdfs-site.xml,/opt/beh/core/hadoop/etc/hadoop/yarn-site.xml
./bin/presto \
  --server https://hadoop01.bonc.com:7778 \
  --krb5-config-path /etc/krb5.conf \
  --krb5-principal hadoop@BONC.COM \
  --krb5-keytab-path /opt/beh/metadata/key/presto.keytab \
  --krb5-remote-service-name presto \
  --keystore-path /opt/beh/metadata/key/hadoop.keystore \
  --keystore-password hadoop \
  --catalog hive \
  --schema hudi_stock

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read_RO
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
Note: Presto does not support incremental queries; it only supports HoodieParquetInputFormat-based queries, i.e. snapshot queries on Copy_On_Write tables and read optimized queries on Merge_On_Read (RO) tables.
Abnormal termination caused by a version mismatch:
2020-05-28T18:00:07.263+0800 WARN hive-hive-0 io.prestosql.plugin.hive.util.ResumableTasks ResumableTask completed exceptionally
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/util/YarnClientUtils
at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:58)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:216)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.util.YarnClientUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
1.6 Writing a Second Batch of Data
1.6.1 Hudi's bundled data
1. Load into Kafka with kafkacat
cat /opt/beh/core/hudi/data/batch_2.json | kafkacat \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanism=GSSAPI \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal=kafka@BONC.COM \
  -X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
  -b hadoop02.bonc.com:9092 \
  -t stock-ticks \
  -P
2. Write to HDFS
### Copy_On_Write
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

### Merge_On_Read
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction
3. Check the result
### Copy_On_Write
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2018/08/31
Found 3 items
-rw-------   3 hadoop hadoop      93 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop  443789 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-21_20200530184035.parquet
-rw-------   3 hadoop hadoop  443518 2020-06-01 09:44 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-24_20200601094404.parquet
### Merge_On_Read
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 3 items
-rw-------   3 hadoop hadoop   21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------   3 hadoop hadoop      93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop  443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet
Note: for the Copy_On_Write table, the second batch is written directly into a new columnar (Parquet) file,
while for the Merge_On_Read table, the increment is written into a log file.
1.6.2 Re-syncing to the Hive metastore
Because this batch of data did not create any new partitions, no re-sync is needed.
1.7 Query
1.7.1 Run Hive Queries
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
Note: the actual query results are not shown above; see the Docker Demo chapter on the official site: http://hudi.apache.org/docs/docker_demo.html
It mainly demonstrates the underlying write mechanics of Copy_On_Write and Merge_On_Read.
1.7.2 Run Spark-SQL Queries
./bin/spark-sql \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

### Switch database
show databases;
use hudi_stock;
### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
1.7.3 Run Presto Queries
Recompile prestosql presto-302; the build and test issues are recorded in section 4.4.
Copy hudi-presto-bundle-0.5.2-incubating.jar into ${PRESTO_HOME}/plugin/hive-hadoop2 and restart Presto.
1.8 Incremental Queries on Copy_On_Write
1. Hive Query
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time  | symbol  |          ts          | volume  |    open    |   close   |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20200530184035       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
| 20200601094404       | GOOG    | 2018-08-31 10:59:00  | 9021    | 1227.1993  | 1227.215  |
+----------------------+---------+----------------------+---------+------------+-----------+

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';
Note: the specified timestamp falls between the two commit times.
2. Spark-SQL
./bin/spark-sql \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';
Note: the specified timestamp falls between the two commit times.
3. Spark-shell
./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1

import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200530185035").
  load("/user/hive/warehouse/stock_ticks_cow")
hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
1.9 Scheduling and Running Compaction on the Merge_On_Read Table
1. Build Hudi and copy it to an admin node
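Hudi is built with Maven; a typical build command is sketched below (commonly used flags, not necessarily the exact command run here):
mvn clean package -DskipTests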
2. Go to the hudi-cli module
[hadoop@hadoop02 hudi]$ pwd
/home/hadoop/hudi
[hadoop@hadoop02 hudi]$ tree -L 2 hudi-cli
hudi-cli
├── conf
│   └── hudi-env.sh
├── hoodie-cmd.log
├── hudi-cli.sh
├── pom.xml
├── src
│   └── main
└── target
    ├── checkstyle-cachefile
    ├── checkstyle-checker.xml
    ├── checkstyle-result.xml
    ├── checkstyle-suppressions.xml
    ├── classes
    ├── classes.-502447588.timestamp
    ├── generated-sources
    ├── hudi-cli-0.5.2-incubating.jar
    ├── hudi-cli-0.5.2-incubating-sources.jar
    ├── lib
    ├── maven-archiver
    ├── maven-shared-archive-resources
    ├── maven-status
    ├── rat.txt
    └── test-classes
3. Edit the hudi-cli.sh script
### echo `hadoop classpath`
/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:/opt/beh/core/hadoop/contrib/capacity-scheduler/*.jar

### Edit hudi-cli.sh and add the Hadoop dependencies
diff --git a/tmp/hudi-cli.sh b/hudi-cli/hudi-cli.sh
index b6e708c..3086203 100755
--- a/tmp/hudi-cli.sh
+++ b/hudi-cli/hudi-cli.sh
@@ -25,4 +25,4 @@ if [ -z "$CLIENT_JAR" ]; then
   echo "Client jar location not set, please set it in conf/hudi-env.sh"
 fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
4. Upload the schema.avsc that was defined when writing to Kafka to hdfs:///var/demo/config
[hadoop@hadoop02 hudi]$ hdfs dfs -ls /var/demo/config/schema.avsc
-rw-------   3 hadoop hadoop  1464 2020-06-01 14:58 /var/demo/config/schema.avsc
5. Run hudi-cli.sh
[hadoop@hadoop02 hudi-cli]$ ./hudi-cli.sh
hudi->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction schedule
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction run --compactionInstant 20200601144520 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
6. Inspect the Hudi data
[hadoop@hadoop02 hudi]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 4 items
-rw-------   3 hadoop hadoop   21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------   3 hadoop hadoop      93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop  443479 2020-06-01 14:59 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-0-0_20200601144520.parquet
-rw-------   3 hadoop hadoop  443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet
1.10 Hive Queries ON Merge_On_Read
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
# Incremental Query
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111';
Note: the commit time specified in the incremental query is based on the compaction commit timestamp. The official demo uses 2018 timestamps; as of this document's writing they are 2020.
1.11 Spark-SQL ON Merge_On_Read
./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1

# Read Optimized Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
# Snapshot Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
# Incremental Query
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111'").show(100, false)
2. Administration
Two administrative operations were needed during testing:
• syncing to the Hive metastore with the run_sync_tool.sh script;
• running compaction on the Merge_On_Read table through hudi-cli.sh.
Both depend on the jars produced by the Hudi build, so a dedicated directory layout for Hudi may need to be planned.
3. Stress Testing with Custom Data
Note: known issues with the custom data
First, it consists entirely of new records and contains no updates. The test uses the default write operation, upsert; insert and bulk_insert are also supported.
Second, the data does not arrive gradually over time: essentially all partitions are already materialized by the first batch.
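For reference, the write operation used by the DeltaStreamer jobs below can be switched through its --op flag (a hedged note; verify the flag against the deployed Hudi build), e.g. by appending:
--op BULK_INSERT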
3.1 Data Preparation: Loading into Kafka
1. Prepare the custom data as described in section 1.1.2.
2. Load the data into Kafka and configure it as described in section 1.2.
3.2 Writing from Kafka to HDFS
3.2.1 Configuration
1. base.properties
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
2. schema.avsc
{"type":"record","name":"store_sales","fields":[{"name": "customer_id","type": "string"}, {"name": "shop_date", "type": "string"}, {"name": "sum_cost", "type": "double"}]}
3. kafka-source.properties
include=hdfs://beh001/hudi/store2/base.properties

# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date

# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://beh001/hudi/store2/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://beh001/hudi/store2/schema.avsc

# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-2

# Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";
Note: the configuration files are uploaded to HDFS, so all file references inside them use hdfs:// paths.
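A sketch of uploading the three files (directory taken from the include= path above; the spark-submit commands below reference their own props path):
hdfs dfs -mkdir -p /hudi/store2
hdfs dfs -put base.properties schema.avsc kafka-source.properties /hudi/store2/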
3.2.2 Continuous Writes to the COW Table
nohup spark-submit \
  --master yarn \
  --num-executors 12 \
  --executor-memory 18G \
  --executor-cores 6 \
  --deploy-mode cluster \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
  --target-table store_sales_cow \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &
Stress test results:
CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
20200623114826 | 9.5 GB | 0 | 1823 | 1823 | 555059000 | 0 | 0 |
20200623114342 | 9.4 GB | 0 | 1823 | 1823 | 551850199 | 0 | 0 |
20200623113915 | 9.4 GB | 0 | 1823 | 1823 | 546850199 | 0 | 0 |
20200623113438 | 9.3 GB | 0 | 1823 | 1823 | 541850199 | 0 | 0 |
20200623113009 | 9.2 GB | 0 | 1823 | 1823 | 536850199 | 0 | 0 |
20200623112534 | 9.1 GB | 0 | 1823 | 1823 | 531850199 | 0 | 0 |
20200623112104 | 9.0 GB | 0 | 1823 | 1823 | 526850199 | 0 | 0 |
20200623111641 | 9.0 GB | 0 | 1823 | 1823 | 521850199 | 0 | 0 |
20200623111205 | 8.9 GB | 0 | 1823 | 1823 | 516850199 | 0 | 0 |
20200623110736 | 8.8 GB | 0 | 1823 | 1823 | 511850199 | 0 | 0 |
20200623110320 | 8.7 GB | 0 | 1823 | 1823 | 506850200 | 0 | 0 |
20200623105855 | 8.7 GB | 0 | 1823 | 1823 | 501850200 | 0 | 0 |
20200623105435 | 8.6 GB | 0 | 1823 | 1823 | 496850200 | 0 | 0 |
20200623105000 | 8.5 GB | 0 | 1823 | 1823 | 491850200 | 0 | 0 |
20200623104543 | 8.4 GB | 0 | 1823 | 1823 | 486850200 | 0 | 0 |
20200623104120 | 8.3 GB | 0 | 1823 | 1823 | 481850200 | 0 | 0 |
20200623103705 | 8.3 GB | 0 | 1823 | 1823 | 476850200 | 0 | 0 |
20200623103305 | 8.2 GB | 0 | 1823 | 1823 | 471850200 | 0 | 0 |
20200623102848 | 8.1 GB | 0 | 1823 | 1823 | 466850200 | 0 | 0 |
20200623102440 | 8.0 GB | 0 | 1823 | 1823 | 461850200 | 0 | 0 |
20200623102030 | 7.9 GB | 0 | 1823 | 1823 | 456850200 | 0 | 0 |
20200623101628 | 7.9 GB | 0 | 1823 | 1823 | 451850200 | 0 | 0 |
20200623101229 | 7.8 GB | 0 | 1823 | 1823 | 446850200 | 0 | 0 |
3.2.3 Continuous Writes to the MOR Table
nohup spark-submit \
  --master yarn \
  --num-executors 12 \
  --executor-memory 18G \
  --executor-cores 6 \
  --deploy-mode cluster \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_mor \
  --target-table store_sales_mor \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &
Stress test results:
CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
20200623165326 | 3.6 GB | 0 | 1823 | 1823 | 179999235 | 0 | 0 |
20200623165027 | 3.5 GB | 0 | 1823 | 1823 | 174999235 | 0 | 0 |
20200623164729 | 3.4 GB | 0 | 1823 | 1823 | 169999235 | 0 | 0 |
20200623164431 | 3.3 GB | 0 | 1823 | 1823 | 164999235 | 0 | 0 |
20200623164141 | 3.3 GB | 0 | 1823 | 1823 | 159999235 | 0 | 0 |
20200623163838 | 3.2 GB | 0 | 1823 | 1823 | 154999235 | 0 | 0 |
20200623163550 | 3.1 GB | 0 | 1823 | 1823 | 149999235 | 0 | 0 |
20200623163254 | 3.0 GB | 0 | 1823 | 1823 | 144999235 | 0 | 0 |
20200623163017 | 3.0 GB | 0 | 1823 | 1823 | 139999235 | 0 | 0 |
20200623162735 | 2.9 GB | 0 | 1823 | 1823 | 134999235 | 0 | 0 |
20200623162459 | 2.8 GB | 0 | 1823 | 1823 | 129999235 | 0 | 0 |
20200623162223 | 2.7 GB | 0 | 1823 | 1823 | 124999235 | 0 | 0 |
20200623161945 | 2.6 GB | 0 | 1823 | 1823 | 119999235 | 0 | 0 |
20200623161707 | 2.6 GB | 0 | 1823 | 1823 | 114999235 | 0 | 0 |
20200623161441 | 2.5 GB | 0 | 1823 | 1823 | 109999235 | 0 | 0 |
20200623161211 | 2.4 GB | 0 | 1823 | 1823 | 104999235 | 0 | 0 |
20200623160943 | 2.3 GB | 0 | 1823 | 1823 | 99999235 | 0 | 0 |
20200623160700 | 2.2 GB | 0 | 1823 | 1823 | 94999235 | 0 | 0 |
20200623160440 | 2.2 GB | 0 | 1823 | 1823 | 89999235 | 0 | 0 |
20200623160225 | 2.1 GB | 0 | 1823 | 1823 | 84999235 | 0 | 0 |
20200623160002 | 2.0 GB | 0 | 1823 | 1823 | 79999235 | 0 | 0 |
20200623155741 | 1.9 GB | 0 | 1823 | 1823 | 74999235 | 0 | 0 |
20200623155527 | 1.8 GB | 0 | 1823 | 1823 | 69999235 | 0 | 0 |
3.3 Open Questions
3.3.1 Test data
The custom data consists entirely of inserts and contains no updates.
3.3.2 No delta log when writing to MOR
When the MOR table was written with custom data containing no updates, no delta log was produced. After adjusting the custom data to include some updates and writing the MOR table again, delta logs were confirmed to be generated.
4. Issue Log
4.1 Using kafkacat
[hadoop@bdev001 beh]$ cat batch_1.json | kafkacat -F kafkacat.conf -t zjh -P
% Reading configuration from file kafkacat.conf
%2|1591338623.769|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev001.bonc.com:9092/bootstrap]: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338624.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev003.bonc.com:9092/bootstrap]: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338625.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev002.bonc.com:9092/bootstrap]: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating
Fix:
yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi
4.2 Hive Queries
When running Hive queries, the following exception appeared in the ApplicationMaster:
ERROR : Job failed with java.lang.ClassNotFoundException: org.apache.avro.LogicalType
java.util.concurrent.ExecutionException: Exception thrown by job
at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)
at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)
at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:382)
at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, bdev002.bonc.com, executor 1): java.lang.NoClassDefFoundError: org/apache/avro/LogicalType
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.(AbstractRealtimeRecordReader.java:108)
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.(RealtimeCompactedRecordReader.java:50)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.(HoodieRealtimeRecordReader.java:47)
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:256)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.NoClassDefFoundError: org/apache/avro/LogicalType
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.(AbstractRealtimeRecordReader.java:108)
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.(RealtimeCompactedRecordReader.java:50)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.(HoodieRealtimeRecordReader.java:47)
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:256)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 more
Fix:
Investigation showed that the avro-1.7.4.jar shipped with our Hadoop distribution indeed does not contain the class, while the newer avro-1.8.2.jar found in the Hudi build does. Replacing the older Avro jars under /opt/beh/core/hadoop/share/hadoop/common/lib and /opt/beh/core/hive/lib/ with the newer version resolved the problem.
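A sketch of that replacement (the exact jar file names under each directory may differ; the 1.8.2 jar comes from the Hudi build tree):
# back up and remove the old Avro jars
mv /opt/beh/core/hadoop/share/hadoop/common/lib/avro-1.7.*.jar /tmp/
mv /opt/beh/core/hive/lib/avro-1.7.*.jar /tmp/
# drop in the newer Avro jar from the Hudi build
cp avro-1.8.2.jar /opt/beh/core/hadoop/share/hadoop/common/lib/
cp avro-1.8.2.jar /opt/beh/core/hive/lib/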
4.3 Presto Queries
1. HoodieException
org.apache.hudi.exception.HoodieException: Error reading Hoodie partition metadata for hdfs://beh001/user/hive/warehouse/stock_ticks_cow/2018/08/31

### Enable debug logging on the Presto coordinator by editing ${PRESTO_HOME}/etc/log.properties
io.prestosql=DEBUG

### Debug log output
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=kafka, access=EXECUTE, inode="/user/hive/warehouse":hadoop:hadoop:drwx------
The root cause is that the kafka user has no permission on the warehouse directory.
Why the kafka user? Because Presto had refreshed its credential cache.
4.4 Build and Test
4.4.1 Hadoop native libraries cannot be loaded
java.lang.ExceptionInInitializerError
at io.prestosql.plugin.hive.HdfsEnvironment$$FastClassByGuice$$2c8553d4.newInstance()
at com.google.inject.internal.DefaultConstructionProxyFactory$FastClassProxy.newInstance(DefaultConstructionProxyFactory.java:89)
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:114)
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:62)
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:211)
at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:182)
at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)
at com.google.inject.Guice.createInjector(Guice.java:87)
at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:240)
at io.prestosql.plugin.hive.HiveConnectorFactory.create(HiveConnectorFactory.java:123)
at io.prestosql.connector.ConnectorManager.createConnector(ConnectorManager.java:321)
at io.prestosql.connector.ConnectorManager.addCatalogConnector(ConnectorManager.java:195)
at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:187)
at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:173)
at io.prestosql.metadata.StaticCatalogStore.loadCatalog(StaticCatalogStore.java:96)
at io.prestosql.metadata.StaticCatalogStore.loadCatalogs(StaticCatalogStore.java:74)
at io.prestosql.server.PrestoServer.run(PrestoServer.java:122)
at io.prestosql.server.PrestoServer.main(PrestoServer.java:68)
Caused by: java.lang.RuntimeException: failed to load Hadoop native library
at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:58)
at io.prestosql.plugin.hive.HdfsEnvironment.(HdfsEnvironment.java:37)
... 31 more
Caused by: java.lang.RuntimeException: native snappy library not available: SnappyCompressor has not been loaded.
at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:72)
at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
at io.prestosql.hadoop.HadoopNative.loadAllCodecs(HadoopNative.java:71)
at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:52)
... 32 more
Fix:
export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native
Alternatively, handle it at build time.
4.4.2 org.apache.hadoop.yarn.conf.HAUtil cannot be loaded
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/HAUtil
at org.apache.hadoop.mapred.Master.getMasterAddress(Master.java:61)
at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:88)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:213)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.HAUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more
Fix: copy the hadoop-yarn-api jar into ${PRESTO_HOME}/plugin/hive-hadoop2.
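A sketch of that copy (the jar version depends on the Hadoop distribution in use); restart Presto afterwards so the plugin classloader picks it up:
cp ${HADOOP_HOME}/share/hadoop/yarn/hadoop-yarn-api-*.jar ${PRESTO_HOME}/plugin/hive-hadoop2/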