我在AWS端有一个Athena数据库。我想访问它并进行一些查询。这是我的.java类
客户建设者
public class athenaCodeFactory {
private final AthenaClientBuilder builder = AthenaClient.builder()
.region(Region.US_WEST_2).credentialsProvider(EnvironmentVariableCredentialsProvider.create());
public AthenaClient createClient() {
return builder.build();
}
}
叫做java类
public class athenaReportsClient {
private static String solutionId=null;
private static String bucketName=null;
private static String tenantId=null;
private static String ATHENA_OUTPUT_BUCKET=null;
private static String ATHENA_DEFAULT_DATABASE=null;
private static String ATHENA_SAMPLE_QUERY=null;
private static String tenantDb=null;
public static final long SLEEP_AMOUNT_IN_MS = 1000;
public athenaReportsClient(String solutionId, String tenantId,String bucketName){
this.solutionId=solutionId;
this.tenantId=tenantId;
this.bucketName=bucketName;
this.ATHENA_OUTPUT_BUCKET="s3://"+bucketName+"/"+solutionId;
System.out.println("bucketloc"+ATHENA_OUTPUT_BUCKET);
this.tenantDb="xyz";
System.out.println("tenanentDB:"+tenantDb);
}
public static final int CLIENT_EXECUTION_TIMEOUT = 100000;
/**
* AthenaClientClientBuilder to build Athena with the following properties:
* - Set the region of the client
* - Use the instance profile from the EC2 instance as the credentials provider
* - Configure the client to increase the execution timeout.
*/
public void executeQuery(String tableType) throws InterruptedException {
// Build an AthenaClient client
athenaCodeFactory factory = new athenaCodeFactory ();
AthenaClient athenaClient = factory.createClient();
String queryExecutionId = submitAthenaQuery(athenaClient,tableType);
waitForQueryToComplete(athenaClient, queryExecutionId);
processResultRows(athenaClient, queryExecutionId);
}
/**
* @tableType: "events", "reports"
*/
private static void getDBName(String tableType){
switch(tableType){
case "events": ATHENA_DEFAULT_DATABASE=tenantDb;
ATHENA_SAMPLE_QUERY="select * from "+tenantDb+".all_"+solutionId+";";
break;
case "reports": ATHENA_DEFAULT_DATABASE=solutionId;
ATHENA_SAMPLE_QUERY="select * from "+solutionId+".reports"+";";
break;
case "default": new RuntimeException("No such table type"+tableType);
}
}
/**
* Submits a sample query to Athena and returns the execution ID of the query.
*/
private static String submitAthenaQuery(AthenaClient athenaClient,String tableType) {
// The QueryExecutionContext allows us to set the Database.
getDBName(tableType);
System.out.println("Athena Data base:"+ATHENA_DEFAULT_DATABASE);
System.out.println("Athena Query:"+ATHENA_SAMPLE_QUERY);
System.out.println("Athena output bucket:"+ATHENA_OUTPUT_BUCKET);
QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
.database(ATHENA_DEFAULT_DATABASE).build();
// The result configuration specifies where the results of the query should go in S3 and encryption options
ResultConfiguration resultConfiguration = ResultConfiguration.builder()
// You can provide encryption options for the output that is written.
// .withEncryptionConfiguration(encryptionConfiguration)
.outputLocation(ATHENA_OUTPUT_BUCKET).build();
// Create the StartQueryExecutionRequest to send to Athena which will start the query.
StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
.queryString(ATHENA_SAMPLE_QUERY)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration).build();
StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
return startQueryExecutionResponse.queryExecutionId();
}
/**
* Wait for an Athena query to complete, fail or to be cancelled. This is done by polling Athena over an
* interval of time. If a query fails or is cancelled, then it will throw an exception.
*/
private static void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) throws InterruptedException {
GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
.queryExecutionId(queryExecutionId).build();
GetQueryExecutionResponse getQueryExecutionResponse;
boolean isQueryStillRunning = true;
while (isQueryStillRunning) {
getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();
if (queryState.equals(QueryExecutionState.FAILED.toString())) {
throw new RuntimeException("Query Failed to run with Error Message: " + getQueryExecutionResponse
.queryExecution().status().stateChangeReason());
} else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
throw new RuntimeException("Query was cancelled.");
} else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
isQueryStillRunning = false;
} else {
// Sleep an amount of time before retrying again.
Thread.sleep(SLEEP_AMOUNT_IN_MS);
}
System.out.println("Current Status is: " + queryState);
}
}
/**
* This code calls Athena and retrieves the results of a query.
* The query must be in a completed state before the results can be retrieved and
* paginated. The first row of results are the column headers.
*/
private static void processResultRows(AthenaClient athenaClient, String queryExecutionId) {
GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
// Max Results can be set but if its not set,
// it will choose the maximum page size
// As of the writing of this code, the maximum value is 1000
// .withMaxResults(1000)
.queryExecutionId(queryExecutionId).build();
GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
for (GetQueryResultsResponse Resultresult : getQueryResultsResults) {
List<ColumnInfo> columnInfoList = Resultresult.resultSet().resultSetMetadata().columnInfo();
List<Row> results = Resultresult.resultSet().rows();
processRow(results, columnInfoList);
}
}
private static void processRow(List<Row> row, List<ColumnInfo> columnInfoList) {
for (ColumnInfo columnInfo : columnInfoList) {
System.out.println("Col:"+columnInfo.name());
}
for (Row rw : row) {
System.out.println(rw.data());
}
}
}
现在,该行StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
抛出一些异常
java.lang.BootstrapMethodError:在java.lang.invoke.CallSite.makeSite(CallSite.java:341)处调用网站初始化异常
我检查了很少的资源,他们说这可能是因为某些依赖性问题。我尝试了他们的建议并使用了以下内容
pom.xml
<dependencies>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.9</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
</dependency>
[...]
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>athena</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>2.5.19</version>
</dependency>
[...]
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.5.19</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。