Apache Hudi Testing and Operations: A Comprehensive Summary

1. Producing Test Data

1.1 Data Sources

1.1.1 Official Hudi test data

https://github.com/apache/hudi/tree/master/docker/demo/data

1.1.2 Data adapted from Hive TPC-DS

This data set is adapted from TPC-DS data; see GitLab for details:

https://code.bonc.com.cn/bdcut/hive-testbench/snippets/10

A sample of the data:

    [hadoop@hadoop01 ~]$ head tpc-ds.json
    {"customer_id":"AAAAAAAAOKKNEKBA","shop_date":"1999/09/25","sum_cost":"140.33000069856644"}
    {"customer_id":"AAAAAAAAPNAJLMAA","shop_date":"2002/12/26","sum_cost":"29.320000052452087"}
    {"customer_id":"AAAAAAAAHNPHCLBA","shop_date":"1999/07/18","sum_cost":"45.949999272823334"}
    {"customer_id":"AAAAAAAAANHGIPAA","shop_date":"1998/10/04","sum_cost":"21.369999915361404"}
    {"customer_id":"AAAAAAAAPAHKHIBA","shop_date":"2001/12/13","sum_cost":"58.009999826550484"}
    {"customer_id":"AAAAAAAABNJBBABA","shop_date":"1998/11/06","sum_cost":"205.01999327540398"}
    {"customer_id":"AAAAAAAAILDDDKBA","shop_date":"1998/02/15","sum_cost":"107.06000108271837"}
    {"customer_id":"AAAAAAAAHFJMPEAA","shop_date":"1999/09/29","sum_cost":"43.04000025987625"}
    {"customer_id":"AAAAAAAAIBOOIHAA","shop_date":"2000/10/16","sum_cost":"122.53999684005976"}
    {"customer_id":"AAAAAAAAGEHOPKBA","shop_date":"2001/09/13","sum_cost":"73.4099994301796"}

1.2 Loading the Test Data into Kafka

1.2.1 Using kafkacat

1.2.1.1 Overview

Project page: https://github.com/edenhill/kafkacat

kafkacat can be thought of as the netcat of the Kafka tooling: a generic, non-JVM producer and consumer for Apache Kafka >= 0.8.

1.2.1.2 Non-Kerberized Kafka

1. Produce data

    cat batch_1.json | kafkacat -b 172.16.13.116:9092 -t stock_ticks -P

2. Inspect the topic

    kafkacat -L -b 172.16.13.116:9092 -t stock_ticks

3. Inspect the topic metadata

    kafkacat -b 172.16.13.116:9092 -L -J | jq

1.2.1.3 Kerberized Kafka

1. Create the topic

    kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks

2. Edit kafkacat.conf. Pass the file to kafkacat with -F; run kafkacat -X list to see the configurable properties.

    bootstrap.servers=bdev001.bonc.com:9092,bdev002.bonc.com:9092
    ## optional
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    sasl.kerberos.principal=kafka@BONC.COM
    sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab

3. Produce data

    cat batch.txt | kafkacat \
      -F kafkacat.conf \
      -b hadoop02.bonc.com:9092 \
      -t stock-ticks \
      -P

Or:

    cat batch.txt | kafkacat \
      -X security.protocol=SASL_PLAINTEXT \
      -X sasl.mechanism=GSSAPI \
      -X sasl.kerberos.service.name=kafka \
      -X sasl.kerberos.principal=kafka@BONC.COM \
      -X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
      -b hadoop02.bonc.com:9092 \
      -t stock-ticks \
      -P

4. Inspect the topic

    kafkacat -L \
      -b hadoop02.bonc.com:9092 \
      -F kafkacat.conf \
      -t stock-ticks

5. Inspect the topic metadata

    kafkacat \
      -b hadoop02.bonc.com:9092 \
      -F kafkacat.conf \
      -L -J | jq
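
The produce step in this section passes the same settings in two equivalent ways: a config file given with -F, or repeated -X flags. As a small illustration, the sketch below converts the first form into the second. conf_to_x_flags is a hypothetical helper (not part of kafkacat), and it assumes the file holds one key=value property per line, with comment lines starting with #.

```shell
# Hypothetical helper: print one "-X key=value" line for every property
# in a kafkacat config file, skipping blank lines and # comments.
conf_to_x_flags() {
    while IFS= read -r _line || [ -n "$_line" ]; do
        case "$_line" in
            ''|'#'*) continue ;;   # ignore blank lines and comments
        esac
        printf '%s %s\n' -X "$_line"
    done < "$1"
}
```

Running conf_to_x_flags kafkacat.conf prints the flags for review, which makes it easy to compare a config file against a command line built by hand.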

1.2.2 Using Kafka's built-in test tool

Kafka's bundled producer performance tool, kafka-producer-perf-test.sh, can also be used to load data.

    ### Create the topics
    kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
    kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic store-sales
    ### Produce the data shipped with Hudi
    kafka-producer-perf-test.sh --topic stock-ticks --throughput 1000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file batch_1.json --num-records 3482
    ### Produce the adapted data
    kafka-producer-perf-test.sh --topic store-sales --throughput 100000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file store_sales.json --num-records 479456

Notes:

--payload-delimiter: the record delimiter; defaults to a newline (\n).

--throughput: required; throttles throughput to approximately this many msg/s. If the value is very large, it will not be reached.

--num-records: the number of messages to produce; this should be no larger than the number of records in the file.
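
The --num-records guidance above can be checked before a run. The following is a minimal sketch, assuming the payload file holds one record per line (the default delimiter). count_records and perf_test_command are hypothetical helper names, and the second function only prints the command instead of executing it.

```shell
# Hypothetical helpers (not part of Kafka): derive --num-records from the payload file.

# Print the number of non-empty lines (newline-delimited records) in file $1.
count_records() {
    grep -c '.' "$1"
}

# Print a kafka-producer-perf-test.sh command line whose --num-records
# equals the record count of the payload file.
# $1 = topic, $2 = payload file, $3 = target throughput (msg/s), $4 = producer config
perf_test_command() {
    printf 'kafka-producer-perf-test.sh --topic %s --throughput %s --producer.config %s --print-metrics --payload-file %s --num-records %s\n' \
        "$1" "$3" "$4" "$2" "$(count_records "$2")"
}
```

For example, perf_test_command stock-ticks batch_1.json 1000 /opt/beh/core/kafka/config/producer.properties prints a command that can be reviewed before it is run.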

1.3 Writing to HDFS

1.3.1 stock_ticks

1. Edit kafka-source.properties

    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ###
    include=base.properties
    # Key fields, for kafka example
    hoodie.datasource.write.recordkey.field=key
    hoodie.datasource.write.partitionpath.field=date
    # schema provider configs
    #schema.registry.url=http://localhost:8081
    #hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
    hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
    hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
    # Kafka Source
    #hoodie.deltastreamer.source.kafka.topic=uber_trips
    hoodie.deltastreamer.source.kafka.topic=stock-ticks
    #Kafka props
    bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
    auto.offset.reset=earliest
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

2. base.properties

    ###
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ###
    hoodie.upsert.shuffle.parallelism=2
    hoodie.insert.shuffle.parallelism=2
    hoodie.bulkinsert.shuffle.parallelism=2
    hoodie.embed.timeline.server=true
    hoodie.filesystem.view.type=EMBEDDED_KV_STORE
    hoodie.compact.inline=false

3. schema.avsc

{"type":"record","name":"stock_ticks","fields":[{"name": "volume","type": "long"}, {"name": "ts", "type": "string"}, {"name": "symbol", "type": "string"},{"name": "year", "type": "int"},{"name": "month", "type": "string"},{"name": "high", "type": "double"},{"name": "low", "type": "double"},{"name": "key", "type": "string"},{"name": "date", "type":"string"}, {"name": "close", "type": "double"}, {"name": "open", "type": "double"}, {"name": "day", "type":"string"}]}

4. Write to HDFS: Copy On Write table

    ### Local mode
    spark-submit \
      --master local[2] \
      --keytab /opt/beh/metadata/key/presto.keytab \
      --principal hadoop@BONC.COM \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --source-ordering-field ts \
      --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
      --target-table stock_ticks_cow \
      --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

    ### Spark on YARN mode
    spark-submit \
      --master yarn \
      --keytab /opt/beh/metadata/key/presto.keytab \
      --principal hadoop@BONC.COM \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --source-ordering-field ts \
      --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
      --target-table stock_ticks_cow \
      --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

5. Write to HDFS: Merge On Read table

    ### Local mode
    spark-submit \
      --master local[2] \
      --keytab /opt/beh/metadata/key/presto.keytab \
      --principal hadoop@BONC.COM \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type MERGE_ON_READ \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --source-ordering-field ts \
      --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
      --target-table stock_ticks_mor \
      --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --disable-compaction

    ### Spark on YARN mode
    spark-submit \
      --master yarn \
      --keytab /opt/beh/metadata/key/presto.keytab \
      --principal hadoop@BONC.COM \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type MERGE_ON_READ \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --source-ordering-field ts \
      --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
      --target-table stock_ticks_mor \
      --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --disable-compaction

1.3.2 store_sales

1. Edit kafka-source.properties

    include=base.properties
    # Key fields, for kafka example
    hoodie.datasource.write.recordkey.field= customer_id
    hoodie.datasource.write.partitionpath.field= shop_date
    # schema provider configs
    #schema.registry.url=http://localhost:8081
    #hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
    hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
    hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
    # Kafka Source
    #hoodie.deltastreamer.source.kafka.topic=uber_trips
    hoodie.deltastreamer.source.kafka.topic= store-sales-1
    #Kafka props
    bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
    auto.offset.reset=earliest
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

2. base.properties

    hoodie.upsert.shuffle.parallelism=2
    hoodie.insert.shuffle.parallelism=2
    hoodie.bulkinsert.shuffle.parallelism=2
    hoodie.embed.timeline.server=true
    hoodie.filesystem.view.type=EMBEDDED_KV_STORE
    hoodie.compact.inline=false

3. schema.avsc

{"type":"record","name":"store_sales","fields":[{"name": "customer_id","type": "string"}, {"name": "shop_date", "type": "string"}, {"name": "sum_cost", "type": "double"}]}

1.4 Syncing Hudi Data to the Hive Metastore

1. Prerequisite: a working Hive environment

2. Get run_sync_tool.sh from the hudi-hive module of the Hudi build

${Hoodie-Build}/hudi-hive/run_sync_tool.sh

3. Adjust run_sync_tool.sh

    [hadoop@hadoop02 tools]$ git diff run_sync_tool.sh.template run_sync_tool.sh
    diff --git a/run_sync_tool.sh.template b/run_sync_tool.sh
    index 42d2b9a..66c8180 100755
    --- a/run_sync_tool.sh.template
    +++ b/run_sync_tool.sh
    @@ -47,9 +47,11 @@ if [ -z "${HIVE_JDBC}" ]; then
       HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
     fi
     HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
    -HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
    +# HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
    +HIVE_JARS=`ls ${HIVE_HOME}/lib/*.jar | tr '\n' ':'`
     
    -HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/
    +# HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/li
    +HADOOP_HIVE_JARS=${HIVE_JARS}:${HIVE_HOME}/conf:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
     
     echo "Running Command : java -cp ${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:$HUDI_HIVE_UBER_JAR org.apache.hudi.hive.HiveSyncTool $@"
     java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR} org.apache.hudi.hive.HiveSyncTool "$@"

4. Place hudi-utilities-*.jar in ${HIVE_HOME}/lib and restart the Hive services

5. Obtain a Kerberos ticket and create the hudi_stock database

    $ kinit hadoop
    $ beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM"

6. Sync stock_ticks_cow, written to HDFS in section 1.3

    ./run_sync_tool.sh \
      --jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
      --user hadoop \
      --pass hadoop \
      --partitioned-by dt \
      --base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
      --database hudi_stock \
      --table stock_ticks_cow

Notes:

1. A Kerberos ticket must be cached with kinit before running the script.

2. Even with a cached ticket, --user and --pass must still be specified, because the script requires them.

7. Sync stock_ticks_mor, written to HDFS in section 1.3

    ./run_sync_tool.sh \
      --jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
      --user hadoop \
      --pass hadoop \
      --partitioned-by dt \
      --base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
      --database hudi_stock \
      --table stock_ticks_mor

8. Check the sync result

    0: jdbc:hive2://hadoop02.bonc.com:10000/> show tables from hudi_stock;
    +---------------------+
    |      tab_name       |
    +---------------------+
    | stock_ticks_cow     |
    | stock_ticks_mor_ro  |
    | stock_ticks_mor_rt  |
    +---------------------+
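
Because both sync runs depend on a cached Kerberos ticket, it can help to guard them with a check. This is a minimal sketch, assuming MIT Kerberos, whose klist -s runs silently and exits non-zero when the credential cache is missing or expired. require_ticket and the TICKET_CHECK override are hypothetical names introduced here for illustration.

```shell
# Hypothetical guard: succeed only if a valid Kerberos ticket is cached.
# TICKET_CHECK may override the check command (handy for dry runs);
# the default assumes MIT Kerberos klist, where -s means "silent, status only".
require_ticket() {
    if ${TICKET_CHECK:-klist -s}; then
        return 0
    fi
    echo "No valid Kerberos ticket found; run kinit first." >&2
    return 1
}
```

A sync can then be written as require_ticket && ./run_sync_tool.sh followed by the usual options, so the script is skipped with a clear message when the ticket is missing.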

Explanation:

Table stock_ticks_cow is created by step 6 and supports snapshot queries and incremental queries based on HoodieParquetInputFormat.

Tables stock_ticks_mor_rt and stock_ticks_mor_ro are created by step 7.

Table stock_ticks_mor_rt supports snapshot queries and incremental queries based on HoodieParquetRealtimeInputFormat; table stock_ticks_mor_ro supports read optimized queries based on HoodieParquetInputFormat.
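
The mapping between synced tables and query types can be summarized as a small lookup. This is only an illustrative sketch: describe_hudi_table is a hypothetical helper, and it assumes the naming seen in this test, where a Merge_On_Read table is synced as two Hive tables with _ro and _rt suffixes and any other table name is treated as Copy_On_Write.

```shell
# Hypothetical helper: print the query types and input format for a synced table name.
describe_hudi_table() {
    case "$1" in
        *_ro) echo "read optimized query (HoodieParquetInputFormat)" ;;
        *_rt) echo "snapshot query, incremental query (HoodieParquetRealtimeInputFormat)" ;;
        *)    echo "snapshot query, incremental query (HoodieParquetInputFormat)" ;;
    esac
}
```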

1.5 Query

1.5.1 Run Hive Queries

Prerequisites: the following issues need to be addressed first.

1. Adjust hive-site.xml and restart the Hive services

    ### Change this setting; the default is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
    <property>
      <name>hive.input.format</name>
      <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
    </property>

2. Set the parameter: set hive.fetch.task.conversion=none;

3. Copy hudi-utilities-*.jar to ${HIVE_HOME}/lib

4. Also distribute hudi-hadoop-mr-*.jar to ${HADOOP_HOME}/share/hadoop/common/ and ${HIVE_HOME}/lib/.

    beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

    show tables;
    show partitions stock_ticks_mor_rt;
    -- Copy_On_Write queries
    select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    -- Merge_On_Read queries
    -- Read optimized query: latest timestamp
    select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
    -- Snapshot query: latest timestamp
    select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
    -- Projection queries
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

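Note: the queries above run two kinds of query against each of the three table types.

First: fetch the latest timestamp.

Second: a projection over part of the current data (selected columns and rows).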

1.5.2 Run Spark-SQL Queries

To run Spark SQL queries, copy hudi-spark-*.jar into ${SPARK_HOME}/jars. Spark also depends indirectly on the hudi-hadoop-mr-*.jar described in the Hive query section (placed in the cluster's Hadoop and Hive installations).

hudi-hadoop-mr-*.jar is related to the RecordReader.

    ./bin/spark-sql \
      --master yarn \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

SQL:

    show databases;
    use hudi_stock;
    select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
    select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

    ./bin/spark-shell \
      --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
      --driver-class-path /opt/beh/core/hive/conf \
      --master yarn \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      --deploy-mode client \
      --driver-memory 1G \
      --executor-memory 3G \
      --num-executors 1

SQL:

    spark.sql("show databases").show(100, false)
    spark.sql("show tables from hudi_stock").show(100, false)
    spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
    spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_cow where symbol = 'GOOG'").show(100, false)
    spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
    spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
    spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
    spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
    :quit

1.5.3 Run Presto Queries (succeeded after downgrading Presto)

1. Distribute hudi-presto-*.jar to ${PRESTO_HOME}/plugin/hive-hadoop2/ and restart Presto.

2. Edit ${PRESTO_HOME}/etc/catalog/hive.properties to add the yarn-site configuration file

    hive.config.resources=/opt/beh/core/hadoop/etc/hadoop/core-site.xml,/opt/beh/core/hadoop/etc/hadoop/hdfs-site.xml,/opt/beh/core/hadoop/etc/hadoop/yarn-site.xml

    ./bin/presto \
      --server https://hadoop01.bonc.com:7778 \
      --krb5-config-path /etc/krb5.conf \
      --krb5-principal hadoop@BONC.COM \
      --krb5-keytab-path /opt/beh/metadata/key/presto.keytab \
      --krb5-remote-service-name presto \
      --keystore-path /opt/beh/metadata/key/hadoop.keystore \
      --keystore-password hadoop \
      --catalog hive \
      --schema hudi_stock

    -- Copy_On_Write
    select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
    select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    -- Merge_On_Read_RO
    select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
    select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';

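Note: Presto does not support incremental queries. It only supports queries through HoodieParquetInputFormat, that is, snapshot queries on Copy_On_Write tables and read optimized queries on the Merge_On_Read _ro table.

Abnormal termination, caused by a version mismatch: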

2020-05-28T18:00:07.263+0800 WARN hive-hive-0 io.prestosql.plugin.hive.util.ResumableTasks ResumableTask completed exceptionally

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/util/YarnClientUtils

at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:58)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:216)

at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)

at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)

at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)

at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)

at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.util.YarnClientUtils

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 17 more

1.6 Writing the Second Batch of Data

1.6.1 Data shipped with Hudi

1. Load into Kafka with kafkacat

    cat /opt/beh/core/hudi/data/batch_2.json | kafkacat \
      -X security.protocol=SASL_PLAINTEXT \
      -X sasl.mechanism=GSSAPI \
      -X sasl.kerberos.service.name=kafka \
      -X sasl.kerberos.principal=kafka@BONC.COM \
      -X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
      -b hadoop02.bonc.com:9092 \
      -t stock-ticks \
      -P

2. Write to HDFS

    ### Copy_On_Write
    spark-submit \
      --master yarn \
      --keytab /opt/beh/metadata/key/presto.keytab \
      --principal hadoop@BONC.COM \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --source-ordering-field ts \
      --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
      --target-table stock_ticks_cow \
      --props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

    ### Merge_On_Read
    spark-submit \
      --master yarn \
      --keytab /opt/beh/metadata/key/presto.keytab \
      --principal hadoop@BONC.COM \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type MERGE_ON_READ \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --source-ordering-field ts \
      --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
      --target-table stock_ticks_mor \
      --props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --disable-compaction

3. Check the result

    ### Copy_On_Write
    [hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2018/08/31
    Found 3 items
    -rw-------   3 hadoop hadoop         93 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
    -rw-------   3 hadoop hadoop     443789 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-21_20200530184035.parquet
    -rw-------   3 hadoop hadoop     443518 2020-06-01 09:44 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-24_20200601094404.parquet

    ### Merge_On_Read
    [hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
    Found 3 items
    -rw-------   3 hadoop hadoop      21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
    -rw-------   3 hadoop hadoop         93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
    -rw-------   3 hadoop hadoop     443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet

Note: for the Copy_On_Write table, the second batch is written directly to a new columnar (Parquet) file.

For the Merge_On_Read table, the increment is written to a log file.
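
The partition listings for the two tables show two file naming patterns. The sketch below classifies a file name and extracts its instant time. It assumes only the patterns visible in those listings: base files named <fileId>_<writeToken>_<instantTime>.parquet and log files named .<fileId>_<baseInstantTime>.log.<version>_<writeToken>. The helper names are hypothetical and not part of Hudi.

```shell
# Hypothetical helper: classify a file name found in a Hudi partition directory.
hudi_file_kind() {
    case "$1" in
        .hoodie_partition_metadata) echo "partition-metadata" ;;
        .*.log.*)                   echo "log" ;;
        *.parquet)                  echo "base" ;;
        *)                          echo "unknown" ;;
    esac
}

# Hypothetical helper: print the instant time embedded in a base or log file name.
# Returns 1 for any other kind of file.
hudi_file_instant() {
    _name=$1
    case "$(hudi_file_kind "$_name")" in
        base) _name=${_name%.parquet} ;;   # strip the extension
        log)  _name=${_name%%.log.*} ;;    # strip ".log.<version>_<writeToken>"
        *)    return 1 ;;
    esac
    echo "${_name##*_}"                    # the instant time follows the last underscore
}
```

For a log file the extracted value is the instant of the base file it belongs to, which is why the log file in the Merge_On_Read listing carries 20200530184111 even though it was written on 2020-06-01.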

1.6.2 Re-syncing to the Hive metastore

Because this batch did not create any new partitions, no re-sync is needed.

1.7 Query

1.7.1 Run Hive Queries

    beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

    -- Copy_On_Write
    select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    -- Merge_On_Read
    -- Read optimized query
    select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
    -- Snapshot query
    select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

Note: the query results are not shown here; see the Docker Demo chapter on the official site: http://hudi.apache.org/docs/docker_demo.html

The main point is to demonstrate the underlying write mechanisms of Copy_On_Write and Merge_On_Read.

1.7.2 Run Spark-SQL Queries

    ./bin/spark-sql \
      --master yarn \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

    -- Switch database
    show databases;
    use hudi_stock;
    -- Copy_On_Write
    select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    -- Merge_On_Read
    -- Read optimized query
    select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
    -- Snapshot query
    select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

1.7.3 Run Presto Queries

Rebuild prestosql presto-302; the build and test issues are recorded in section 4.4.

Copy hudi-presto-bundle-0.5.2-incubating.jar to ${PRESTO_HOME}/plugin/hive-hadoop2 and restart Presto.

1.8 Incremental Queries on Copy_On_Write

1. Hive Query

    beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    +----------------------+---------+----------------------+---------+------------+-----------+
    | _hoodie_commit_time  | symbol  |          ts          | volume  |    open    |   close   |
    +----------------------+---------+----------------------+---------+------------+-----------+
    | 20200530184035       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
    | 20200601094404       | GOOG    | 2018-08-31 10:59:00  | 9021    | 1227.1993  | 1227.215  |
    +----------------------+---------+----------------------+---------+------------+-----------+

    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';

Note: the chosen timestamp lies between the two commit times.

2. Spark-SQL

    ./bin/spark-sql \
      --master yarn \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
    select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';

Note: the chosen timestamp lies between the two commit times.

3. Spark-shell

    ./bin/spark-shell \
      --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
      --driver-class-path /opt/beh/core/hive/conf \
      --master yarn \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      --deploy-mode client \
      --driver-memory 1G \
      --executor-memory 3G \
      --num-executors 1

    import org.apache.hudi.DataSourceReadOptions
    val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200530185035").load("/user/hive/warehouse/stock_ticks_cow")
    hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
    spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
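
All three variants in this section rely on a begin timestamp that falls between the two commits. As a hedged sketch, the check below confirms that a candidate lies strictly between two commit instants. It assumes instants are 14-digit yyyyMMddHHmmss strings, as in the _hoodie_commit_time column, so numeric order matches time order. is_between_commits is a hypothetical helper.

```shell
# Hypothetical helper: succeed when the candidate lies strictly between two commits.
# $1 = earlier commit instant, $2 = candidate begin time, $3 = later commit instant
is_between_commits() {
    [ "$1" -lt "$2" ] && [ "$2" -lt "$3" ]
}
```

With the two commits from the Hive query output, is_between_commits 20200530184035 20200530185035 20200601094404 succeeds, so an incremental query starting at 20200530185035 should return only records written by the second commit.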

1.9 Scheduling and Running Compaction on the Merge_On_Read Table

1. Build Hudi and copy it to the management node

2. Enter the hudi-cli module

    [hadoop@hadoop02 hudi]$ pwd
    /home/hadoop/hudi
    [hadoop@hadoop02 hudi]$ tree -L 2 hudi-cli
    hudi-cli
    ├── conf
    │   └── hudi-env.sh
    ├── hoodie-cmd.log
    ├── hudi-cli.sh
    ├── pom.xml
    ├── src
    │   └── main
    └── target
        ├── checkstyle-cachefile
        ├── checkstyle-checker.xml
        ├── checkstyle-result.xml
        ├── checkstyle-suppressions.xml
        ├── classes
        ├── classes.-502447588.timestamp
        ├── generated-sources
        ├── hudi-cli-0.5.2-incubating.jar
        ├── hudi-cli-0.5.2-incubating-sources.jar
        ├── lib
        ├── maven-archiver
        ├── maven-shared-archive-resources
        ├── maven-status
        ├── rat.txt
        └── test-classes

3. Edit the hudi-cli.sh file

    ### echo `hadoop classpath`
    /opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:/opt/beh/core/hadoop/contrib/capacity-scheduler/*.jar

    ### Edit hudi-cli.sh to add the Hadoop dependencies
    diff --git a/tmp/hudi-cli.sh b/hudi-cli/hudi-cli.sh
    index b6e708c..3086203 100755
    --- a/tmp/hudi-cli.sh
    +++ b/hudi-cli/hudi-cli.sh
    @@ -25,4 +25,4 @@ if [ -z "$CLIENT_JAR" ]; then
       echo "Client jar location not set, please set it in conf/hudi-env.sh"
     fi
     
    -java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
    +java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@

4. Upload the schema.avsc defined when writing to Kafka to hdfs:///var/demo/config

    [hadoop@hadoop02 hudi]$ hdfs dfs -ls /var/demo/config/schema.avsc
    -rw-------   3 hadoop hadoop       1464 2020-06-01 14:58 /var/demo/config/schema.avsc

5. Run hudi-cli.sh

    [hadoop@hadoop02 hudi-cli]$ ./hudi-cli.sh
    hudi->connect --path /user/hive/warehouse/stock_ticks_mor
    hudi:stock_ticks_mor->compactions show all
    hudi:stock_ticks_mor->compaction schedule
    Compaction successfully completed for 20200601144520
    hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
    hudi:stock_ticks_mor->compactions show all
    hudi:stock_ticks_mor->compaction run --compactionInstant 20200601144520 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
    Compaction successfully completed for 20200601144520
    hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
    hudi:stock_ticks_mor->compactions show all

6. Inspect the Hudi data

    [hadoop@hadoop02 hudi]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
    Found 4 items
    -rw-------   3 hadoop hadoop      21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
    -rw-------   3 hadoop hadoop         93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
    -rw-------   3 hadoop hadoop     443479 2020-06-01 14:59 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-0-0_20200601144520.parquet
    -rw-------   3 hadoop hadoop     443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet

1.10 Hive Queries ON Merge_On_Read

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
# Incremental Query
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111';

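Note: the commit time given in the incremental query is the timestamp of the merge commit. The official demo uses 2018 timestamps, but the timestamps here are from 2020, when this document was written.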
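The incremental query works because the commit time is a fixed-width yyyyMMddHHmmss string, so plain string comparison matches chronological order. The sketch below illustrates that, assuming `_hoodie_commit_time` is compared as a string (the query compares it with a quoted literal). The helper names are hypothetical, and the two commit times are the ones visible in the HDFS file names earlier in this document.

```python
from datetime import datetime


def is_clock_time(instant):
    """True if `instant` parses as a real yyyyMMddHHmmss wall-clock time."""
    try:
        datetime.strptime(instant, "%Y%m%d%H%M%S")
        return True
    except ValueError:
        return False


def newer_than(commit_times, begin):
    """Keep commit times that sort after `begin` under plain string comparison."""
    return [t for t in commit_times if t > begin]


# Commit times that appear in the parquet file names listed earlier.
commits = ["20200530184111", "20200601144520"]
bound = "20200530284111"  # the literal used in the incremental query

print(is_clock_time(bound))
print(newer_than(commits, bound))
```

The bound used in the query has 28 in the hour position, so it is not a real clock time. As a string it still sorts after every commit made on 2020-05-30 and before any commit made on a later day, which is why only the compaction commit 20200601144520 passes the filter.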

1.11 Spark-SQL ON Merge_On_Read

./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1
# Read Optimized Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
# Snapshot Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
# Incremental Query
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111'").show(100, false)

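2. Administration

Two administrative operations came up during testing.

One is syncing to the Hive metastore with the run_sync_tool.sh script.
The other is running compaction on Merge_On_Read tables through hudi-cli.sh.

Both depend on the jars produced by the Hudi build, so the Hudi directory layout may need to be planned in advance.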

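3. Stress Testing with Custom Data

Note: known problems with the custom data

First, every record is an insert; the data contains no updates. The test used the default operation, upsert; insert and bulk_insert are also supported.

Second, the data does not arrive gradually over time: nearly every partition is already written in the first batch.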

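3.1 Data preparation: loading into Kafka

1. For preparing the custom data, see section 1.1.2

2. For loading the data into Kafka and the related configuration, see section 1.2

3.2 Writing from Kafka to HDFS

3.2.1 Configuration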

1. base.properties

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

2. schema.avsc

{"type":"record","name":"store_sales","fields":[{"name": "customer_id","type": "string"}, {"name": "shop_date", "type": "string"}, {"name": "sum_cost", "type": "double"}]}
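One thing worth checking before loading data: the sample records in section 1.1.2 carry `sum_cost` as a JSON string, while this schema declares it as `double`. The document does not say whether the ingestion path converts such values, so the sketch below is only a hypothetical pre-check that coerces a record to the declared types before it is sent to Kafka. `CASTS` and `coerce` are illustrative names, and only the two primitive types used in this schema are handled.

```python
import json

# The schema from this section, embedded so the example is self-contained.
SCHEMA = json.loads("""
{"type":"record","name":"store_sales","fields":[
  {"name": "customer_id", "type": "string"},
  {"name": "shop_date", "type": "string"},
  {"name": "sum_cost", "type": "double"}]}
""")

# Assumption: only the primitive types that appear in this schema are needed.
CASTS = {"string": str, "double": float}


def coerce(record, schema):
    """Return a copy of `record` with every field cast to its declared type."""
    out = {}
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in record:
            raise KeyError("missing field: " + name)
        out[name] = CASTS[avro_type](record[name])
    return out


# First sample record from section 1.1.2.
sample = {
    "customer_id": "AAAAAAAAOKKNEKBA",
    "shop_date": "1999/09/25",
    "sum_cost": "140.33000069856644",
}
print(json.dumps(coerce(sample, SCHEMA)))
```

A record that is missing a field raises `KeyError`, and a non-numeric `sum_cost` raises `ValueError`, so malformed lines can be caught before they reach the topic.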

3. kafka-source.properties

include=hdfs://beh001/hudi/store2/base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://beh001/hudi/store2/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://beh001/hudi/store2/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-2
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

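Note: the configuration files are uploaded to HDFS, so every file path referenced inside them starts with hdfs://.

3.2.2 Continuously writing data to the COW table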

nohup spark-submit \
  --master yarn \
  --num-executors 12 \
  --executor-memory 18G \
  --executor-cores 6 \
  --deploy-mode cluster \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
  --target-table store_sales_cow \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &

Stress test results:

| CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
|---|---|---|---|---|---|---|---|
| 20200623114826 | 9.5 GB | 0 | 1823 | 1823 | 555059000 | 0 | 0 |
| 20200623114342 | 9.4 GB | 0 | 1823 | 1823 | 551850199 | 0 | 0 |
| 20200623113915 | 9.4 GB | 0 | 1823 | 1823 | 546850199 | 0 | 0 |
| 20200623113438 | 9.3 GB | 0 | 1823 | 1823 | 541850199 | 0 | 0 |
| 20200623113009 | 9.2 GB | 0 | 1823 | 1823 | 536850199 | 0 | 0 |
| 20200623112534 | 9.1 GB | 0 | 1823 | 1823 | 531850199 | 0 | 0 |
| 20200623112104 | 9.0 GB | 0 | 1823 | 1823 | 526850199 | 0 | 0 |
| 20200623111641 | 9.0 GB | 0 | 1823 | 1823 | 521850199 | 0 | 0 |
| 20200623111205 | 8.9 GB | 0 | 1823 | 1823 | 516850199 | 0 | 0 |
| 20200623110736 | 8.8 GB | 0 | 1823 | 1823 | 511850199 | 0 | 0 |
| 20200623110320 | 8.7 GB | 0 | 1823 | 1823 | 506850200 | 0 | 0 |
| 20200623105855 | 8.7 GB | 0 | 1823 | 1823 | 501850200 | 0 | 0 |
| 20200623105435 | 8.6 GB | 0 | 1823 | 1823 | 496850200 | 0 | 0 |
| 20200623105000 | 8.5 GB | 0 | 1823 | 1823 | 491850200 | 0 | 0 |
| 20200623104543 | 8.4 GB | 0 | 1823 | 1823 | 486850200 | 0 | 0 |
| 20200623104120 | 8.3 GB | 0 | 1823 | 1823 | 481850200 | 0 | 0 |
| 20200623103705 | 8.3 GB | 0 | 1823 | 1823 | 476850200 | 0 | 0 |
| 20200623103305 | 8.2 GB | 0 | 1823 | 1823 | 471850200 | 0 | 0 |
| 20200623102848 | 8.1 GB | 0 | 1823 | 1823 | 466850200 | 0 | 0 |
| 20200623102440 | 8.0 GB | 0 | 1823 | 1823 | 461850200 | 0 | 0 |
| 20200623102030 | 7.9 GB | 0 | 1823 | 1823 | 456850200 | 0 | 0 |
| 20200623101628 | 7.9 GB | 0 | 1823 | 1823 | 451850200 | 0 | 0 |
| 20200623101229 | 7.8 GB | 0 | 1823 | 1823 | 446850200 | 0 | 0 |

3.2.3 Continuously writing data to the MOR table

nohup spark-submit \
  --master yarn \
  --num-executors 12 \
  --executor-memory 18G \
  --executor-cores 6 \
  --deploy-mode cluster \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_mor \
  --target-table store_sales_mor \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &

Stress test results:

| CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
|---|---|---|---|---|---|---|---|
| 20200623165326 | 3.6 GB | 0 | 1823 | 1823 | 179999235 | 0 | 0 |
| 20200623165027 | 3.5 GB | 0 | 1823 | 1823 | 174999235 | 0 | 0 |
| 20200623164729 | 3.4 GB | 0 | 1823 | 1823 | 169999235 | 0 | 0 |
| 20200623164431 | 3.3 GB | 0 | 1823 | 1823 | 164999235 | 0 | 0 |
| 20200623164141 | 3.3 GB | 0 | 1823 | 1823 | 159999235 | 0 | 0 |
| 20200623163838 | 3.2 GB | 0 | 1823 | 1823 | 154999235 | 0 | 0 |
| 20200623163550 | 3.1 GB | 0 | 1823 | 1823 | 149999235 | 0 | 0 |
| 20200623163254 | 3.0 GB | 0 | 1823 | 1823 | 144999235 | 0 | 0 |
| 20200623163017 | 3.0 GB | 0 | 1823 | 1823 | 139999235 | 0 | 0 |
| 20200623162735 | 2.9 GB | 0 | 1823 | 1823 | 134999235 | 0 | 0 |
| 20200623162459 | 2.8 GB | 0 | 1823 | 1823 | 129999235 | 0 | 0 |
| 20200623162223 | 2.7 GB | 0 | 1823 | 1823 | 124999235 | 0 | 0 |
| 20200623161945 | 2.6 GB | 0 | 1823 | 1823 | 119999235 | 0 | 0 |
| 20200623161707 | 2.6 GB | 0 | 1823 | 1823 | 114999235 | 0 | 0 |
| 20200623161441 | 2.5 GB | 0 | 1823 | 1823 | 109999235 | 0 | 0 |
| 20200623161211 | 2.4 GB | 0 | 1823 | 1823 | 104999235 | 0 | 0 |
| 20200623160943 | 2.3 GB | 0 | 1823 | 1823 | 99999235 | 0 | 0 |
| 20200623160700 | 2.2 GB | 0 | 1823 | 1823 | 94999235 | 0 | 0 |
| 20200623160440 | 2.2 GB | 0 | 1823 | 1823 | 89999235 | 0 | 0 |
| 20200623160225 | 2.1 GB | 0 | 1823 | 1823 | 84999235 | 0 | 0 |
| 20200623160002 | 2.0 GB | 0 | 1823 | 1823 | 79999235 | 0 | 0 |
| 20200623155741 | 1.9 GB | 0 | 1823 | 1823 | 74999235 | 0 | 0 |
| 20200623155527 | 1.8 GB | 0 | 1823 | 1823 | 69999235 | 0 | 0 |
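The COW and MOR result tables can be turned into a rough ingest rate. The sketch below rests on two assumptions that the document does not state: that the growth in Total Records Written between consecutive commits equals the number of newly ingested records, and that the gap between commit times approximates the time spent on that batch. The rows are copied from the two tables; `parse_instant` and `throughput` are hypothetical helper names.

```python
from datetime import datetime


def parse_instant(instant):
    """Parse a yyyyMMddHHmmss commit time."""
    return datetime.strptime(instant, "%Y%m%d%H%M%S")


def throughput(rows):
    """rows: (commit_time, total_records_written) pairs, newest first as in the tables.

    Returns (commit_time, new_records, seconds, records_per_second) per commit.
    """
    result = []
    for (t_new, n_new), (t_old, n_old) in zip(rows, rows[1:]):
        seconds = (parse_instant(t_new) - parse_instant(t_old)).total_seconds()
        new_records = n_new - n_old
        result.append((t_new, new_records, seconds, new_records / seconds))
    return result


# Three consecutive commits copied from each table.
COW = [("20200623113915", 546850199), ("20200623113438", 541850199), ("20200623113009", 536850199)]
MOR = [("20200623165027", 174999235), ("20200623164729", 169999235), ("20200623164431", 164999235)]

for name, rows in (("COW", COW), ("MOR", MOR)):
    for commit, new_records, seconds, rate in throughput(rows):
        print(name, commit, new_records, int(seconds), round(rate))
```

For these rows each commit adds 5,000,000 records. The COW commits are 277 s and 269 s apart, the MOR commits 178 s apart, so under the stated assumptions the MOR run commits the same batch size noticeably faster. Keep in mind that the two runs were sampled at different table sizes.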

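3.3 Open Questions

3.3.1 Test data

The custom data consists entirely of inserts; it contains no updates.

3.3.2 No delta log when writing MOR

When writing the MOR table, the custom data contained no updates, and no delta log was produced. After the custom data was adjusted to include some updates and the MOR table was written again, delta logs were confirmed to be generated.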
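To exercise the delta log path, the test data needs records that reuse an existing record key (`customer_id`) and partition path (`shop_date`). The sketch below shows one way to generate such batches. It is a hypothetical generator: the id format, date range, and cost range are made up for illustration and are not the TPC-DS derived values used in the test.

```python
import json
import random


def make_batch(existing, n_records, update_ratio, rng):
    """Build one batch of records.

    existing:     list of (customer_id, shop_date) keys already sent; new keys are appended.
    update_ratio: probability that a record reuses an existing key (an update).
    rng:          a random.Random instance, passed in so runs are reproducible.
    """
    batch = []
    for _ in range(n_records):
        if existing and rng.random() < update_ratio:
            # Update: same record key and partition path, new sum_cost.
            customer_id, shop_date = rng.choice(existing)
        else:
            # Insert: hypothetical id format, unique because it is derived from the key count.
            customer_id = "C%015d" % len(existing)
            shop_date = "%04d/%02d/%02d" % (
                rng.randint(1998, 2002), rng.randint(1, 12), rng.randint(1, 28))
            existing.append((customer_id, shop_date))
        batch.append({
            "customer_id": customer_id,
            "shop_date": shop_date,
            "sum_cost": round(rng.uniform(1.0, 300.0), 2),
        })
    return batch


rng = random.Random(42)
keys = []
first = make_batch(keys, 5, 0.0, rng)    # inserts only
second = make_batch(keys, 5, 0.5, rng)   # roughly half updates
print("\n".join(json.dumps(r) for r in first + second))
```

The printed JSON lines have the same shape as the records in section 1.1.2 and could be piped into kafkacat in the same way as batch_1.json.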

4. Issues Encountered

4.1 Using kafkacat

[hadoop@bdev001 beh]$ cat batch_1.json | kafkacat -F kafkacat.conf -t zjh -P
% Reading configuration from file kafkacat.conf
%2|1591338623.769|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev001.bonc.com:9092/bootstrap]: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338624.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev003.bonc.com:9092/bootstrap]: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338625.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev002.bonc.com:9092/bootstrap]: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating

Solution:

yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi

4.2 Hive Queries

When running the Hive queries, the following exception appeared in the ApplicationMaster:

ERROR : Job failed with java.lang.ClassNotFoundException: org.apache.avro.LogicalType

java.util.concurrent.ExecutionException: Exception thrown by job

at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)

at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)

at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:382)

at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, bdev002.bonc.com, executor 1): java.lang.NoClassDefFoundError: org/apache/avro/LogicalType

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:108)

at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:50)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)

at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)

at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)

at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 23 more

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at scala.Option.foreach(Option.scala:257)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Caused by: java.lang.NoClassDefFoundError: org/apache/avro/LogicalType

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:108)

at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:50)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)

at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)

at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)

at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 23 more

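Solution:

Investigation showed that avro-1.7.4.jar in our Hadoop distribution does not contain this class, whereas the newer avro-1.8.2.jar found in the Hudi build tree does. We replaced the older avro jars under /opt/beh/core/hadoop/share/hadoop/common/lib and /opt/beh/core/hive/lib/ with the newer one, after which the query succeeded.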
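A quick way to confirm which jar provides a class is to look for its `.class` entry, since a jar is a zip archive. The sketch below is a minimal illustration of that check; `jar_has_class` and `jars_with_class` are hypothetical helper names, and the directory in the usage comment is the one named above.

```python
import os
import zipfile


def jar_has_class(jar_path, class_name):
    """True if the jar at `jar_path` contains the given fully qualified class."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def jars_with_class(directory, class_name):
    """Return the sorted names of the jars directly under `directory` that contain the class."""
    found = []
    for name in sorted(os.listdir(directory)):
        if name.endswith(".jar") and jar_has_class(os.path.join(directory, name), class_name):
            found.append(name)
    return found


# Example call (not executed here, since it needs the cluster's lib directory):
# jars_with_class("/opt/beh/core/hadoop/share/hadoop/common/lib", "org.apache.avro.LogicalType")
```

Running the check against both lib directories before and after swapping the jar shows whether any jar on the classpath provides `org.apache.avro.LogicalType`.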

4.3 Presto Queries

1. HoodieException

org.apache.hudi.exception.HoodieException: Error reading Hoodie partition metadata for hdfs://beh001/user/hive/warehouse/stock_ticks_cow/2018/08/31
### Enable debug logging on the Presto coordinator node
Edit ${PRESTO_HOME}/etc/log.properties:
io.prestosql=DEBUG
Log output:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=kafka, access=EXECUTE, inode="/user/hive/warehouse":hadoop:hadoop:drwx------

The root cause is that the kafka user does not have permission on the directory.

Why the kafka user? Presto had refreshed its cache.
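The AccessControlException can be read directly from the permission string in the log. The sketch below applies the owner/group/other rule to `drwx------` as a simplified model. It assumes the kafka user is not in the hadoop group and ignores ACLs and the HDFS superuser; `is_allowed` is a hypothetical helper name.

```python
ACCESS_INDEX = {"READ": 0, "WRITE": 1, "EXECUTE": 2}


def is_allowed(perm, owner, group, user, user_groups, access):
    """Simplified permission check on a string such as 'drwx------'.

    The first character is the file type; the next nine are rwx for owner, group, other.
    """
    bits = perm[1:]
    if user == owner:
        triplet = bits[0:3]
    elif group in user_groups:
        triplet = bits[3:6]
    else:
        triplet = bits[6:9]
    return triplet[ACCESS_INDEX[access]] != "-"


# Values from the log line: inode "/user/hive/warehouse":hadoop:hadoop:drwx------
# Assumption: the kafka user belongs only to a group named kafka.
print(is_allowed("drwx------", "hadoop", "hadoop", "kafka", ["kafka"], "EXECUTE"))
print(is_allowed("drwx------", "hadoop", "hadoop", "hadoop", ["hadoop"], "EXECUTE"))
```

Traversing a directory needs EXECUTE on it, so any user other than hadoop is rejected at /user/hive/warehouse before the partition metadata file is ever read.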

4.4 Build Testing

4.4.1 The Hadoop native library fails to load

java.lang.ExceptionInInitializerError

at io.prestosql.plugin.hive.HdfsEnvironment$$FastClassByGuice$$2c8553d4.newInstance()

at com.google.inject.internal.DefaultConstructionProxyFactory$FastClassProxy.newInstance(DefaultConstructionProxyFactory.java:89)

at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:114)

at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)

at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)

at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)

at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)

at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)

at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)

at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)

at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)

at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)

at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)

at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:62)

at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)

at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)

at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)

at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:211)

at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:182)

at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)

at com.google.inject.Guice.createInjector(Guice.java:87)

at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:240)

at io.prestosql.plugin.hive.HiveConnectorFactory.create(HiveConnectorFactory.java:123)

at io.prestosql.connector.ConnectorManager.createConnector(ConnectorManager.java:321)

at io.prestosql.connector.ConnectorManager.addCatalogConnector(ConnectorManager.java:195)

at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:187)

at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:173)

at io.prestosql.metadata.StaticCatalogStore.loadCatalog(StaticCatalogStore.java:96)

at io.prestosql.metadata.StaticCatalogStore.loadCatalogs(StaticCatalogStore.java:74)

at io.prestosql.server.PrestoServer.run(PrestoServer.java:122)

at io.prestosql.server.PrestoServer.main(PrestoServer.java:68)

Caused by: java.lang.RuntimeException: failed to load Hadoop native library

at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:58)

at io.prestosql.plugin.hive.HdfsEnvironment.<clinit>(HdfsEnvironment.java:37)

... 31 more

Caused by: java.lang.RuntimeException: native snappy library not available: SnappyCompressor has not been loaded.

at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:72)

at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)

at io.prestosql.hadoop.HadoopNative.loadAllCodecs(HadoopNative.java:71)

at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:52)

... 32 more

Solution:

export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native

Alternatively, handle this at build time.

4.4.2 org.apache.hadoop.yarn.conf.HAUtil fails to load

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/HAUtil

at org.apache.hadoop.mapred.Master.getMasterAddress(Master.java:61)

at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:88)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:213)

at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)

at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)

at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)

at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)

at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.HAUtil

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 18 more

Solution: copy the hadoop-yarn-api jar into ${PRESTO_HOME}/plugin/hive-hadoop2
