Apache Hudi Testing and Operations: A Comprehensive Summary


1. Generating Test Data

1.1 Data Sources

1.1.1 Hudi's Official Test Data

https://github.com/apache/hudi/tree/master/docker/demo/data

1.1.2 Adapting Hive TPC-DS Data

The data is adapted from TPC-DS; see GitLab for the details:

https://code.bonc.com.cn/bdcut/hive-testbench/snippets/10

A sample of the data:

[hadoop@hadoop01 ~]$ head tpc-ds.json
{"customer_id":"AAAAAAAAOKKNEKBA","shop_date":"1999/09/25","sum_cost":"140.33000069856644"}
{"customer_id":"AAAAAAAAPNAJLMAA","shop_date":"2002/12/26","sum_cost":"29.320000052452087"}
{"customer_id":"AAAAAAAAHNPHCLBA","shop_date":"1999/07/18","sum_cost":"45.949999272823334"}
{"customer_id":"AAAAAAAAANHGIPAA","shop_date":"1998/10/04","sum_cost":"21.369999915361404"}
{"customer_id":"AAAAAAAAPAHKHIBA","shop_date":"2001/12/13","sum_cost":"58.009999826550484"}
{"customer_id":"AAAAAAAABNJBBABA","shop_date":"1998/11/06","sum_cost":"205.01999327540398"}
{"customer_id":"AAAAAAAAILDDDKBA","shop_date":"1998/02/15","sum_cost":"107.06000108271837"}
{"customer_id":"AAAAAAAAHFJMPEAA","shop_date":"1999/09/29","sum_cost":"43.04000025987625"}
{"customer_id":"AAAAAAAAIBOOIHAA","shop_date":"2000/10/16","sum_cost":"122.53999684005976"}
{"customer_id":"AAAAAAAAGEHOPKBA","shop_date":"2001/09/13","sum_cost":"73.4099994301796"}

1.2 Loading the Test Data into Kafka

1.2.1 Using kafkacat

1.2.1.1 Overview

Project page: https://github.com/edenhill/kafkacat

kafkacat can be thought of as the netcat of the Kafka toolbox: a general-purpose, non-JVM producer and consumer for Apache Kafka >= 0.8.
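
As a quick sanity check, kafkacat can also consume; a minimal sketch (broker address and topic are the same illustrative values used below):

# read 10 messages from the beginning of the topic, then exit
kafkacat -C -b 172.16.13.116:9092 -t stock_ticks -o beginning -c 10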

1.2.1.2 Non-Kerberized Kafka

1. Produce data

cat batch_1.json | kafkacat -b 172.16.13.116:9092 -t stock_ticks -P

2. List the topic

kafkacat -L -b 172.16.13.116:9092 -t stock_ticks

3. View topic metadata

kafkacat -b 172.16.13.116:9092 -L -J | jq

1.2.1.3 Kerberized Kafka

1. Create the topic

kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks

2. Create a kafkacat.conf file and pass it with -F; use kafkacat -X list to see the configurable parameters.

bootstrap.servers=bdev001.bonc.com:9092,bdev002.bonc.com:9092
## optional
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafka@BONC.COM
sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab

3. Produce data

cat batch.txt | kafkacat \
-F kafkacat.conf \
-b hadoop02.bonc.com:9092 \
-t stock-ticks \
-P

or

cat batch.txt | kafkacat \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=GSSAPI \
-X sasl.kerberos.service.name=kafka \
-X sasl.kerberos.principal=kafka@BONC.COM \
-X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
-b hadoop02.bonc.com:9092 \
-t stock-ticks \
-P

4. List the topic

kafkacat -L \
-b hadoop02.bonc.com:9092 \
-F kafkacat.conf \
-t stock-ticks

5. View topic metadata

kafkacat \
-b hadoop02.bonc.com:9092 \
-F kafkacat.conf \
-L -J | jq

1.2.2 Using Kafka's Built-in Test Tool

Kafka's bundled producer performance test tool, kafka-producer-perf-test.sh, can also be used to load the data.

### Create the topics
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic store-sales
### Produce Hudi's bundled data
kafka-producer-perf-test.sh --topic stock-ticks --throughput 1000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file batch_1.json --num-records 3482
### Produce the adapted data
kafka-producer-perf-test.sh --topic store-sales --throughput 100000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file store_sales.json --num-records 479456

Notes:

--payload-delimiter: the delimiter between records in the payload file; defaults to a newline (\n).

--throughput: required; throttles the producer to roughly this many messages per second. If the value is very large, it simply will not be reached.

--num-records: the number of messages to produce; naturally this must be less than or equal to the number of records in the payload file.
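
If the payload file uses a delimiter other than the default newline, it can be declared explicitly; a hedged sketch (batch_1.pipe.json is a hypothetical pipe-delimited variant of the payload file above):

kafka-producer-perf-test.sh --topic stock-ticks \
  --num-records 3482 \
  --throughput 1000 \
  --payload-file batch_1.pipe.json \
  --payload-delimiter "|" \
  --producer.config /opt/beh/core/kafka/config/producer.properties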

1.3 Writing to HDFS

1.3.1 stock_ticks

1. Edit kafka-source.properties

include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=stock-ticks
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

2. base.properties

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

3. schema.avsc

{
  "type": "record",
  "name": "stock_ticks",
  "fields": [
    {"name": "volume", "type": "long"},
    {"name": "ts", "type": "string"},
    {"name": "symbol", "type": "string"},
    {"name": "year", "type": "int"},
    {"name": "month", "type": "string"},
    {"name": "high", "type": "double"},
    {"name": "low", "type": "double"},
    {"name": "key", "type": "string"},
    {"name": "date", "type": "string"},
    {"name": "close", "type": "double"},
    {"name": "open", "type": "double"},
    {"name": "day", "type": "string"}
  ]
}

4. Write to HDFS: Copy-On-Write table

### Local mode
spark-submit \
--master local[2] \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

### Spark on YARN mode
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

5. Write to HDFS: Merge-On-Read table

### Local mode
spark-submit \
--master local[2] \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction

### Spark on YARN mode
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction

1.3.2 store_sales

1. Edit kafka-source.properties

include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-sales-1
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

2. base.properties

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

3. schema.avsc

{
  "type": "record",
  "name": "store_sales",
  "fields": [
    {"name": "customer_id", "type": "string"},
    {"name": "shop_date", "type": "string"},
    {"name": "sum_cost", "type": "double"}
  ]
}

1.4 Syncing Hudi Data to the Hive Metastore

1. Prerequisite: a working Hive environment.

2. Take run_sync_tool.sh from the hudi-hive module of the Hudi build:

${Hoodie-Build}/hudi-hive/run_sync_tool.sh

3. Adjust run_sync_tool.sh

[hadoop@hadoop02 tools]$ git diff run_sync_tool.sh.template run_sync_tool.sh
diff --git a/run_sync_tool.sh.template b/run_sync_tool.sh
index 42d2b9a..66c8180 100755
--- a/run_sync_tool.sh.template
+++ b/run_sync_tool.sh
@@ -47,9 +47,11 @@ if [ -z "${HIVE_JDBC}" ]; then
   HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
 fi
 HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
-HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+# HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+HIVE_JARS=`ls ${HIVE_HOME}/lib/*.jar | tr '\n' ':'`

-HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/
+# HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/li
+HADOOP_HIVE_JARS=${HIVE_JARS}:${HIVE_HOME}/conf:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*

 echo "Running Command : java -cp ${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:$HUDI_HIVE_UBER_JAR org.apache.hudi.hive.HiveSyncTool $@"
 java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR} org.apache.hudi.hive.HiveSyncTool "$@"

4. Place hudi-utilities-*.jar under ${HIVE_HOME}/lib and restart the Hive services.

5. Initialize a Kerberos ticket and create the hudi_stock database

$ kinit hadoop
$ beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM"
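
The hudi_stock database itself can then be created from the same connection; a minimal sketch:

beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM" -e "CREATE DATABASE IF NOT EXISTS hudi_stock;"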

6. Sync the stock_ticks_cow table written to HDFS in section 1.3

./run_sync_tool.sh \
--jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
--user hadoop \
--pass hadoop \
--partitioned-by dt \
--base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--database hudi_stock \
--table stock_ticks_cow

Notes:

1. A cached Kerberos ticket (kinit) is required to run the tool.

2. Even when a ticket cache is used, --user and --pass must still be supplied; the script requires them.

7. Sync the stock_ticks_mor table written to HDFS in section 1.3

./run_sync_tool.sh \
--jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
--user hadoop \
--pass hadoop \
--partitioned-by dt \
--base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--database hudi_stock \
--table stock_ticks_mor

8. Check the sync result

0: jdbc:hive2://hadoop02.bonc.com:10000/> show tables from hudi_stock;
+---------------------+
|      tab_name       |
+---------------------+
| stock_ticks_cow     |
| stock_ticks_mor_ro  |
| stock_ticks_mor_rt  |
+---------------------+

Explanation:

Table stock_ticks_cow is created by step 6 and supports snapshot queries and incremental queries based on HoodieParquetInputFormat;

tables stock_ticks_mor_rt and stock_ticks_mor_ro are created by step 7;

stock_ticks_mor_rt supports snapshot queries and incremental queries based on HoodieParquetRealtimeInputFormat, while stock_ticks_mor_ro supports read-optimized queries based on HoodieParquetInputFormat.
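
To confirm which input format a synced table was registered with, the table DDL can be inspected from beeline; a minimal sketch (HoodieParquetRealtimeInputFormat should show up for the _rt table):

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
  -e "SHOW CREATE TABLE hudi_stock.stock_ticks_mor_rt;"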

1.5 Query

1.5.1 Run Hive Queries

Prerequisites: the following issues need to be handled first.

1. Adjust hive-site.xml and restart the Hive services

### Change this setting; the default is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
<property>
  <name>hive.input.format</name>
  <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
</property>

2. Set the parameter: set hive.fetch.task.conversion=none;

3. Copy hudi-utilities-*.jar into ${HIVE_HOME}/lib.

4. Also distribute hudi-hadoop-mr-*.jar to ${HADOOP_HOME}/share/hadoop/common/ and ${HIVE_HOME}/lib/, as sketched below.
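
A minimal sketch of steps 3 and 4 on a single node (the exact bundle file names are illustrative; in practice the jars have to be distributed to every Hadoop/Hive node before restarting the services):

cp hudi-utilities-bundle_2.11-0.5.2-incubating.jar ${HIVE_HOME}/lib/
cp hudi-hadoop-mr-bundle-0.5.2-incubating.jar ${HIVE_HOME}/lib/
cp hudi-hadoop-mr-bundle-0.5.2-incubating.jar ${HADOOP_HOME}/share/hadoop/common/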

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

show tables;
show partitions stock_ticks_mor_rt;

### Copy_On_Write queries
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read queries
# Read Optimized Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
# Snapshot Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
#
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

Note: the queries above essentially run two kinds of queries against each of the three table types.

First: look up the latest timestamp.

Second: project a subset of columns over part of the current data.

1.5.2 Run Spark-SQL Queries

To run Spark-SQL queries, copy hudi-spark-*.jar into ${SPARK_HOME}/jars; hudi-hadoop-mr-*.jar, introduced in the Hive query section, is also an indirect dependency and must be placed in the cluster's hadoop/hive installations.

hudi-hadoop-mr-*.jar provides the RecordReader implementations.

./bin/spark-sql \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

Sql:
show databases;
use hudi_stock;
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

./bin/spark-shell \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--driver-class-path /opt/beh/core/hive/conf \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1

Sql:
spark.sql("show databases").show(100, false)
spark.sql("show tables from hudi_stock").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_cow where symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
:quit

1.5.3 Run Presto Queries (succeeded after downgrading the Presto version)

1. Distribute hudi-presto-*.jar to ${PRESTO_HOME}/plugin/hive-hadoop2/ and restart Presto; a sketch follows after the snippet below.

2. Adjust ${PRESTO_HOME}/etc/catalog/hive.properties to include the yarn-site configuration file:

hive.config.resources=/opt/beh/core/hadoop/etc/hadoop/core-site.xml,/opt/beh/core/hadoop/etc/hadoop/hdfs-site.xml,/opt/beh/core/hadoop/etc/hadoop/yarn-site.xml
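
A minimal sketch of step 1 on one Presto node (repeat on the coordinator and every worker; the bundle name is illustrative):

cp hudi-presto-bundle-0.5.2-incubating.jar ${PRESTO_HOME}/plugin/hive-hadoop2/
${PRESTO_HOME}/bin/launcher restart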
./bin/presto \
--server https://hadoop01.bonc.com:7778 \
--krb5-config-path /etc/krb5.conf \
--krb5-principal hadoop@BONC.COM \
--krb5-keytab-path /opt/beh/metadata/key/presto.keytab \
--krb5-remote-service-name presto \
--keystore-path /opt/beh/metadata/key/hadoop.keystore \
--keystore-password hadoop \
--catalog hive \
--schema hudi_stock

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read_RO
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';

Note: Presto does not support incremental queries; it only supports queries through HoodieParquetInputFormat, i.e. snapshot queries on the Copy_On_Write table and read-optimized queries on the Merge_On_Read (RO) table.

Abnormal termination, caused by a version mismatch:

2020-05-28T18:00:07.263+0800 WARN hive-hive-0 io.prestosql.plugin.hive.util.ResumableTasks ResumableTask completed exceptionally

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/util/YarnClientUtils

at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:58)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:216)

at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)

at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)

at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)

at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)

at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.util.YarnClientUtils

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 17 more

1.6 Writing a Second Batch of Data

1.6.1 Hudi's Bundled Data

1. Load into Kafka with kafkacat

cat /opt/beh/core/hudi/data/batch_2.json | kafkacat \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=GSSAPI \
-X sasl.kerberos.service.name=kafka \
-X sasl.kerberos.principal=kafka@BONC.COM \
-X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
-b hadoop02.bonc.com:9092 \
-t stock-ticks \
-P

2. Write to HDFS

### Copy_On_Write
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

### Merge_On_Read
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction

3. Check the result

### Copy_On_Write
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2018/08/31
Found 3 items
-rw-------   3 hadoop hadoop      93 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop  443789 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-21_20200530184035.parquet
-rw-------   3 hadoop hadoop  443518 2020-06-01 09:44 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-24_20200601094404.parquet
### Merge_On_Read
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 3 items
-rw-------   3 hadoop hadoop   21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------   3 hadoop hadoop      93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop  443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet

Note: as the listings show, the Copy_On_Write table writes the updates directly into new columnar (Parquet) files,

while the Merge_On_Read table appends the incremental data to a log file.

1.6.2 Re-syncing to the Hive Metastore

Because this batch of writes did not create any new partitions, a re-sync is not required; if new partitions had been added, the sync tool would simply be re-run, as shown below.
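
For reference, a re-sync would reuse the same command as in section 1.4 (COW table shown); the tool picks up any newly added partitions:

./run_sync_tool.sh \
--jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
--user hadoop \
--pass hadoop \
--partitioned-by dt \
--base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--database hudi_stock \
--table stock_ticks_cow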

1.7 Query

1.7.1 Run Hive Queries

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

Note: the query results are not reproduced here; see the Docker Demo chapter of the official documentation: http://hudi.apache.org/docs/docker_demo.html

It mainly demonstrates the underlying write mechanics of Copy_On_Write and Merge_On_Read tables.

1.7.2 Run Spark-SQL Queries

./bin/spark-sql \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

### Switch database
show databases;
use hudi_stock;
### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

1.7.3 Run Presto Queries

Recompile prestosql presto-302; see section 4.4 for the build and test issues encountered.

Copy hudi-presto-bundle-0.5.2-incubating.jar into ${PRESTO_HOME}/plugin/hive-hadoop2 and restart Presto.

1.8 Incremental Queries on Copy_On_Write

1. Hive Query

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time  | symbol  |          ts          | volume  |    open    |   close   |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20200530184035       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
| 20200601094404       | GOOG    | 2018-08-31 10:59:00  | 9021    | 1227.1993  | 1227.215  |
+----------------------+---------+----------------------+---------+------------+-----------+

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';

Note: the specified timestamp lies between the two commit times.

2. Spark-SQL

./bin/spark-sql \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';

Note: the specified timestamp lies between the two commit times.

3. Spark-shell

./bin/spark-shell \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--driver-class-path /opt/beh/core/hive/conf \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1

import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200530185035").
  load("/user/hive/warehouse/stock_ticks_cow")
hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);

1.9 Scheduling and Running Compaction on the Merge_On_Read Table

1. Build Hudi and copy it to a management node (a build sketch follows the directory listing below).

2. Go to the hudi-cli module

[hadoop@hadoop02 hudi]$ pwd
/home/hadoop/hudi
[hadoop@hadoop02 hudi]$ tree -L 2 hudi-cli
hudi-cli
├── conf
│   └── hudi-env.sh
├── hoodie-cmd.log
├── hudi-cli.sh
├── pom.xml
├── src
│   └── main
└── target
    ├── checkstyle-cachefile
    ├── checkstyle-checker.xml
    ├── checkstyle-result.xml
    ├── checkstyle-suppressions.xml
    ├── classes
    ├── classes.-502447588.timestamp
    ├── generated-sources
    ├── hudi-cli-0.5.2-incubating.jar
    ├── hudi-cli-0.5.2-incubating-sources.jar
    ├── lib
    ├── maven-archiver
    ├── maven-shared-archive-resources
    ├── maven-status
    ├── rat.txt
    └── test-classes
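
A minimal build sketch for step 1 (a standard Maven build of the Hudi 0.5.2 source tree; flags can be adjusted as needed):

cd hudi
mvn clean package -DskipTests
# copy the build output (at least hudi-cli/ and the bundle jars) to the management node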

3. Edit the hudi-cli.sh script

### echo `hadoop classpath`
/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:/opt/beh/core/hadoop/contrib/capacity-scheduler/*.jar

### Edit hudi-cli.sh to add the Hadoop dependencies
diff --git a/tmp/hudi-cli.sh b/hudi-cli/hudi-cli.sh
index b6e708c..3086203 100755
--- a/tmp/hudi-cli.sh
+++ b/hudi-cli/hudi-cli.sh
@@ -25,4 +25,4 @@ if [ -z "$CLIENT_JAR" ]; then
   echo "Client jar location not set, please set it in conf/hudi-env.sh"
 fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@

4. Upload the schema.avsc defined for the Kafka writes to hdfs:///var/demo/config (the put command is sketched below the listing)

[hadoop@hadoop02 hudi]$ hdfs dfs -ls /var/demo/config/schema.avsc
-rw-------   3 hadoop hadoop    1464 2020-06-01 14:58 /var/demo/config/schema.avsc
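
The upload itself is a plain HDFS put; a minimal sketch (the local schema path is the one used in section 1.3):

hdfs dfs -mkdir -p /var/demo/config
hdfs dfs -put /opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc /var/demo/config/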

5. Run hudi-cli.sh

[hadoop@hadoop02 hudi-cli]$ ./hudi-cli.sh
hudi->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction schedule
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction run --compactionInstant 20200601144520 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all

6. Inspect the Hudi data

[hadoop@hadoop02 hudi]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 4 items
-rw-------   3 hadoop hadoop   21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------   3 hadoop hadoop      93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop  443479 2020-06-01 14:59 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-0-0_20200601144520.parquet
-rw-------   3 hadoop hadoop  443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet

1.10 Hive Queries ON Merge_On_Read

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
# Incremental Query
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111';

Note: the commit time specified for the incremental query is based on the compaction commit timestamp. The official demo uses timestamps from 2018, but this document was written in 2020.

1.11 Spark-SQL ON Merge_On_Read

./bin/spark-shell \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--driver-class-path /opt/beh/core/hive/conf \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1

# Read Optimized Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
# Snapshot Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
# Incremental Query
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111'").show(100, false)

2. Administration

Two administrative operations came up during testing:

one is syncing to the Hive metastore with the run_sync_tool.sh script, and the other is running compaction on the Merge_On_Read table through hudi-cli.sh.

Both depend on the jars produced by the Hudi build, so it may be worth planning a dedicated Hudi directory layout, for example as sketched below.
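
One possible layout on the management node, purely illustrative; any structure that keeps the CLI, the bundles, and the configs together works:

/opt/beh/core/hudi/
├── jars/     # hudi-utilities / hudi-spark / hudi-presto / hudi-hadoop-mr bundles
├── tools/    # run_sync_tool.sh and the hudi-cli module
├── config/   # kafka-source.properties, base.properties, schema.avsc
└── data/     # sample batches (batch_1.json, batch_2.json, ...)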

3. Load Testing with Custom Data

Note: the custom data has some limitations.

First, it consists entirely of new records and contains no updates. The tests use the default write operation, upsert; insert and bulk_insert are also supported (see the sketch after the next paragraph).

Second, the data does not arrive gradually over time: essentially all partitions are already materialized by the first batch.
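
If the write operation needs to be something other than the default upsert, HoodieDeltaStreamer accepts an --op flag; a minimal sketch showing only the lines that change relative to the spark-submit in section 3.2:

spark-submit \
  ... \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --op BULK_INSERT \
  ...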

3.1 Data Preparation: Loading into Kafka

1. See section 1.1.2 for preparing the custom data.

2. See section 1.2 for loading the data into Kafka and the related configuration.

3.2 Writing from Kafka to HDFS

3.2.1 Configuration

1. base.properties

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

2. schema.avsc

{
  "type": "record",
  "name": "store_sales",
  "fields": [
    {"name": "customer_id", "type": "string"},
    {"name": "shop_date", "type": "string"},
    {"name": "sum_cost", "type": "double"}
  ]
}

3. kafka-source.properties

include=hdfs://beh001/hudi/store2/base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://beh001/hudi/store2/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://beh001/hudi/store2/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-2
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

Note: the configuration files are uploaded to HDFS, which is why every file reference inside them starts with hdfs://; a sketch of the upload follows.
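
A minimal upload sketch, assuming the three files are in the current local directory:

hdfs dfs -mkdir -p /hudi/store2
hdfs dfs -put base.properties schema.avsc kafka-source.properties /hudi/store2/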

3.2.2 Continuously Writing Data to the COW Table

nohup spark-submit \
--master yarn \
--num-executors 12 \
--executor-memory 18G \
--executor-cores 6 \
--deploy-mode cluster \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--continuous \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field shop_date \
--target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
--target-table store_sales_cow \
--props hdfs://beh001/hudi/store_sales/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &

Load-test results:

CommitTime Total Bytes Written Total Files Added Total Files Updated Total Partitions Written Total Records Written Total Update Records Written Total Errors
20200623114826 9.5 GB 0 1823 1823 555059000 0 0
20200623114342 9.4 GB 0 1823 1823 551850199 0 0
20200623113915 9.4 GB 0 1823 1823 546850199 0 0
20200623113438 9.3 GB 0 1823 1823 541850199 0 0
20200623113009 9.2 GB 0 1823 1823 536850199 0 0
20200623112534 9.1 GB 0 1823 1823 531850199 0 0
20200623112104 9.0 GB 0 1823 1823 526850199 0 0
20200623111641 9.0 GB 0 1823 1823 521850199 0 0
20200623111205 8.9 GB 0 1823 1823 516850199 0 0
20200623110736 8.8 GB 0 1823 1823 511850199 0 0
20200623110320 8.7 GB 0 1823 1823 506850200 0 0
20200623105855 8.7 GB 0 1823 1823 501850200 0 0
20200623105435 8.6 GB 0 1823 1823 496850200 0 0
20200623105000 8.5 GB 0 1823 1823 491850200 0 0
20200623104543 8.4 GB 0 1823 1823 486850200 0 0
20200623104120 8.3 GB 0 1823 1823 481850200 0 0
20200623103705 8.3 GB 0 1823 1823 476850200 0 0
20200623103305 8.2 GB 0 1823 1823 471850200 0 0
20200623102848 8.1 GB 0 1823 1823 466850200 0 0
20200623102440 8.0 GB 0 1823 1823 461850200 0 0
20200623102030 7.9 GB 0 1823 1823 456850200 0 0
20200623101628 7.9 GB 0 1823 1823 451850200 0 0
20200623101229 7.8 GB 0 1823 1823 446850200 0 0

3.2.3 Continuously Writing Data to the MOR Table

nohup spark-submit \
--master yarn \
--num-executors 12 \
--executor-memory 18G \
--executor-cores 6 \
--deploy-mode cluster \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--continuous \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field shop_date \
--target-base-path hdfs://beh001/user/hive/warehouse/store_sales_mor \
--target-table store_sales_mor \
--props hdfs://beh001/hudi/store_sales/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &

Load-test results:

CommitTime Total Bytes Written Total Files Added Total Files Updated Total Partitions Written Total Records Written Total Update Records Written Total Errors
20200623165326 3.6 GB 0 1823 1823 179999235 0 0
20200623165027 3.5 GB 0 1823 1823 174999235 0 0
20200623164729 3.4 GB 0 1823 1823 169999235 0 0
20200623164431 3.3 GB 0 1823 1823 164999235 0 0
20200623164141 3.3 GB 0 1823 1823 159999235 0 0
20200623163838 3.2 GB 0 1823 1823 154999235 0 0
20200623163550 3.1 GB 0 1823 1823 149999235 0 0
20200623163254 3.0 GB 0 1823 1823 144999235 0 0
20200623163017 3.0 GB 0 1823 1823 139999235 0 0
20200623162735 2.9 GB 0 1823 1823 134999235 0 0
20200623162459 2.8 GB 0 1823 1823 129999235 0 0
20200623162223 2.7 GB 0 1823 1823 124999235 0 0
20200623161945 2.6 GB 0 1823 1823 119999235 0 0
20200623161707 2.6 GB 0 1823 1823 114999235 0 0
20200623161441 2.5 GB 0 1823 1823 109999235 0 0
20200623161211 2.4 GB 0 1823 1823 104999235 0 0
20200623160943 2.3 GB 0 1823 1823 99999235 0 0
20200623160700 2.2 GB 0 1823 1823 94999235 0 0
20200623160440 2.2 GB 0 1823 1823 89999235 0 0
20200623160225 2.1 GB 0 1823 1823 84999235 0 0
20200623160002 2.0 GB 0 1823 1823 79999235 0 0
20200623155741 1.9 GB 0 1823 1823 74999235 0 0
20200623155527 1.8 GB 0 1823 1823 69999235 0 0

3.3 Open Questions

3.3.1 Test Data

The custom data consists entirely of inserts and contains no updates.

3.3.2 No Delta Log When Writing to MOR

When writing the MOR table, no delta log was produced because the custom data contained no updates. After adjusting the custom data to include some updates and writing to the MOR table again, delta logs were confirmed to be generated; one way to produce such updates is sketched below.
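
One simple way to produce such updates (illustrative only) is to re-send existing keys with a changed value, so upsert treats them as updates rather than inserts:

# take 1000 existing records, change sum_cost, and push them back to the same topic
head -n 1000 store_sales.json \
  | sed 's/"sum_cost":"[^"]*"/"sum_cost":"999.99"/' \
  | kafkacat -F kafkacat.conf -b hadoop02.bonc.com:9092 -t store-2 -P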

4. Issue Log

4.1 Using kafkacat

[hadoop@bdev001 beh]$ cat batch_1.json | kafkacat -F kafkacat.conf -t zjh -P
% Reading configuration from file kafkacat.conf
%2|1591338623.769|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev001.bonc.com:9092/bootstrap]: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338624.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev003.bonc.com:9092/bootstrap]: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338625.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev002.bonc.com:9092/bootstrap]: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating

Solution:

yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi

4.2 Hive Queries

When running Hive queries, the ApplicationMaster reported the following exception:

ERROR : Job failed with java.lang.ClassNotFoundException: org.apache.avro.LogicalType

java.util.concurrent.ExecutionException: Exception thrown by job

at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)

at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)

at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:382)

at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, bdev002.bonc.com, executor 1): java.lang.NoClassDefFoundError: org/apache/avro/LogicalType

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.(AbstractRealtimeRecordReader.java:108)

at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.(RealtimeCompactedRecordReader.java:50)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.(HoodieRealtimeRecordReader.java:47)

at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)

at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)

at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:256)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 23 more

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at scala.Option.foreach(Option.scala:257)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Caused by: java.lang.NoClassDefFoundError: org/apache/avro/LogicalType

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.(AbstractRealtimeRecordReader.java:108)

at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.(RealtimeCompactedRecordReader.java:50)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.(HoodieRealtimeRecordReader.java:47)

at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)

at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)

at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:256)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 23 more

Solution:

Investigation showed that the avro-1.7.4.jar shipped with our Hadoop indeed does not contain the missing class, while the newer avro-1.8.2.jar found in the Hudi build does. Replacing the old Avro jars under /opt/beh/core/hadoop/share/hadoop/common/lib and /opt/beh/core/hive/lib/ with the newer version fixed the problem.
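
A minimal sketch of the swap on one node (the source location of avro-1.8.2.jar is illustrative, e.g. the local Maven repository populated by the Hudi build; repeat on every node and restart the services):

mv /opt/beh/core/hadoop/share/hadoop/common/lib/avro-1.7.4.jar /tmp/
mv /opt/beh/core/hive/lib/avro-1.7.4.jar /tmp/
cp ~/.m2/repository/org/apache/avro/avro/1.8.2/avro-1.8.2.jar /opt/beh/core/hadoop/share/hadoop/common/lib/
cp ~/.m2/repository/org/apache/avro/avro/1.8.2/avro-1.8.2.jar /opt/beh/core/hive/lib/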

4.3 Presto Queries

1. HoodieException

org.apache.hudi.exception.HoodieException: Error reading Hoodie partition metadata for hdfs://beh001/user/hive/warehouse/stock_ticks_cow/2018/08/31

### Enable debug logging on the Presto coordinator by editing ${PRESTO_HOME}/etc/log.properties
io.prestosql=DEBUG

### Logged output
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=kafka, access=EXECUTE, inode="/user/hive/warehouse":hadoop:hadoop:drwx------

The root cause is that the kafka user has no permission on the path.

Why the kafka user? Presto had refreshed its cache; a possible workaround is sketched below.
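
One possible way to confirm and work around the permission problem (an assumption, not necessarily the fix applied here): check the directory permissions and, if acceptable, grant the kafka user read/execute access via an HDFS ACL.

hdfs dfs -ls -d /user/hive/warehouse
# requires dfs.namenode.acls.enabled=true
hdfs dfs -setfacl -R -m user:kafka:r-x /user/hive/warehouse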

4.4 Build and Test Issues

4.4.1 Hadoop Native Libraries Cannot Be Loaded

java.lang.ExceptionInInitializerError

at io.prestosql.plugin.hive.HdfsEnvironment$$FastClassByGuice$$2c8553d4.newInstance()

at com.google.inject.internal.DefaultConstructionProxyFactory$FastClassProxy.newInstance(DefaultConstructionProxyFactory.java:89)

at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:114)

at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)

at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)

at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)

at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)

at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)

at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)

at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)

at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)

at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)

at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)

at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:62)

at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)

at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)

at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)

at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:211)

at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:182)

at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)

at com.google.inject.Guice.createInjector(Guice.java:87)

at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:240)

at io.prestosql.plugin.hive.HiveConnectorFactory.create(HiveConnectorFactory.java:123)

at io.prestosql.connector.ConnectorManager.createConnector(ConnectorManager.java:321)

at io.prestosql.connector.ConnectorManager.addCatalogConnector(ConnectorManager.java:195)

at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:187)

at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:173)

at io.prestosql.metadata.StaticCatalogStore.loadCatalog(StaticCatalogStore.java:96)

at io.prestosql.metadata.StaticCatalogStore.loadCatalogs(StaticCatalogStore.java:74)

at io.prestosql.server.PrestoServer.run(PrestoServer.java:122)

at io.prestosql.server.PrestoServer.main(PrestoServer.java:68)

Caused by: java.lang.RuntimeException: failed to load Hadoop native library

at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:58)

at io.prestosql.plugin.hive.HdfsEnvironment.(HdfsEnvironment.java:37)

... 31 more

Caused by: java.lang.RuntimeException: native snappy library not available: SnappyCompressor has not been loaded.

at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:72)

at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)

at io.prestosql.hadoop.HadoopNative.loadAllCodecs(HadoopNative.java:71)

at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:52)

... 32 more

Solution:

export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native

Alternatively, handle it at build time.
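
One way to make the export persistent across restarts (an illustrative sketch; where the variable is set depends on how Presto is launched on the node):

echo 'export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native' >> ~/.bashrc
source ~/.bashrc
${PRESTO_HOME}/bin/launcher restart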

4.4.2 org.apache.hadoop.yarn.conf.HAUtil Cannot Be Loaded

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/HAUtil

at org.apache.hadoop.mapred.Master.getMasterAddress(Master.java:61)

at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:88)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:213)

at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)

at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)

at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)

at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)

at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.HAUtil

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 18 more

Solution: copy hadoop-yarn-api into the ${PRESTO_HOME}/plugin/hive-hadoop2 directory; a minimal sketch follows.
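
A minimal sketch of the fix (the jar must match the cluster's Hadoop version; repeat on every Presto node and restart):

cp /opt/beh/core/hadoop/share/hadoop/yarn/hadoop-yarn-api-*.jar ${PRESTO_HOME}/plugin/hive-hadoop2/
${PRESTO_HOME}/bin/launcher restart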