初始化RegionContext
RegionContext 主要用于保存设置安全认证信息及访问网络模式设置。下面代码显示如何初始化RegionContext,设置安全认证凭证及网络访问模式。
import java.util.List;
import com.aliyun.drc.clusterclient.RegionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 创建一个RegionContext
RegionContext context = new RegionContext();
// 配置阿里云账号的AccessKey及AccessKeySecret
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
// 运行SDK的服务器是否使用公网IP连接DTS订阅通道
context.setUsePublicIp(true);
// 下面为其他调用代码 ……
…………
}
}
如果要使用SDK,必须先初始化RegionContext,配置连接订阅通道的安全认证等信息。上面的接口setAccessKey设置的是阿里云账号的AccessKeysetSecret 设置的是阿里云账号的AccessKeySecretAccessKey及AccessKeySecret是由阿里云的系统直接分配给用户的,称为ID对,用户标识用户,可到阿里云用户中心创建获取。setUsePublicIp 是告诉DTS,您本地SDK运行服务器是否用公网IP连接订阅通道。如果设置为true,那么订阅数据流走公网,否则走内网。
初始化ClusterClient
SDK连接订阅通道,接受增量数据等操作都是通过类ClusterClient来完成的,下面代码创建了一个ClusterClient
import java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 创建一个RegionContext
RegionContext context = new RegionContext();
context.setAccessKey("<AccessKey>");
context.setSecret("<AccessKeySecret>");
context.setUsePublicIp(true);
// 创建订阅消费者
final ClusterClient client = new DefaultClusterClient(context);
// 下面是一些其他调用代码
……………
}
}
初始化Listener
消费数据的功能通过类Listener来实现。初始化完ClusterClient,需要添加listener,Listener要定义notify函数来接受订阅数据并进行数据消费。下面的代码中实现了最简单的消费逻辑,将订阅到的数据打印到屏幕。
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
public class MainClass
{
public static void main(String[] args) throws Exception {
// 初始化一个RegionContext对象
………
//初始化一个ClusterClient对象
………
ClusterListener listener = new ClusterListener(){
@Override
public void notify(List<ClusterMessage> messages) throws Exception {
for (ClusterMessage message : messages) {
//打印订阅到的增量数据
System.out.println(message.getRecord() + ":" + message.getRecord().getTablename() + ":"
+ message.getRecord().getOpt());
//消费完数据后向DTS汇报ACK,必须调用
message.ackAsConsumed();
}
}
}
}
DTS实现了SDK的数据消费时间点保存到DTS服务端的机制,简化用户使用SDK时,实现SDK容灾的复杂度。上面示例代码中的 askAsConsumed()接口就是将SDK消费的最新一条数据的位点及时间戳汇报给DTS服务端。汇报了时间戳信息,如果SDK意外宕机重启后,会自动从DTS服务端获取这个消费时间点,然后从这个时间点重启,解决数据重复问题。
启动ClusterClientimport java.util.List;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.DefaultClusterClient;
import com.aliyun.drc.clusterclient.RegionContext;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainClass
{
public static void main(String[] args) throws Exception {
//初始化RegionContext
…………
//初始化ClusterClient
…………
//初始化ClusterListener
…………
// 添加监听者
client.addConcurrentListener(listener);
// 设置请求的订阅通道ID
client.askForGUID("dts_rdsrjiei2u2afnb_DSF");
// 启动后台线程, 注意这里不会阻塞, 主线程不能退出
client.start();
}
上面代码中接口askForGUID设置这个client需要请求的订阅通道ID。这个订阅通道ID从DTS控制台上获取。一旦配置了订阅通道ID,那么这个SDK就能获取这个订阅通道中的增量数据。在启动client之前,需要将监听者listener添加到client中,这样当client从订阅通道中拉取到增量数据时,会同步回调用listener的notify方法开始进行数据消费。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。