订阅 OceanBase CLog 日志(Java程序)

简介: 简介: 随着 OceanBase 数据库的开源,越来越多的企业开始使用 OceanBase,也有很多个人、机构开始学习 OceanBase,我也是其中之一。后续计划将自己的学习经验陆续总结出来,欢迎大家一起讨论。考虑到数据库是一个博大精深的领域,如有写的不对的地方欢迎指正。 本文主要通过 OceanBase-Mini 版本、LogProxy、已经 Java 程序,实现实时订阅 OB CLog 日志,仅限学习场景使用,不适合生产。

主要有 3 个步骤:

  • 部署 OB-Mini 版本
  • 部署 LogProxy
  • 编写、部署订阅程序

1 部署 OB-Mini 版本

OB 提供了 Mini 版,我们可以直接通过 Docker 进行部署。

  • 资源要求
  • CPU:2 核
  • 内存:10GB
  • 工具准备:
  • docker
  • 执行命令
docker run -p2881:2881 --name obce-mini -d oceanbase/obce-mini
  • 结果验证
  • 命令:
  • docker  ps
  • mysql -uroot   -h127.1 -P2881

2 部署 LogProxy

配置 Yum 源:

yum install -y yum-utils
yum-config-manager --add-repo https://mirrors.aliyun.com/oceanbase/OceanBase.repo

安装LogProxy,默认安装目录:/usr/local/oblogproxy 

yum install -y oblogproxy

修改系统租户密码:

SET PASSWORD FOR 'root'= PASSWORD('asdfgh')

配置系统租户信息:

生成账密:

[root@1fa1e2d1b8c8 bin]# ./logproxy  -x rootB13EE2C14A1FBCDCC0ECABFB1DF9A7C7
[root@1fa1e2d1b8c8 bin]# ./logproxy  -x asdfgh7CC8D321325019926AC51DA863C1CB0E

配置到conf 文件中:

"ob_sys_username": "B13EE2C14A1FBCDCC0ECABFB1DF9A7C7",

"ob_sys_password": "7CC8D321325019926AC51DA863C1CB0E",

[root@1fa1e2d1b8c8 oblogproxy]# cat conf/conf.json{
"service_port": 2983,
"encode_threadpool_size": 8,
"encode_queue_size": 20000,
"max_packet_bytes": 8388608,
"record_queue_size": 1024,
"read_timeout_us": 2000000,
"read_fail_interval_us": 1000000,
"read_wait_num": 20000,
"send_timeout_us": 2000000,
"send_fail_interval_us": 1000000,
"command_timeout_s": 10,
"log_quota_size_mb": 5120,
"log_quota_day": 30,
"log_gc_interval_s": 43200,
"oblogreader_path_retain_hour": 168,
"oblogreader_lease_s": 300,
"oblogreader_path": "./run",
"allow_all_tenant": true,
"auth_user": true,
"auth_use_rs": false,
"auth_allow_sys_user": true,
"ob_sys_username": "B13EE2C14A1FBCDCC0ECABFB1DF9A7C7",
"ob_sys_password": "7CC8D321325019926AC51DA863C1CB0E",
"counter_interval_s": 2,
"metric_interval_s": 120,
"debug": false,
"verbose": false,
"verbose_packet": false,
"readonly": false,
"count_record": false,
"channel_type": "plain",
"tls_ca_cert_file": "",
"tls_cert_file": "",
"tls_key_file": "",
"tls_verify_peer": true,
"liboblog_tls": false,
"liboblog_tls_cert_path": ""}

启动LogProxy:

[root@1fa1e2d1b8c8 oblogproxy]# bash ./run.sh startwork path : /usr/local/oblogproxy
is_running : (600)/usr/local/oblogproxy logproxy is running !
logproxy started!

3 订阅程序

  • 引入 mvn 依赖
<dependency><groupId>com.oceanbase.logclient</groupId><artifactId>logproxy-client</artifactId><version>1.0.1</version></dependency>
  • 代码
publicclassMain {
publicstaticvoidmain(String[] args) {
ObReaderConfigconfig=newObReaderConfig();
// 设置OceanBase root server 地址列表,格式为(可以支持多个,用';'分隔):ip1:rpc_port1:sql_port1;ip2:rpc_port2:sql_port2config.setRsList("127.0.0.1:2882:2881");
// 设置用户名和密码(非系统租户)config.setUsername("root");
config.setPassword("asdfgh");
// 设置启动位点(UNIX时间戳,单位s), 0表示从当前时间启动。config.setStartTimestamp(0L);
// 设置订阅表白名单,格式为:tenant.db.table, '*'表示通配.config.setTableWhiteList("sys.*.*");
// 指定oblogproxy服务地址,创建实例.LogProxyClientclient=newLogProxyClient("127.0.0.1", 2983, config);
// 添加 RecordListenerclient.addListener(newRecordListener() {
@Overridepublicvoidnotify(LogMessagemessage) {
System.out.println("--->msg:"+JSON.toJSONString(message));
System.out.println(String.format("--->dbType=%s,dbName=%s,tableName=%s,str=%s",
message.getDbType(),
message.getDbName(),
message.getTableName(),
message.getEncodingStr()));
            }
@OverridepublicvoidonException(LogProxyClientExceptione) {
// 处理错误if (e.needStop()) {
// 不可恢复异常,需要停止Clientclient.stop();
                }
            }
        });
// 启动client.start();
client.join();
    }
}
  • 启动执行:
[root@1fa1e2d1b8c8 ~]# java -jar my-project-name-jar-with-dependencies.jar15:27:29.503 [Thread-0] WARN com.oceanbase.clogproxy.client.connection.ClientStream -start to reconnect...
15:27:29.568 [Thread-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
15:27:29.573 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 --Dio.netty.noUnsafe: false15:27:29.574 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 815:27:29.576 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
15:27:29.579 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
15:27:29.580 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
15:27:29.581 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
15:27:29.584 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true15:27:29.585 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9

小结

通过 OceanBase 开源的组件可以容易的实现简单的订阅功能,后续将使用数据同步中间件将数据写入到 kafka、flink 中。欢迎交流。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
21天前
|
存储 数据库 OceanBase
想问一下OceanBase数据库这个的日志都能删除么?clog
是的,OceanBase数据库企业版确实支持performance_schema。performance_schema是一个集合了数据库服务器性能参数信息的数据字典,它能够帮助我们收集和存储数据库服务器的性能参数信息,以便于我们对数据库进行更好的优化和管理。
97 5
|
21天前
|
存储 关系型数据库 数据库
OceanBase数据库常见问题之修改日志盘的数据存储路径后控件不生效如何解决
OceanBase 是一款由阿里巴巴集团研发的企业级分布式关系型数据库,它具有高可用、高性能、可水平扩展等特点。以下是OceanBase 数据库使用过程中可能遇到的一些常见问题及其解答的汇总,以帮助用户更好地理解和使用这款数据库产品。
|
21天前
|
存储 数据库 OceanBase
OceanBase数据库的磁盘配置包括数据盘和事务日志盘的大小
OceanBase数据库的磁盘配置包括数据盘和事务日志盘的大小
44 1
|
21天前
|
数据库 OceanBase
OceanBase数据库中,clog和slog文件夹的内容
OceanBase数据库中,clog和slog文件夹的内容
105 1
|
21天前
|
数据库 OceanBase
在OceanBase数据库中,clog和slog文件夹的内容包含了事务日志和系统日志
在OceanBase数据库中,clog和slog文件夹的内容包含了事务日志和系统日志
101 7
|
6月前
|
网络协议 Android开发 虚拟化
Android Studio无法运行程序调试程序出现Unable to connect to ADB.Check the Event Log for possible issues.Verify th
Android Studio无法运行程序调试程序出现Unable to connect to ADB.Check the Event Log for possible issues.Verify th
66 0
Android Studio无法运行程序调试程序出现Unable to connect to ADB.Check the Event Log for possible issues.Verify th
|
19天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版产品使用合集之支持将数据写入 OceanBase 数据库吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
21天前
|
Oracle 关系型数据库 数据库
OceanBase数据库常见问题之租户创建后想要改字符集如何解决
OceanBase 是一款由阿里巴巴集团研发的企业级分布式关系型数据库,它具有高可用、高性能、可水平扩展等特点。以下是OceanBase 数据库使用过程中可能遇到的一些常见问题及其解答的汇总,以帮助用户更好地理解和使用这款数据库产品。
|
21天前
|
SQL 存储 监控
OceanBase数据库常见问题之提示no such file ordirectory如何解决
OceanBase 是一款由阿里巴巴集团研发的企业级分布式关系型数据库,它具有高可用、高性能、可水平扩展等特点。以下是OceanBase 数据库使用过程中可能遇到的一些常见问题及其解答的汇总,以帮助用户更好地理解和使用这款数据库产品。
|
21天前
|
SQL Oracle 关系型数据库
OceanBase数据库常见问题之慢SQL不显示如何解决
OceanBase 是一款由阿里巴巴集团研发的企业级分布式关系型数据库,它具有高可用、高性能、可水平扩展等特点。以下是OceanBase 数据库使用过程中可能遇到的一些常见问题及其解答的汇总,以帮助用户更好地理解和使用这款数据库产品。