BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 一 安装BottledWater-PG的安装前文已经表述,本文不赘述直接进入集成应用阶段。二 启动KafKa#启动zookeeper[root@bogon kafka_2.

一 安装

BottledWater-PG的安装前文已经表述,本文不赘述直接进入集成应用阶段。

二 启动KafKa

#启动zookeeper
[root@bogon kafka_2.11-0.10.2.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
#启动kafka服务端
[root@bogon kafka_2.11-0.10.2.0]# bin/kafka-server-start.sh config/server.properties

三 配置PostgreSQL

3.1 配置读取权限

Bottled Water会连接到postgresql获取相关数据,连接的账户需要有replication权限,pg中数据库的变化存储在WAL中,至少需要replication权限才能读取WAL。
编辑$PGDATA目录中postgresql.conf和pg_hba.conf文件。

vi $PGDATA/postgresql.conf
#编辑内容如下:
listen_addresses = '*'
port = 5432 
wal_level = logical         
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4
vi $PGDATA/pg_hba.conf
#编辑内容如下:

# IPv4 local connections:
host    all             all             0.0.0.0/0               md5
# replication privilege.
local   replication     freerep                                trust
host    replication     freerep        127.0.0.1/32            trust
host    replication     freerep        ::1/128                 trust

编辑完保存,重启数据库服务:

pg_ctl restart
psql
postgres=# CREATE ROLE freerep  WITH REPLICATION PASSWORD 'password' LOGIN;
CREATE ROLE

配置完毕!

3.2 Bottled Water使用演示

3.2.1 创建测试库表

创建一个测试数据库,建立测试表

postgres=# create database mcsas;
postgres=# \c mcsas;
mcsas=# create extension bottledwater;
mcsas=# create extension postgis;
#赋予public下的表给freerep角色,要创建如下语句,否则建立的表freerep没有读取权限
mcsas=# alter default privileges in schema public grant all on tables to freerep;
mcsas=# create table gps(gid serial primary key,name text,geom text);
mcsas=# create index gps_geom_idx on gps using gist(ST_GeomFromText(geom,4326));

在另一个终端启动bottledwater可执行程序:

source /home/postgres/.bashrc
cd /opt/bottledwater-pg-master/kafka
[root@localhost kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json

启动结果如下:

[root@bogon kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json
[INFO] Writing messages to Kafka in JSON format
[INFO] Created replication slot "bottledwater", capturing consistent snapshot "0000DA72-1".
INFO:  bottledwater_export: Table public.spatial_ref_sys is keyed by index spatial_ref_sys_pkey
INFO:  bottledwater_export: Table public.mark is keyed by index mark_pkey
[INFO] Registering metadata for table spatial_ref_sys (relid 24263)
[INFO] Opening Kafka topic "spatial_ref_sys" for table "spatial_ref_sys"
[INFO] Storing key schema for table 24263
[INFO] Storing row schema for table 24263
[INFO] Snapshot complete, streaming changes from 0/AB016F30.

代表启动成功了。

3.2.2 监听数据改变消息

插入数据

mcsas=# insert into gps(name,geom) values ('china','Point(118 32)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('england','Point(118 12)');
INSERT 0 1

启动监听topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.43.27:9092 --topic gps --from-beginning
{"gid": {"int": 1}, "name": {"string": "china"}, "geom": {"string": "Point(118 32)"}}
{"gid": {"int": 2}, "name": {"string": "england"}, "geom": {"string": "Point(118 12)"}}

每当插入或者更新,收听的消息会源源不断的输出出来,这样,pg与kafka集成就完毕了。

相关文章
|
17天前
|
JSON API 数据处理
Winform管理系统新飞跃:无缝集成SqlSugar与Web API,实现数据云端同步的革新之路!
【8月更文挑战第3天】在企业应用开发中,常需将Winform桌面应用扩展至支持Web API调用,实现数据云端同步。本文通过实例展示如何在已有SqlSugar为基础的Winform系统中集成HTTP客户端调用Web API。采用.NET的`HttpClient`处理请求,支持异步操作。示例包括创建HTTP辅助类封装请求逻辑及在Winform界面调用API更新UI。此外,还讨论了跨域与安全性的处理策略。这种方法提高了系统的灵活性与扩展性,便于未来的技术演进。
77 2
|
29天前
|
SQL 运维 监控
SLS 数据加工全面升级,集成 SPL 语法
在系统开发、运维过程中,日志是最重要的信息之一,其最大的优点是简单直接。SLS 数据加工功能旨在解决非结构化的日志数据处理,当前全面升级,集成 SPL 语言、更强的数据处理性能、更优的使用成本。
18045 135
|
4天前
|
Java API 数据中心
百炼平台Java 集成API上传文档到数据中心并添加索引
本文主要演示阿里云百炼产品,如何通过API实现数据中心文档的上传和索引的添加。
|
5天前
|
JSON 数据管理 关系型数据库
【Dataphin V3.9】颠覆你的数据管理体验!API数据源接入与集成优化,如何让企业轻松驾驭海量异构数据,实现数据价值最大化?全面解析、实战案例、专业指导,带你解锁数据整合新技能!
【8月更文挑战第15天】随着大数据技术的发展,企业对数据处理的需求不断增长。Dataphin V3.9 版本提供更灵活的数据源接入和高效 API 集成能力,支持 MySQL、Oracle、Hive 等多种数据源,增强 RESTful 和 SOAP API 支持,简化外部数据服务集成。例如,可轻松从 RESTful API 获取销售数据并存储分析。此外,Dataphin V3.9 还提供数据同步工具和丰富的数据治理功能,确保数据质量和一致性,助力企业最大化数据价值。
19 1
|
5天前
|
开发框架 .NET 数据库连接
闲话 Asp.Net Core 数据校验(三)EF Core 集成 FluentValidation 校验数据例子
闲话 Asp.Net Core 数据校验(三)EF Core 集成 FluentValidation 校验数据例子
|
10天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
31 3
|
1月前
|
前端开发 JavaScript API
【Django+Vue3 线上教育平台项目实战】构建课程详情页与集成视频播放功能
随着数字化教育的兴起,构建一个高效、用户友好的线上教育平台至关重要。本文将探讨如何使用Django与Vue.js 3结合,实现一个包含课程列表和课程详情页(含视频播放功能)的线上教育平台部分。本文主要介绍了如何设计数据库模型、处理数据查询、构建动态前端界面,并集成视频播放功能,为用户带来流畅的学习体验。
【Django+Vue3 线上教育平台项目实战】构建课程详情页与集成视频播放功能
|
15天前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
|
15天前
|
人工智能 Java API
JeecgBoot 低代码平台快速集成 Spring AI
Spring 通过 Spring AI 项目正式启用了 AI(人工智能)生成提示功能。本文将带你了解如何在 Jeecg Boot 应用中集成生成式 AI,以及 Spring AI 如何与模型互动,包含 RAG 功能。
52 3
|
22天前
|
消息中间件 存储 Kafka
kafka 在 zookeeper 中保存的数据内容
kafka 在 zookeeper 中保存的数据内容
32 3