主要有 3 个步骤:
- 部署 OB-Mini 版本
- 部署 LogProxy
- 编写、部署订阅程序
1 部署 OB-Mini 版本
OB 提供了 Mini 版,我们可以直接通过 Docker 进行部署。
- 资源要求
- CPU:2 核
- 内存:10GB
- 工具准备:
- docker
- 执行命令
docker run -p2881:2881 --name obce-mini -d oceanbase/obce-mini
- 结果验证
- 命令:
- docker ps
- mysql -uroot -h127.1 -P2881
2 部署 LogProxy
配置 Yum 源:
yum install -y yum-utils yum-config-manager --add-repo https://mirrors.aliyun.com/oceanbase/OceanBase.repo
安装LogProxy,默认安装目录:/usr/local/oblogproxy
yum install -y oblogproxy
修改系统租户密码:
SET PASSWORD FOR 'root'= PASSWORD('asdfgh');
配置系统租户信息:
生成账密:
[root@1fa1e2d1b8c8 bin]# ./logproxy -x rootB13EE2C14A1FBCDCC0ECABFB1DF9A7C7 [root@1fa1e2d1b8c8 bin]# ./logproxy -x asdfgh7CC8D321325019926AC51DA863C1CB0E
配置到conf 文件中:
"ob_sys_username": "B13EE2C14A1FBCDCC0ECABFB1DF9A7C7",
"ob_sys_password": "7CC8D321325019926AC51DA863C1CB0E",
[root@1fa1e2d1b8c8 oblogproxy]# cat conf/conf.json{ "service_port": 2983, "encode_threadpool_size": 8, "encode_queue_size": 20000, "max_packet_bytes": 8388608, "record_queue_size": 1024, "read_timeout_us": 2000000, "read_fail_interval_us": 1000000, "read_wait_num": 20000, "send_timeout_us": 2000000, "send_fail_interval_us": 1000000, "command_timeout_s": 10, "log_quota_size_mb": 5120, "log_quota_day": 30, "log_gc_interval_s": 43200, "oblogreader_path_retain_hour": 168, "oblogreader_lease_s": 300, "oblogreader_path": "./run", "allow_all_tenant": true, "auth_user": true, "auth_use_rs": false, "auth_allow_sys_user": true, "ob_sys_username": "B13EE2C14A1FBCDCC0ECABFB1DF9A7C7", "ob_sys_password": "7CC8D321325019926AC51DA863C1CB0E", "counter_interval_s": 2, "metric_interval_s": 120, "debug": false, "verbose": false, "verbose_packet": false, "readonly": false, "count_record": false, "channel_type": "plain", "tls_ca_cert_file": "", "tls_cert_file": "", "tls_key_file": "", "tls_verify_peer": true, "liboblog_tls": false, "liboblog_tls_cert_path": ""}
启动LogProxy:
[root@1fa1e2d1b8c8 oblogproxy]# bash ./run.sh startwork path : /usr/local/oblogproxy is_running : (600)/usr/local/oblogproxy logproxy is running ! logproxy started!
3 订阅程序
- 引入 mvn 依赖
<dependency><groupId>com.oceanbase.logclient</groupId><artifactId>logproxy-client</artifactId><version>1.0.1</version></dependency>
- 代码
publicclassMain { publicstaticvoidmain(String[] args) { ObReaderConfigconfig=newObReaderConfig(); // 设置OceanBase root server 地址列表,格式为(可以支持多个,用';'分隔):ip1:rpc_port1:sql_port1;ip2:rpc_port2:sql_port2config.setRsList("127.0.0.1:2882:2881"); // 设置用户名和密码(非系统租户)config.setUsername("root"); config.setPassword("asdfgh"); // 设置启动位点(UNIX时间戳,单位s), 0表示从当前时间启动。config.setStartTimestamp(0L); // 设置订阅表白名单,格式为:tenant.db.table, '*'表示通配.config.setTableWhiteList("sys.*.*"); // 指定oblogproxy服务地址,创建实例.LogProxyClientclient=newLogProxyClient("127.0.0.1", 2983, config); // 添加 RecordListenerclient.addListener(newRecordListener() { publicvoidnotify(LogMessagemessage) { System.out.println("--->msg:"+JSON.toJSONString(message)); System.out.println(String.format("--->dbType=%s,dbName=%s,tableName=%s,str=%s", message.getDbType(), message.getDbName(), message.getTableName(), message.getEncodingStr())); } publicvoidonException(LogProxyClientExceptione) { // 处理错误if (e.needStop()) { // 不可恢复异常,需要停止Clientclient.stop(); } } }); // 启动client.start(); client.join(); } }
- 启动执行:
[root@1fa1e2d1b8c8 ~]# java -jar my-project-name-jar-with-dependencies.jar15:27:29.503 [Thread-0] WARN com.oceanbase.clogproxy.client.connection.ClientStream -start to reconnect... 15:27:29.568 [Thread-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework 15:27:29.573 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 --Dio.netty.noUnsafe: false15:27:29.574 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 815:27:29.576 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 15:27:29.579 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 15:27:29.580 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 15:27:29.581 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available 15:27:29.584 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true15:27:29.585 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
小结
通过 OceanBase 开源的组件可以容易的实现简单的订阅功能,后续将使用数据同步中间件将数据写入到 kafka、flink 中。欢迎交流。