环境信息
canal version mysql version
问题描述
操作流程是deployer >> kafka >>logstash >> es
我的logstash配置是 input{ kafka{ bootstrap_servers => ["192.168.10.200:9092"] consumer_threads => 5 auto_offset_reset => "earliest" topics => ["xxx"] codec => "json" } }
output{ elasticsearch{ hosts => ["192.168.10.110:9200"] index => "mysql-json" } }
在es index中出现了很多mysqlType.字段,这些字段实际用途吗?然后有个data字段 内容是表的数据,但是同一个字段中内容分了好几块,如何将这些字段解析成mapping? { "fvlineplus": null, "fkeyarea": "3303", "fproductionno": null, "fgroupprice": null, "fmaterialname": "H749L", }, { "fvlineplus": null, "fkeyarea": "3303", "fproductionno": null, "fgroupprice": null, "fmaterialname": "L537C",
}, { "fvlineplus": null, "fkeyarea": "3303", "fproductionno": null, "fgroupprice": null, "fmaterialname": "J434C", 大概是这样的信息,这种内容是update的数据吗?有没有办法直接合并数据? 我希望数据能展示成在数据库中那样。
原提问者GitHub用户wajika
解决方案
关于mysqlType.字段:这些字段是由canal传递给logstash的元数据,指示字段的数据类型和其他属性。在es中索引这些字段通常不需要,您可以在logstash中使用mutate过滤器删除它们。
关于data字段:数据字段的内容可能分为多个块,因为它们是由canal传递并由logstash接收的。您可以使用logstash中的json插件解析数据字段并将其转换为合适的数据格式,然后将其发送到elasticsearch。
更新数据的问题:如果数据字段包含更新数据,您可以在elasticsearch中使用upsert操作将其与原始记录合并,或者使用logstash中的aggregate过滤器在单个事件中合并更新数据。具体取决于您的要求和数据量。
数据展示问题:如果您希望在es中显示与数据库中相同的结构,可以使用logstash中的mutate和rename过滤器更改字段名称和结构。这可以根据您的要求进行自定义。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。