订阅 OceanBase CLog 日志(Java程序)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 简介: 随着 OceanBase 数据库的开源,越来越多的企业开始使用 OceanBase,也有很多个人、机构开始学习 OceanBase,我也是其中之一。后续计划将自己的学习经验陆续总结出来,欢迎大家一起讨论。考虑到数据库是一个博大精深的领域,如有写的不对的地方欢迎指正。 本文主要通过 OceanBase-Mini 版本、LogProxy、已经 Java 程序,实现实时订阅 OB CLog 日志,仅限学习场景使用,不适合生产。

主要有 3 个步骤:

  • 部署 OB-Mini 版本
  • 部署 LogProxy
  • 编写、部署订阅程序

1 部署 OB-Mini 版本

OB 提供了 Mini 版,我们可以直接通过 Docker 进行部署。

  • 资源要求
  • CPU:2 核
  • 内存:10GB
  • 工具准备:
  • docker
  • 执行命令
docker run -p2881:2881 --name obce-mini -d oceanbase/obce-mini
  • 结果验证
  • 命令:
  • docker  ps
  • mysql -uroot   -h127.1 -P2881

2 部署 LogProxy

配置 Yum 源:

yum install -y yum-utils
yum-config-manager --add-repo https://mirrors.aliyun.com/oceanbase/OceanBase.repo

安装LogProxy,默认安装目录:/usr/local/oblogproxy 

yum install -y oblogproxy

修改系统租户密码:

SET PASSWORD FOR 'root'= PASSWORD('asdfgh')

配置系统租户信息:

生成账密:

[root@1fa1e2d1b8c8 bin]# ./logproxy  -x rootB13EE2C14A1FBCDCC0ECABFB1DF9A7C7
[root@1fa1e2d1b8c8 bin]# ./logproxy  -x asdfgh7CC8D321325019926AC51DA863C1CB0E

配置到conf 文件中:

"ob_sys_username": "B13EE2C14A1FBCDCC0ECABFB1DF9A7C7",

"ob_sys_password": "7CC8D321325019926AC51DA863C1CB0E",

[root@1fa1e2d1b8c8 oblogproxy]# cat conf/conf.json{
"service_port": 2983,
"encode_threadpool_size": 8,
"encode_queue_size": 20000,
"max_packet_bytes": 8388608,
"record_queue_size": 1024,
"read_timeout_us": 2000000,
"read_fail_interval_us": 1000000,
"read_wait_num": 20000,
"send_timeout_us": 2000000,
"send_fail_interval_us": 1000000,
"command_timeout_s": 10,
"log_quota_size_mb": 5120,
"log_quota_day": 30,
"log_gc_interval_s": 43200,
"oblogreader_path_retain_hour": 168,
"oblogreader_lease_s": 300,
"oblogreader_path": "./run",
"allow_all_tenant": true,
"auth_user": true,
"auth_use_rs": false,
"auth_allow_sys_user": true,
"ob_sys_username": "B13EE2C14A1FBCDCC0ECABFB1DF9A7C7",
"ob_sys_password": "7CC8D321325019926AC51DA863C1CB0E",
"counter_interval_s": 2,
"metric_interval_s": 120,
"debug": false,
"verbose": false,
"verbose_packet": false,
"readonly": false,
"count_record": false,
"channel_type": "plain",
"tls_ca_cert_file": "",
"tls_cert_file": "",
"tls_key_file": "",
"tls_verify_peer": true,
"liboblog_tls": false,
"liboblog_tls_cert_path": ""}

启动LogProxy:

[root@1fa1e2d1b8c8 oblogproxy]# bash ./run.sh startwork path : /usr/local/oblogproxy
is_running : (600)/usr/local/oblogproxy logproxy is running !
logproxy started!

3 订阅程序

  • 引入 mvn 依赖
<dependency><groupId>com.oceanbase.logclient</groupId><artifactId>logproxy-client</artifactId><version>1.0.1</version></dependency>
  • 代码
publicclassMain {
publicstaticvoidmain(String[] args) {
ObReaderConfigconfig=newObReaderConfig();
// 设置OceanBase root server 地址列表,格式为(可以支持多个,用';'分隔):ip1:rpc_port1:sql_port1;ip2:rpc_port2:sql_port2config.setRsList("127.0.0.1:2882:2881");
// 设置用户名和密码(非系统租户)config.setUsername("root");
config.setPassword("asdfgh");
// 设置启动位点(UNIX时间戳,单位s), 0表示从当前时间启动。config.setStartTimestamp(0L);
// 设置订阅表白名单,格式为:tenant.db.table, '*'表示通配.config.setTableWhiteList("sys.*.*");
// 指定oblogproxy服务地址,创建实例.LogProxyClientclient=newLogProxyClient("127.0.0.1", 2983, config);
// 添加 RecordListenerclient.addListener(newRecordListener() {
@Overridepublicvoidnotify(LogMessagemessage) {
System.out.println("--->msg:"+JSON.toJSONString(message));
System.out.println(String.format("--->dbType=%s,dbName=%s,tableName=%s,str=%s",
message.getDbType(),
message.getDbName(),
message.getTableName(),
message.getEncodingStr()));
            }
@OverridepublicvoidonException(LogProxyClientExceptione) {
// 处理错误if (e.needStop()) {
// 不可恢复异常,需要停止Clientclient.stop();
                }
            }
        });
// 启动client.start();
client.join();
    }
}
  • 启动执行:
[root@1fa1e2d1b8c8 ~]# java -jar my-project-name-jar-with-dependencies.jar15:27:29.503 [Thread-0] WARN com.oceanbase.clogproxy.client.connection.ClientStream -start to reconnect...
15:27:29.568 [Thread-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
15:27:29.573 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 --Dio.netty.noUnsafe: false15:27:29.574 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 815:27:29.576 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
15:27:29.579 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
15:27:29.580 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
15:27:29.581 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
15:27:29.584 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true15:27:29.585 [Thread-0] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9

小结

通过 OceanBase 开源的组件可以容易的实现简单的订阅功能,后续将使用数据同步中间件将数据写入到 kafka、flink 中。欢迎交流。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4月前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
5月前
|
XML 存储 JSON
Java程序部署
Java程序部署
|
21天前
|
传感器 安全 算法
消防救援支队消防员单兵装备智能养护舱电机驱动java版程序(二)
本文探讨消防救援中智能养护舱电机驱动的Java程序设计,作为系列文章第二部分。通过自动化和智能化手段,智能养护舱提升了装备维护效率与准确性。文章详细介绍了电机驱动模块的设计与实现,包括硬件选型、PID控制策略、安全保护机制及Java程序架构,确保电机精确控制、稳定性和安全性。未来将优化功能并引入智能算法和物联网技术,进一步提升装备维护智能化水平。
|
21天前
|
IDE Java 开发工具
消防救援支队消防员单兵装备智能养护舱点击驱动java版程序(一)
智能消防作战服架通过电机驱动系统提升消防员作业效率和安全性。本文介绍基于Java的电机驱动程序开发,涵盖硬件准备、软件环境搭建及驱动程序实现。重点包括串口通信配置、电机控制类设计与控制逻辑实现,确保电机高效稳定运行。通过正确配置通信协议和串口参数,并添加异常处理机制,保障系统的安全性和可靠性。
|
3月前
|
SQL 安全 Java
Java 异常处理:筑牢程序稳定性的 “安全网”
本文深入探讨Java异常处理,涵盖异常的基础分类、处理机制及最佳实践。从`Error`与`Exception`的区分,到`try-catch-finally`和`throws`的运用,再到自定义异常的设计,全面解析如何有效管理程序中的异常情况,提升代码的健壮性和可维护性。通过实例代码,帮助开发者掌握异常处理技巧,确保程序稳定运行。
71 2
|
3月前
|
IDE Java 编译器
开发 Java 程序一定要安装 JDK 吗
开发Java程序通常需要安装JDK(Java Development Kit),因为它包含了编译、运行和调试Java程序所需的各种工具和环境。不过,某些集成开发环境(IDE)可能内置了JDK,或可使用在线Java编辑器,无需单独安装。
137 2
|
4月前
|
Java Maven 数据安全/隐私保护
如何实现Java打包程序的加密代码混淆,避免被反编译?
【10月更文挑战第15天】如何实现Java打包程序的加密代码混淆,避免被反编译?
743 2
|
4月前
|
安全 Java Linux
java程序设置开机自启
java程序设置开机自启
214 1
|
4月前
|
运维 Java Linux
【运维基础知识】Linux服务器下手写启停Java程序脚本start.sh stop.sh及详细说明
### 启动Java程序脚本 `start.sh` 此脚本用于启动一个Java程序,设置JVM字符集为GBK,最大堆内存为3000M,并将程序的日志输出到`output.log`文件中,同时在后台运行。 ### 停止Java程序脚本 `stop.sh` 此脚本用于停止指定名称的服务(如`QuoteServer`),通过查找并终止该服务的Java进程,输出操作结果以确认是否成功。
158 1
|
5月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
65 5

热门文章

最新文章