前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。此前作者写过一篇pg的异步消息实现的实时地图应用案例《postgres+socket.io+nodejs实时地图应用实践》,本文将改用BottledWater-PG实现一遍。
一 服务器端
var fs = require('fs');
var http = require('http');
var socket = require('socket.io');
var Kafka = require('node-rdkafka');
var server = http.createServer(function(req, res) {
res.writeHead(200, { 'Content-type': 'text/html'});
res.end(fs.readFileSync(__dirname + '/index.html'));
}).listen(8081, function() {
console.log('Listening at: http://localhost:8081');
});
//注册socket.io
var socketio=socket.listen(server);
socketio.on('connection', function (socketclient) {
console.log('已连接socket:');
//socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人
//socketclient.emit('GPSCoor', data.payload);//广播给自己
});
var consumer = new Kafka.KafkaConsumer({
//'debug': 'all',
'metadata.broker.list': '192.168.43.27:9092',
'group.id': 'node-rdkafka-consumer-flow-example',
'enable.auto.commit': false
});
var topicName = 'gps';
//logging debug messages, if debug is enabled
consumer.on('event.log', function(log) {
console.log(log);
});
//打印错误
consumer.on('error', function(err) {
console.error('Error from consumer');
console.error(err);
});
consumer.on('ready', function(arg) {
console.log('consumer ready.' + JSON.stringify(arg));
consumer.subscribe([topicName]);
//准备消费消息
consumer.consume();
});
consumer.on('data', function(m) {
console.log(m);
let _data;
if(m.value==null)//delete操作发送来的消息
{
_data=JSON.parse(m.key);
_data.tg_op='delete';
}
else{
_data=m.value.toString();
_data=JSON.parse(_data);
}
console.log(_data);
socketio.emit('GPSCoor', _data);//广播给所有的客户端
});
consumer.on('disconnected', function(arg) {
console.log('consumer disconnected. ' + JSON.stringify(arg));
});
//启动
consumer.connect();
二 客户端
<html>
<head>
<meta charset='utf-8'>
<title>实时地图应用</title>
<link rel="stylesheet" href="http://openlayers.org/en/v3.18.2/css/ol.css" type="text/css">
<script src="http://openlayers.org/en/v3.18.2/build/ol.js"></script>
<script src="/socket.io/socket.io.js"></script>
<script>
var wktform=new ol.format.WKT();//wkt解析
var gpsSource=new ol.source.Vector();
function init(){
var gpsLayer=new ol.layer.Vector({
source:gpsSource,
style:new ol.style.Style({
image: new ol.style.Icon(({
anchor: [0.5, 1],
src: 'http://openlayers.org/en/v3.18.2/examples/data/icon.png'
}))
})
});
var map = new ol.Map({
layers : [
new ol.layer.Tile({
title : '街道图',
visible : true,
source : new ol.source.XYZ({
url : 'http://www.google.cn/maps/vt?pb=!1m5!1m4!1i{z}!2i{x}!3i{y}!4i256!2m3!1e0!2sm!3i342009817!3m9!2szh-CN!3sCN!5e18!12m1!1e47!12m3!1e37!2m1!1ssmartmaps!4e0&token=32965'
})
}),
gpsLayer
],
target : 'map',
controls : ol.control.defaults({
attributionOptions :
({
collapsible : false
})
}),
view : new ol.View({
center : [0, 0],
zoom : 2
})
});
var iosocket = io.connect();
//接受服务端消息
iosocket.on('GPSCoor', function(data) {
console.log(data);
var id=data.id.int;
var feature;
if(data.tg_op=='delete'){
feature=gpsSource.getFeatureById(id);
if(feature)
gpsSource.removeFeature(feature);//删除点
}
else{
var geom=data.geom.string;
geom=wktform.readGeometry(geom);
geom.transform('EPSG:4326','EPSG:3857');
feature=gpsSource.getFeatureById(id);
if(feature)
feature.setGeometry(geom);//修改已有点
else{
feature=new ol.Feature({
geometry:geom
});
feature.setId(id);
gpsSource.addFeature(feature);//地图新增点
}
}
});
}
</script>
</head>
<body onload="init()">
<div id="map"></div>
</body>
</html>
三 测试成果
3.1 新增
mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');
INSERT 0 1
3.2 修改
mcsas=# update gps set geom='Point(115 40)' where name='opy';
UPDATE 1
3.3 删除
mcsas=# delete from gps where name='opy';
DELETE 1
四 总结
BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。
匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。