DataHub通过DataConnector流转到MaxCompute全链路测试

简介: 前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。实现使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

概述

前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。

实验目的

使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

实验步骤

1、Dataworks建表SQL脚本

CREATE TABLE IF NOT EXISTS ods_log_tracker( ip STRING COMMENT 'client ip address', user STRING, accesstime string, method STRING COMMENT 'HTTP request type, such as GET POST...', url STRING, protocol STRING, status BIGINT COMMENT 'HTTP reponse code from server', byte_cnt BIGINT, referer STRING, agent STRING) PARTITIONED BY(dt STRING);

_

2、Datahub控制台创建Topic,并关联创建DataConnector

_
_

关于分区选择细节参考链接

3、创建效果
_

4、Java SDK发送信息到Topic

  • pom.xml
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.13.0-public</version>
        </dependency>
  • Java Code Sample
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.*;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.*;
import java.util.ArrayList;
import java.util.List;

public class SendData1 {

    public static void main(String[] args) {

        // Endpoint以Region: 华北2为例,其他Region请按实际情况填写
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";  // project项目名称
        String topicName = "ods_log_tracker";  // topic名称
        String shardId = "0";  // 分区ID
        // 创建DataHubClient实例
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                // 是否开启二进制传输,服务端2.12版本开始支持
                                new AliyunAccount(accessId, accessKey), true))
                //专有云使用出错尝试将参数设置为       false
                // HttpConfig可不设置,不设置时采用默认值
                .setHttpConfig(new HttpConfig().setConnTimeout(10000))
                .build();

        // 写入Tuple型数据
        RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
        // 生成100条数据
        List<RecordEntry> recordEntries = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                RecordEntry recordEntry = new RecordEntry();
                TupleRecordData data = new TupleRecordData(recordSchema);
                data.setField("ip","ip");
                data.setField("user","user");
                data.setField("accesstime","accesstime");
                data.setField("method","method");
                data.setField("url","url");
                data.setField("protocol","protocol");
                data.setField("referer","referer");
                data.setField("agent","agent");
                data.setField("dt","dt");
                data.setField("status",1L);
                data.setField("byte_cnt",100L);

                recordEntry.setRecordData(data);
                recordEntry.setShardId(shardId);
                recordEntries.add(recordEntry);
            }
            try {
                // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
                //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
                datahubClient.putRecords(projectName, topicName, recordEntries);
                System.out.println("write data successful");
            } catch (InvalidParameterException e) {
                System.out.println("invalid parameter, please check your parameter");
                System.exit(1);
            } catch (AuthorizationFailureException e) {
                System.out.println("AK error, please check your accessId and accessKey");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard status is CLOSED, can not write");
                System.exit(1);
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            }
    }
}
  • 写入Topic情况查看
    _

4、数据流转情况查看
_
_

参考链接

DataHub Java SDK介绍
DataWorks 创建业务流程

相关实践学习
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
相关文章
|
3月前
|
分布式计算 大数据 Hadoop
如何对大数据应用进行性能测试
如何对大数据应用进行性能测试
|
3月前
|
存储 分布式计算 大数据
【大数据分布并行处理】单元测试(八)
【大数据分布并行处理】单元测试(八)
76 1
|
3月前
|
SQL 分布式计算 HIVE
【大数据分布并行处理】单元测试(九)
【大数据分布并行处理】单元测试(九)
53 0
|
4月前
|
Oracle 关系型数据库 大数据
助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】
助力工业物联网,工业大数据之服务域:Shell调度测试【三十三】
20 1
|
4月前
|
SQL Oracle 关系型数据库
助力工业物联网,工业大数据之ODS层构建:申明分区代码及测试【十】
助力工业物联网,工业大数据之ODS层构建:申明分区代码及测试【十】
43 0
|
4月前
|
存储 分布式计算 大数据
首批!阿里云MaxCompute完成中国信通院基于无服务器架构大数据平台测试
近日,阿里云计算有限公司MaxCompute产品顺利完成中国信通院首批无服务器架构(Serverless)大数据平台测试。
221 0
|
6月前
|
运维 Kubernetes jenkins
【Kubernetes测试生产环境整体部署及全链路测试、自动化运维平台Jenkins与Devops环境搭建】
【Kubernetes测试生产环境整体部署及全链路测试、自动化运维平台Jenkins与Devops环境搭建】
221 0
|
11月前
|
分布式计算 DataWorks 关系型数据库
带你读《全链路数据治理-全域数据集成》之22:1. 背景信息
带你读《全链路数据治理-全域数据集成》之22:1. 背景信息
168 0
|
11月前
带你读《全链路数据治理-全域数据集成》之23:2. 使用限制
带你读《全链路数据治理-全域数据集成》之23:2. 使用限制
135 0
|
11月前
|
分布式计算 DataWorks 关系型数据库
带你读《全链路数据治理-全域数据集成》之24:3. 准备工作:添加数据源
带你读《全链路数据治理-全域数据集成》之24:3. 准备工作:添加数据源
157 0