开发者社区> 问答> 正文

如何用DTS Java SDK完成一些基本的操作



初始化RegionContext


RegionContext 主要用于保存设置安全认证信息及访问网络模式设置。下面代码显示如何初始化RegionContext,设置安全认证凭证及网络访问模式。 import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
   public static void main(String[] args) throws Exception {
      // 创建一个RegionContext
      RegionContext context = new RegionContext();
      // 配置阿里云账号的AccessKey及AccessKeySecret
      context.setAccessKey("<AccessKey>");
      context.setSecret("<AccessKeySecret>");
       // 运行SDK的服务器是否使用公网IP连接DTS订阅通道
      context.setUsePublicIp(true);
     // 下面为其他调用代码 ……
        …………
    }
}


如果要使用SDK,必须先初始化RegionContext,配置连接订阅通道的安全认证等信息。上面的接口setAccessKey设置的是阿里云账号的AccessKeysetSecret 设置的是阿里云账号的AccessKeySecretAccessKey及AccessKeySecret是由阿里云的系统直接分配给用户的,称为ID对,用户标识用户,可到阿里云用户中心创建获取。setUsePublicIp 是告诉DTS,您本地SDK运行服务器是否用公网IP连接订阅通道。如果设置为true,那么订阅数据流走公网,否则走内网。

初始化ClusterClient


SDK连接订阅通道,接受增量数据等操作都是通过类ClusterClient来完成的,下面代码创建了一个ClusterClient import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
public class MainClass
{
      public static void main(String[] args) throws Exception {
        // 创建一个RegionContext
          RegionContext context = new RegionContext();
          context.setAccessKey("<AccessKey>");
          context.setSecret("<AccessKeySecret>");
          context.setUsePublicIp(true);
          // 创建订阅消费者
          final ClusterClient client = new DefaultClusterClient(context);
          // 下面是一些其他调用代码
          ……………
    }
}



初始化Listener


消费数据的功能通过类Listener来实现。初始化完ClusterClient,需要添加listener,Listener要定义notify函数来接受订阅数据并进行数据消费。下面的代码中实现了最简单的消费逻辑,将订阅到的数据打印到屏幕。 import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
public class MainClass
{
    public static void main(String[] args) throws Exception {
        // 初始化一个RegionContext对象
        ………
        //初始化一个ClusterClient对象
        ………
        ClusterListener listener = new ClusterListener(){
             @Override
             public void notify(List<ClusterMessage> messages) throws Exception {
                  for (ClusterMessage message : messages) {  
                    //打印订阅到的增量数据
                      System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
                      + message.getRecord().getOpt());  
                      //消费完数据后向DTS汇报ACK,必须调用
                      message.ackAsConsumed();
              }
      }
     }
}


DTS实现了SDK的数据消费时间点保存到DTS服务端的机制,简化用户使用SDK时,实现SDK容灾的复杂度。上面示例代码中的 askAsConsumed()接口就是将SDK消费的最新一条数据的位点及时间戳汇报给DTS服务端。汇报了时间戳信息,如果SDK意外宕机重启后,会自动从DTS服务端获取这个消费时间点,然后从这个时间点重启,解决数据重复问题。

启动ClusterClientimport java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
   public static void main(String[] args) throws Exception {
    //初始化RegionContext
    …………
    //初始化ClusterClient
    …………
    //初始化ClusterListener
    …………
    // 添加监听者
      client.addConcurrentListener(listener);
      // 设置请求的订阅通道ID
      client.askForGUID("dts_rdsrjiei2u2afnb_DSF");
      // 启动后台线程, 注意这里不会阻塞, 主线程不能退出
      client.start();
}




上面代码中接口askForGUID设置这个client需要请求的订阅通道ID。这个订阅通道ID从DTS控制台上获取。一旦配置了订阅通道ID,那么这个SDK就能获取这个订阅通道中的增量数据。在启动client之前,需要将监听者listener添加到client中,这样当client从订阅通道中拉取到增量数据时,会同步回调用listener的notify方法开始进行数据消费。

展开
收起
云栖大讲堂 2017-10-31 13:47:05 2858 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
基于Java容器的多应用部署技术实践 立即下载
从《阿里巴巴Java开发手册》编写推广谈技术成长 立即下载
DTS控制台一本通 立即下载