1. Dedicated OLAP Cluster: Trino/Presto
1.1. Customer Requirements
An EMR Trino cluster that connects to the Hive metastore on a self-managed ACK cluster, to MySQL, and to ODPS.
The most involved part is the connector configuration for the self-managed Hive:
- Table types: both plain Hive tables and Delta tables
- Storage: local HDFS as well as OSS in the Shenzhen region
- Protocol: the customer's Hive reads and writes OSS data via the AWS S3 protocol
1.2. Component Selection
- Hadoop-Common and Trino are mandatory
- Hive, YARN, Spark, and other components are optional, depending on your workload
- DLF: if the cluster also includes the Hive service component, the hive, iceberg, hudi, and delta lake connectors come pre-configured with the DLF parameters and can be used directly. If the cluster does not need the Hive service component, you can either check "Auto-connect Trino to Data Lake Formation (DLF)" when creating the EMR cluster, or configure it manually by setting hive.metastore=DLF together with the DLF-related parameters; see the doc: manual DLF configuration.
- Knox: required when accessing the Trino UI through the public-network proxy
- Kerberos: can only be enabled at cluster creation time; it cannot be turned on or off afterwards
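For the manual DLF route, a minimal hive.properties sketch is shown below. Only hive.metastore=DLF comes from this doc; the remaining endpoint/catalog/credential keys are deliberately omitted and must be taken from the linked manual-configuration doc:

```properties
# Sketch: switch the Hive connector's metastore type to DLF.
# The additional DLF parameters (endpoint, catalog id, credentials)
# are omitted here on purpose -- take their exact names from the
# "manual DLF configuration" doc referenced above.
connector.name=hive
hive.metastore=DLF
```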
1.3. High Availability
Not supported: the Coordinator is deployed only on the master-1-1 node. So if this is a dedicated Trino cluster with no other service components deployed, you can skip master high availability to save cost.
2. Connecting External Engines and Basic Usage
2.1. Network
IP-and-port connectivity between the clusters must be ensured. In the test setup, the EMR Trino cluster and the self-managed Hive metastore / MySQL were in the same VPC, with all ports (1/65535) opened for every private IP under the vSwitch; this can be done via instance whitelists or the cluster security group.
2.2. Hive
- Reference documentation:
- Alibaba Cloud docs: https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/hive-connector-1?spm=a2c4g.11186623.0.0.693d28e5RnAjZ4
- Upstream docs: https://trino.io/docs/current/connector/hive.html
- Hive connector configuration
- hive.properties. If the cluster has neither a co-located Hive service nor DLF configured, you can edit this file directly: set hive.metastore.uri to the target cluster's Hive metastore address. If HDFS access is not required, hive.config.resources can be left unset.
- connectorX.properties (the Alibaba Cloud EMR Trino service reserves five connector config files in the console). If the cluster does have the Hive service or DLF configured, use one of the pre-created connectorX files and set connector.name, hive.metastore.uri, and hive.config.resources. Note that connector.name must be changed from memory to hive; otherwise, at startup trinoserver loads the plugin via io.trino.plugin.memory.MemoryConnectorFactory.create instead of io.trino.plugin.hive.InternalHiveConnectorFactory.createConnector. Different connectors validate different parameters, and Trino validates configuration strictly: a single bad property keeps the whole trinoserver from starting.
ERROR main io.trino.server.Server Configuration is invalid
ERROR main io.trino.server.Server Configuration errors:
1) Error: Configuration property '<conf>' was not used
io.airlift.bootstrap.ApplicationConfigurationException: Configuration errors:
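As a sketch, the standalone hive.properties case described above (no co-located Hive, no DLF) might look like the following; the metastore host and Hadoop config paths are placeholders:

```properties
# Hypothetical hive.properties for a Trino cluster without co-located Hive/DLF
connector.name=hive
hive.metastore.uri=thrift://<metastore-host>:9083
# Only needed when HDFS must be accessed:
# hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
```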
2.2.1. Reading Delta Tables on OSS via the S3-Compatible Protocol
2.2.1.1. Delta table read failure: Cannot query Delta Lake table
Caused by: io.trino.spi.TrinoException: Cannot query Delta Lake table 'schema.table_name'
    at io.trino.plugin.hive.HiveMetadata.getTableHandle(HiveMetadata.java:497)
    at io.trino.plugin.hive.HiveMetadata.getTableHandle(HiveMetadata.java:339)
    at io.trino.spi.connector.ConnectorMetadata.getTableHandle(ConnectorMetadata.java:122)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(ClassLoaderSafeConnectorMetadata.java:1074)
    at io.trino.tracing.TracingConnectorMetadata.getTableHandle(TracingConnectorMetadata.java:142)
    at io.trino.metadata.MetadataManager.lambda$getTableHandle$5(MetadataManager.java:282)
    at java.base/java.util.Optional.flatMap(Optional.java:289)
    at io.trino.metadata.MetadataManager.getTableHandle(MetadataManager.java:276)
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1579)
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1571)
    at io.trino.tracing.TracingMetadata.getRedirectionAwareTableHandle(TracingMetadata.java:1291)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.getTableHandle(StatementAnalyzer.java:5444)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:2218)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:488)
    at io.trino.sql.tree.Table.accept(Table.java:60)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:505)
Cause: Trino provides dedicated connectors for Iceberg, Hudi, and Delta Lake, and they are the recommended way to query these tables. If a job must go through the Hive connector, use the table redirection feature to forward the query to the corresponding dedicated connector.
Reference documentation:
- Alibaba Cloud docs: https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/faq-about-trino?spm=a2c4g.11186623.0.i42#fcb9df207ch3q
- Upstream docs: https://trino.io/docs/current/connector/hive.html
Configuration changes:
Add to the hive connector configuration:
hive.delta-lake-catalog-name=delta
Add to delta.properties:
connector.name=delta-lake (default)
hive.hdfs.impersonation.enabled=false
hive.metastore.uri=thrift://<self-managed cluster address>:9083
hive.config.resources=<core-site.xml>,<hdfs-site.xml>
2.2.1.2. Authentication error: Unable to load AWS credentials
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [
    EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
    SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey),
    WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,
    com.amazonaws.auth.profile.ProfileCredentialsProvider@6f048d2a: profile file cannot be null,
    com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@10c6141b: Failed to connect to service endpoint: ]
The exception is an AWS credential-resolution failure. Checking the customer's table details clarified the setup: the customer runs a self-managed Hadoop cluster on ACK, the Delta tables are stored on Alibaba Cloud OSS, but they are accessed via the AWS S3 protocol, as shown in the output of desc formatted table_name below. The initial conclusion was that the Trino cluster had no matching credential configuration.
|Location         |s3a://<oss-bucket>/path/<tablename>                 |
|Provider         |delta                                               |
|Owner            |185                                                 |
|External         |true                                                |
|Table Properties |[delta.minReaderVersion=1,delta.minWriterVersion=2] |
2.2.1.2.1. Reproducing the Issue
Configure the EMR cluster to read OSS via the AWS S3-compatible protocol and create a test table:
- Obtain the AWS S3 jars. The EMR cluster ships the jars matching the current version under the path below; add it to HADOOP_CLASSPATH. (hadoop-aws-3.2.1.jar, aws-java-sdk-bundle-1.11.375.jar)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/tools/lib/*
- In the hadoop-common configuration, set the s3a implementation classes, endpoint, and AccessKey
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A
fs.s3a.access.key=<aliyun_access_id>
fs.s3a.secret.key=<aliyun_access_key>
fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
fs.s3a.endpoint=oss-cn-hangzhou-internal.aliyuncs.com
- Use Spark to create a Delta table and write it to an OSS bucket path via the s3a-compatible protocol
spark-sql --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
  --conf spark.hadoop.fs.s3a.endpoint=oss-cn-hangzhou-internal.aliyuncs.com \
  --conf spark.hadoop.fs.s3a.access.key=<aliyun_access_id> \
  --conf spark.hadoop.fs.s3a.secret.key=<aliyun_access_key> \
  --conf spark.hadoop.fs.s3a.path.style.access=false \
  --jars /opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/tools/lib/hadoop-aws-3.2.1.jar,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/tools/lib/aws-java-sdk-bundle-1.11.375.jar

create table s3_table4(name string) using delta location "s3a://<oss_bucket_name>/<path>/s3_table4/";
insert into s3_table4 values('zhangsan');
- Query the table through the target catalog in Trino to reproduce the error
2.2.1.2.2. Resolution
Reference (upstream): https://trino.io/docs/current/object-storage/legacy-s3.html
Set the following Trino hive connector parameters and restart the Trino server:
hive.s3.aws-access-key=<aliyun_access_id>
hive.s3.aws-secret-key=<aliyun_access_key>
hive.s3.endpoint=oss-cn-hangzhou-internal.aliyuncs.com
hive.s3.path-style-access=false
The query then succeeds.
2.2.1.2.3. Failed Attempts
- Setting hive.config.resources in the Trino hive connector to a custom core-site.xml containing the s3a filesystem implementation class, credential provider, AccessKey, and endpoint had no effect.
- Based on the stack trace, exporting AWS_ACCESS_KEY_ID=xxx in /etc/profile on all master and core nodes (via the EMR script-execution feature) also had no effect.
- Understanding exactly why these do not work would require tracing the stack against the source code.
2.3. MySQL
- Reference documentation:
- Alibaba Cloud docs: https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/mysql-connector?spm=a2c4g.11186623.0.0.ca8628e57wfWMQ
- Upstream docs: https://trino.io/docs/current/connector/mysql.html
- Sample configuration:
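A typical mysql.properties for the Trino MySQL connector looks like the following sketch; the host and credentials are placeholders:

```properties
# Standard Trino MySQL connector properties (host/credentials are placeholders)
connector.name=mysql
connection-url=jdbc:mysql://<mysql-host>:3306
connection-user=<user>
connection-password=<password>
```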
2.4. ODPS
- Sample configuration:
2.5. Basic Usage of the Trino CLI
trino --server master-1-1:9090
# switch catalog and schema
use <catalog>.<schema>;
show tables;
select * from <table_name>;
- Trino query: Hive metastore
- Trino query: MySQL
- Trino query: ODPS
3. Open-Source LDAP Access Control
Reference documentation:
- Alibaba Cloud docs: https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/manage-ldap-authentication-20?spm=a2c4g.11186623.0.0.253b673bvi12cH
- Upstream docs: https://trino.io/docs/current/security/ldap.html
3.1. User Creation
Create users under EMR cluster > User Management > Add User.
3.2. Enabling Trino LDAP
3.3. Obtaining Login Information
- Required information
- The port changes from 9090 to 7778; with LDAP enabled, the old 9090 port returns the following error
Error running command: Error starting query at http://master-1-1:9090/v1/statement returned an invalid response:
JsonResponse{statusCode=403, headers={cache-control=[must-revalidate,no-cache,no-store], content-length=[422], content-type=[text/html;charset=iso-8859-1]}, hasValue=false}
[Error: <html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 403 Forbidden</title>
</head>
<body><h2>HTTP ERROR 403 Forbidden</h2>
<table>
<tr><th>URI:</th><td>/v1/statement</td></tr>
<tr><th>STATUS:</th><td>403</td></tr>
<tr><th>MESSAGE:</th><td>Forbidden</td></tr>
<tr><th>SERVLET:</th><td>org.glassfish.jersey.servlet.ServletContainer-2ebd6a0d</td></tr>
</table>
</body>
</html> ]
- Replace the host with the domain name: master-1-1.<cluster-id>.cn-hangzhou.emr.aliyuncs.com. For a local connection you also need an /etc/hosts entry resolving this domain to the node's IP; putting the bare IP in the URL can trigger a CN verification error like the following
Hostname 120.xx.xx.x not verified: certificate: sha256/QRPpmNxxxxxxxxxxWIlc1iUQWGn/CBLxxxxxn81WY= DN: CN=master-1-1.c-7101xxxxxc3d1.cn-hangzhou.emr.aliyuncs.com subjectAltNames: []
- Where to find keystore-path and keystore-password: the EMR console currently does not expose these parameters, so you have to log in to the node and read them there (expected behavior?)
- The user and password specified when the user was created
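To illustrate the hosts mapping mentioned above, a local /etc/hosts entry might look like the following; the IP and cluster id are placeholders taken from the masked examples in this doc:

```
# /etc/hosts on the client machine: resolve the master's domain to its public IP
120.xx.xx.x    master-1-1.c-7101xxxxxc3d1.cn-hangzhou.emr.aliyuncs.com
```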
3.4. Logging in from the Cluster CLI
trino \
  --server https://master-1-1.c-71015d6c35d9c3d1.cn-hangzhou.emr.aliyuncs.com:7778 \
  --keystore-path /etc/taihao-apps/trino-conf/keystore \
  --keystore-password <keystore_secret> \
  --user syf373586 \
  --password
3.5. Local DBeaver
Before connecting, download the keystore file from the master node to the client machine (e.g. via scp) and point SSLTrustStorePath at the local copy.
jdbc:trino://master-1-1.c-7101xxxxxx3d1.cn-hangzhou.emr.aliyuncs.com:7778/hive/sunyf_meta_test?SSL=true&SSLTrustStorePath=/Users/adamsun/keystore&SSLTrustStorePassword=32d14510xxxxxxxxa9244e
3.6. Local Java
package org.example;

import java.sql.*;
import java.util.Properties;

public class TrinoTest01 {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:trino://master-1-1.c-71015d6c35d9c3d1.cn-hangzhou.emr.aliyuncs.com:7778/hive/sunyf_meta_test";
        Properties properties = new Properties();
        properties.setProperty("user", "user");
        properties.setProperty("password", "");
        properties.setProperty("SSL", "true");
        properties.setProperty("SSLTrustStorePath", "/Users/adamsun/keystore");
        properties.setProperty("SSLTrustStorePassword", "32d14510xxxxxxxxa9244e");
        Connection connection = DriverManager.getConnection(url, properties);
        // create a Statement
        Statement statement = connection.createStatement();
        // run the query
        ResultSet resultSet = statement.executeQuery("select count(*) as cnt from sunyf_test_table");
        // process the result
        while (resultSet.next()) {
            System.out.println(resultSet.getInt(1));
        }
        // close resources
        resultSet.close();
        statement.close();
        connection.close();
    }
}
4. Kerberos Access Control
Reference documentation:
- Alibaba Cloud docs: https://help.aliyun.com/zh/emr/emr-on-ecs/user-guide/access-trino-by-using-the-cli?spm=a2c4g.11186623.0.0.37d5673bSk3BRe#db81fb8081p8k
- Upstream docs: https://trino.io/docs/current/client/cli.html#kerberos-authentication
4.1. User Creation
# log in
kadmin.local
# add user "test" and set its password
addprinc test
# export a keytab for test; on later exports pass -norandkey so the key is not
# re-randomized, which would invalidate previously issued keytabs
xst -norandkey -k /root/test.keytab test@EMR.<CLUSTER_ID>.COM
4.2. Logging in from the Cluster CLI
trino \
  --server https://master-1-1.<CLUSTER_ID>.cn-hangzhou.emr.aliyuncs.com:7778 \
  --krb5-config-path /etc/krb5.conf \
  --krb5-keytab-path /root/test.keytab \
  --keystore-path /etc/emr/trino-conf/keystore \
  --keystore-password <keystore_secret> \
  --krb5-principal test@EMR.<CLUSTER_ID>.COM \
  --krb5-remote-service-name trino \
  --user test
4.3. Local DBeaver
4.3.1. KDC authentication timeout: Receive timed out
Error stack:
Exception in thread "main" java.sql.SQLException: Kerberos error for [trino@master-1-1.c-13d43d0f090df583.cn-hangzhou.emr.aliyuncs.com]: Receive timed out
    at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:284)
    at io.trino.jdbc.TrinoStatement.execute(TrinoStatement.java:240)
    at io.trino.jdbc.TrinoStatement.executeQuery(TrinoStatement.java:77)
    at org.example.TrinoTest02.main(TrinoTest02.java:25)
Caused by: io.trino.jdbc.$internal.client.ClientException: Kerberos error for [trino@master-1-1.c-13d43d0f090df583.cn-hangzhou.emr.aliyuncs.com]: Receive timed out
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.generateToken(SpnegoHandler.java:157)
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.authenticate(SpnegoHandler.java:123)
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.authenticate(SpnegoHandler.java:111)
    at io.trino.jdbc.$internal.okhttp3.internal.http.RetryAndFollowUpInterceptor.followUpRequest(RetryAndFollowUpInterceptor.kt:223)
    at io.trino.jdbc.$internal.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:108)
    at io.trino.jdbc.$internal.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.intercept(SpnegoHandler.java:98)
    at io.trino.jdbc.$internal.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at io.trino.jdbc.$internal.client.OkHttpUtil.lambda$userAgent$0(OkHttpUtil.java:70)
    at io.trino.jdbc.$internal.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
    at io.trino.jdbc.$internal.okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
    at io.trino.jdbc.$internal.okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
    at io.trino.jdbc.$internal.client.JsonResponse.execute(JsonResponse.java:113)
    at io.trino.jdbc.$internal.client.StatementClientV1.<init>(StatementClientV1.java:119)
    at io.trino.jdbc.$internal.client.StatementClientFactory.newStatementClient(StatementClientFactory.java:28)
    at io.trino.jdbc.TrinoConnection.startQuery(TrinoConnection.java:757)
    at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:252)
    ... 3 more
Caused by: javax.security.auth.login.LoginException: Receive timed out
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:812)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at io.trino.jdbc.$internal.client.auth.kerberos.LoginBasedSubjectProvider.refresh(LoginBasedSubjectProvider.java:118)
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.createGssCredential(SpnegoHandler.java:182)
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.getGssCredential(SpnegoHandler.java:174)
    at io.trino.jdbc.$internal.client.auth.kerberos.SpnegoHandler.generateToken(SpnegoHandler.java:135)
    ... 19 more
Caused by: java.net.SocketTimeoutException: Receive timed out
    at java.net.PlainDatagramSocketImpl.peekData(Native Method)
    at java.net.DatagramSocket.receive(DatagramSocket.java:787)
    at sun.security.krb5.internal.UDPClient.receive(NetClient.java:206)
    at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:404)
    at sun.security.krb5.KdcComm$KdcCommunication.run(KdcComm.java:364)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.security.krb5.KdcComm.send(KdcComm.java:348)
    at sun.security.krb5.KdcComm.sendIfPossible(KdcComm.java:253)
    at sun.security.krb5.KdcComm.send(KdcComm.java:229)
    at sun.security.krb5.KdcComm.send(KdcComm.java:200)
    at sun.security.krb5.KrbAsReqBuilder.send(KrbAsReqBuilder.java:335)
    at sun.security.krb5.KrbAsReqBuilder.action(KrbAsReqBuilder.java:488)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:780)
    ... 35 more

Process finished with exit code 1
Problem analysis:
- The stack shows the client timing out while contacting the EMR Trino cluster's KDC to obtain a TGT
- krb5.conf shows the KDC listening on port 88, which the kdc process on the master node confirms
- Whitelisting the local IP for port 88 in the EMR security group did not help
- A closer look at the stack (at sun.security.krb5.internal.UDPClient.receive(NetClient.java:206)) shows the client using UDP; the krb5kdc process indeed also listens on UDP, but the security group had only opened the TCP port
- After opening UDP port 88 in the EMR security group, access works
4.3.2. Connection Example
jdbc:trino://master-1-1.c-13d43d0f090df583.cn-hangzhou.emr.aliyuncs.com:7778/connector1/sunyf_db?SSL=true&SSLTrustStorePath=/Users/adamsun/trino_cluster2_conf/keystore&SSLTrustStorePassword=<keystore_secret>&KerberosConfigPath=/Users/adamsun/trino_cluster2_conf/krb5.conf&KerberosKeytabPath=/Users/adamsun/trino_cluster2_conf/test.keytab&KerberosPrincipal=test@EMR.C-13D43D0F090DF583.COM&KerberosRemoteServiceName=trino
4.4. Local Java
package org.example;

import java.sql.*;
import java.util.Properties;

public class TrinoTest02 {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:trino://master-1-1.c-13d43d0f090df583.cn-hangzhou.emr.aliyuncs.com:7778/connector1/sunyf_db";
        Properties properties = new Properties();
        properties.setProperty("user", "test");
        // properties.setProperty("password", "");
        properties.setProperty("SSL", "true");
        properties.setProperty("SSLTrustStorePath", "/Users/adamsun/trino_cluster2_conf/keystore");
        properties.setProperty("SSLTrustStorePassword", "<keystore_secret>");
        properties.setProperty("KerberosConfigPath", "/Users/adamsun/trino_cluster2_conf/krb5.conf");
        properties.setProperty("KerberosKeytabPath", "/Users/adamsun/trino_cluster2_conf/test.keytab");
        properties.setProperty("KerberosPrincipal", "test@EMR.C-13D43D0F090DF583.COM");
        properties.setProperty("KerberosRemoteServiceName", "trino");
        Connection connection = DriverManager.getConnection(url, properties);
        // create a Statement
        Statement statement = connection.createStatement();
        // run the query
        ResultSet resultSet = statement.executeQuery("select count(*) as cnt from flink_window_test01");
        // process the result
        while (resultSet.next()) {
            System.out.println(resultSet.getInt(1));
        }
        // close resources
        resultSet.close();
        statement.close();
        connection.close();
    }
}