BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 一 安装BottledWater-PG的安装前文已经表述,本文不赘述直接进入集成应用阶段。二 启动KafKa#启动zookeeper[root@bogon kafka_2.

一 安装

BottledWater-PG的安装前文已经表述,本文不赘述直接进入集成应用阶段。

二 启动KafKa

#启动zookeeper
[root@bogon kafka_2.11-0.10.2.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
#启动kafka服务端
[root@bogon kafka_2.11-0.10.2.0]# bin/kafka-server-start.sh config/server.properties

三 配置PostgreSQL

3.1 配置读取权限

Bottled Water会连接到postgresql获取相关数据,连接的账户需要有replication权限,pg中数据库的变化存储在WAL中,至少需要replication权限才能读取WAL。
编辑$PGDATA目录中postgresql.conf和pg_hba.conf文件。

vi $PGDATA/postgresql.conf
#编辑内容如下:
listen_addresses = '*'
port = 5432 
wal_level = logical         
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4
vi $PGDATA/pg_hba.conf
#编辑内容如下:

# IPv4 local connections:
host    all             all             0.0.0.0/0               md5
# replication privilege.
local   replication     freerep                                trust
host    replication     freerep        127.0.0.1/32            trust
host    replication     freerep        ::1/128                 trust

编辑完保存,重启数据库服务:

pg_ctl restart
psql
postgres=# CREATE ROLE freerep  WITH REPLICATION PASSWORD 'password' LOGIN;
CREATE ROLE

配置完毕!

3.2 Bottled Water使用演示

3.2.1 创建测试库表

创建一个测试数据库,建立测试表

postgres=# create database mcsas;
postgres=# \c mcsas;
mcsas=# create extension bottledwater;
mcsas=# create extension postgis;
#赋予public下的表给freerep角色,要创建如下语句,否则建立的表freerep没有读取权限
mcsas=# alter default privileges in schema public grant all on tables to freerep;
mcsas=# create table gps(gid serial primary key,name text,geom text);
mcsas=# create index gps_geom_idx on gps using gist(ST_GeomFromText(geom,4326));

在另一个终端启动bottledwater可执行程序:

source /home/postgres/.bashrc
cd /opt/bottledwater-pg-master/kafka
[root@localhost kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json

启动结果如下:

[root@bogon kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json
[INFO] Writing messages to Kafka in JSON format
[INFO] Created replication slot "bottledwater", capturing consistent snapshot "0000DA72-1".
INFO:  bottledwater_export: Table public.spatial_ref_sys is keyed by index spatial_ref_sys_pkey
INFO:  bottledwater_export: Table public.mark is keyed by index mark_pkey
[INFO] Registering metadata for table spatial_ref_sys (relid 24263)
[INFO] Opening Kafka topic "spatial_ref_sys" for table "spatial_ref_sys"
[INFO] Storing key schema for table 24263
[INFO] Storing row schema for table 24263
[INFO] Snapshot complete, streaming changes from 0/AB016F30.

代表启动成功了。

3.2.2 监听数据改变消息

插入数据

mcsas=# insert into gps(name,geom) values ('china','Point(118 32)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('england','Point(118 12)');
INSERT 0 1

启动监听topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.43.27:9092 --topic gps --from-beginning
{"gid": {"int": 1}, "name": {"string": "china"}, "geom": {"string": "Point(118 32)"}}
{"gid": {"int": 2}, "name": {"string": "england"}, "geom": {"string": "Point(118 12)"}}

每当插入或者更新,收听的消息会源源不断的输出出来,这样,pg与kafka集成就完毕了。

相关文章
|
23天前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
164 43
|
15天前
|
存储 人工智能 NoSQL
Airweave:快速集成应用数据打造AI知识库的开源平台,支持多源整合和自动同步数据
Airweave 是一个开源工具,能够将应用程序的数据同步到图数据库和向量数据库中,实现智能代理检索。它支持无代码集成、多租户支持和自动同步等功能。
86 14
|
1月前
|
机器学习/深度学习 PyTorch 测试技术
LossVal:一种集成于损失函数的高效数据价值评估方法
LossVal是一种创新的机器学习方法,通过在损失函数中引入实例级权重,直接在训练过程中评估数据点的重要性,避免了传统方法中反复重训练模型的高计算成本。该方法适用于回归和分类任务,利用最优传输距离优化权重,确保模型更多地从高质量数据中学习。实验表明,LossVal在噪声样本检测和高价值数据点移除等任务上表现优异,具有更低的时间复杂度和更稳定的性能。论文及代码已开源,为数据价值评估提供了高效的新途径。
82 13
LossVal:一种集成于损失函数的高效数据价值评估方法
|
1月前
|
人工智能 安全 Dubbo
Spring AI 智能体通过 MCP 集成本地文件数据
MCP 作为一款开放协议,直接规范了应用程序如何向 LLM 提供上下文。MCP 就像是面向 AI 应用程序的 USB-C 端口,正如 USB-C 提供了一种将设备连接到各种外围设备和配件的标准化方式一样,MCP 提供了一个将 AI 模型连接到不同数据源和工具的标准化方法。
|
3月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
119 5
|
2月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
121 1
|
3月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
91 1
|
4月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
6月前
|
Java 测试技术 容器
从零到英雄:Struts 2 最佳实践——你的Web应用开发超级变身指南!
【8月更文挑战第31天】《Struts 2 最佳实践:从设计到部署的全流程指南》深入介绍如何利用 Struts 2 框架从项目设计到部署的全流程。从初始化配置到采用 MVC 设计模式,再到性能优化与测试,本书详细讲解了如何构建高效、稳定的 Web 应用。通过最佳实践和代码示例,帮助读者掌握 Struts 2 的核心功能,并确保应用的安全性和可维护性。无论是在项目初期还是后期运维,本书都是不可或缺的参考指南。
73 0

热门文章

最新文章