Overview
The earlier post "Streaming Data Synchronization: DataConnector Test Notes" briefly introduced how to configure a DataConnector. This post walks through the whole pipeline with an example, starting from creating the table in MaxCompute.
Objective
Use the DataHub SDK to write data into a Topic, and have the DataConnector push that data on into a MaxCompute table.
Steps
1. Table creation SQL script in DataWorks
CREATE TABLE IF NOT EXISTS ods_log_tracker (
    ip STRING COMMENT 'client ip address',
    user STRING,
    accesstime STRING,
    method STRING COMMENT 'HTTP request type, such as GET POST...',
    url STRING,
    protocol STRING,
    status BIGINT COMMENT 'HTTP response code from server',
    byte_cnt BIGINT,
    referer STRING,
    agent STRING)
PARTITIONED BY (dt STRING);
2. Create the Topic in the DataHub console and the associated DataConnector
For details on the partition options, see the link. Both resources can also be created with the Java SDK; a sketch follows.
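For readers who prefer code over the console, the following is a minimal, untested sketch of creating the same Topic and SINK_ODPS DataConnector with the Java SDK (same dependency as the pom.xml in step 4). The shard count, lifecycle, MaxCompute endpoint, and the USER_DEFINE partition mode (taking the dt partition value from the dt field of each record) are assumptions for illustration only and must match whatever you would otherwise choose in the console.

import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.model.*;

import java.util.Arrays;
import java.util.List;

// Sketch only: creates the Topic and the SINK_ODPS connector used in this article.
public class CreateTopicAndConnector {
    public static void main(String[] args) {
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";
        String topicName = "ods_log_tracker";

        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(new DatahubConfig(endpoint,
                        new AliyunAccount(accessId, accessKey), true))
                .build();

        // Topic schema mirroring the MaxCompute table; dt carries the partition value
        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("ip", FieldType.STRING));
        schema.addField(new Field("user", FieldType.STRING));
        schema.addField(new Field("accesstime", FieldType.STRING));
        schema.addField(new Field("method", FieldType.STRING));
        schema.addField(new Field("url", FieldType.STRING));
        schema.addField(new Field("protocol", FieldType.STRING));
        schema.addField(new Field("status", FieldType.BIGINT));
        schema.addField(new Field("byte_cnt", FieldType.BIGINT));
        schema.addField(new Field("referer", FieldType.STRING));
        schema.addField(new Field("agent", FieldType.STRING));
        schema.addField(new Field("dt", FieldType.STRING));

        // 3 shards, 7-day lifecycle: example values only
        datahubClient.createTopic(projectName, topicName, 3, 7, RecordType.TUPLE, schema, "log tracker topic");

        // SINK_ODPS connector; every MaxCompute setting below is a placeholder
        SinkOdpsConfig odpsConfig = new SinkOdpsConfig();
        odpsConfig.setEndpoint("http://service.cn-beijing.maxcompute.aliyun.com/api"); // assumed endpoint
        odpsConfig.setProject("odpsdemo");      // MaxCompute project (assumed same name as the DataHub project)
        odpsConfig.setTable("ods_log_tracker"); // target table created in step 1
        odpsConfig.setAccessId(accessId);
        odpsConfig.setAccessKey(accessKey);
        // USER_DEFINE: the dt field of each record supplies the dt partition (assumption; use the mode you chose in the console)
        odpsConfig.setPartitionMode(SinkOdpsConfig.PartitionMode.USER_DEFINE);

        // Topic fields mapped, in order, to the table columns
        List<String> columnFields = Arrays.asList("ip", "user", "accesstime", "method",
                "url", "protocol", "status", "byte_cnt", "referer", "agent");
        datahubClient.createConnector(projectName, topicName, ConnectorType.SINK_ODPS, columnFields, odpsConfig);
        System.out.println("topic and connector created");
    }
}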
3. Creation result
4. Send data to the Topic with the Java SDK
- pom.xml
<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.13.0-public</version>
</dependency>
- Java Code Sample
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.*;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.*;

import java.util.ArrayList;
import java.util.List;

public class SendData1 {
    public static void main(String[] args) {
        // Endpoint for the China North 2 (Beijing) region as an example; use the endpoint of your own region
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";       // DataHub project name
        String topicName = "ods_log_tracker";  // Topic name
        String shardId = "0";                  // shard ID

        // Create a DatahubClient instance
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                // Enable binary transfer (supported by server version 2.12 and later);
                                // on Apsara Stack (private cloud), try setting this to false if errors occur
                                new AliyunAccount(accessId, accessKey), true))
                // HttpConfig is optional; defaults are used when it is not set
                .setHttpConfig(new HttpConfig().setConnTimeout(10000))
                .build();

        // Write TUPLE records: fetch the record schema of the Topic first
        RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();

        // Generate 100 records
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 100; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("ip", "ip");
            data.setField("user", "user");
            data.setField("accesstime", "accesstime");
            data.setField("method", "method");
            data.setField("url", "url");
            data.setField("protocol", "protocol");
            data.setField("referer", "referer");
            data.setField("agent", "agent");
            data.setField("dt", "dt");
            data.setField("status", 1L);
            data.setField("byte_cnt", 100L);
            recordEntry.setRecordData(data);
            recordEntry.setShardId(shardId);
            recordEntries.add(recordEntry);
        }

        try {
            // putRecordsByShard is supported by server version 2.12 and later; use putRecords on earlier versions
            //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
            datahubClient.putRecords(projectName, topicName, recordEntries);
            System.out.println("write data successful");
        } catch (InvalidParameterException e) {
            System.out.println("invalid parameter, please check your parameter");
            System.exit(1);
        } catch (AuthorizationFailureException e) {
            System.out.println("AK error, please check your accessId and accessKey");
            System.exit(1);
        } catch (ResourceNotFoundException e) {
            System.out.println("project or topic or shard not found");
            System.exit(1);
        } catch (ShardSealedException e) {
            System.out.println("shard status is CLOSED, can not write");
            System.exit(1);
        } catch (DatahubClientException e) {
            System.out.println("other error");
            System.out.println(e);
            System.exit(1);
        }
    }
}
- Checking the data written to the Topic
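If you want to verify the writes from code rather than from the console sampling page, here is a minimal read-back sketch. It reads from the OLDEST cursor of shard 0 and prints a few fields of each record; the limit of 10 records and the printed fields are only illustrative.

import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.model.*;

public class ReadBack {
    public static void main(String[] args) {
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";
        String topicName = "ods_log_tracker";
        String shardId = "0";

        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(new DatahubConfig(endpoint,
                        new AliyunAccount(accessId, accessKey), true))
                .build();

        RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();

        // Start from the oldest record still retained in the shard
        String cursor = datahubClient.getCursor(projectName, topicName, shardId, CursorType.OLDEST).getCursor();

        // Fetch up to 10 records and print a few fields of each
        GetRecordsResult result = datahubClient.getRecords(projectName, topicName, shardId, recordSchema, cursor, 10);
        for (RecordEntry entry : result.getRecords()) {
            TupleRecordData data = (TupleRecordData) entry.getRecordData();
            System.out.println(data.getField("ip") + "\t" + data.getField("status") + "\t" + data.getField("dt"));
        }
        System.out.println("records read: " + result.getRecordCount());
    }
}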
5. Checking the data flow into the MaxCompute table
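Besides watching the connector metrics in the console, you can confirm that rows have landed in the table by querying MaxCompute. The sketch below is an illustration only: it needs the separate odps-sdk-core dependency (com.aliyun.odps:odps-sdk-core), which this article itself does not use, and the endpoint and the dt='dt' partition value simply mirror the sample data written above.

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;

import java.util.List;

public class CheckMaxCompute {
    public static void main(String[] args) throws OdpsException {
        // MaxCompute (ODPS) endpoint for the Beijing region: an assumption, use your own
        String odpsEndpoint = "http://service.cn-beijing.maxcompute.aliyun.com/api";
        Odps odps = new Odps(new AliyunAccount("********", "********"));
        odps.setEndpoint(odpsEndpoint);
        odps.setDefaultProject("odpsdemo");

        // The sample above writes the literal "dt" into the dt field, so the rows land in partition dt='dt'
        Instance instance = SQLTask.run(odps, "SELECT COUNT(*) FROM ods_log_tracker WHERE dt='dt';");
        instance.waitForSuccess();

        List<Record> result = SQLTask.getResult(instance);
        System.out.println("rows in ods_log_tracker (dt='dt'): " + result.get(0).get(0));
    }
}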