Elastic实战:logstash将kafka数据同步到es时,如何将字符串型时间字段转换为时间戳

简介: 今天群里有同学问如何将字符串型的时间字段转换为long类型的时间戳。特记录下供后续参考。原问题: > 我接收数据方传过来的数据,其中有个时间类型是字符串类型,格式为:yyyy-MM-dd hh:mm:ss,我需要转成时间戳保存,我按照网上的方法试了好多种都无法成功转换。> 数据方把数据发到kafka,我用logstash读kafka,经过处理存到es

0. 引言

今天群里有同学问如何将字符串型的时间字段转换为long类型的时间戳。特记录下供后续参考。
原问题:

我接收数据方传过来的数据,其中有个时间类型是字符串类型,格式为:yyyy-MM-dd hh:mm:ss,我需要转成时间戳保存,我按照网上的方法试了好多种都无法成功转换。
数据方把数据发到kafka,我用logstash读kafka,经过处理存到es

1. 思路

看到这个问题,首先的反应过来的是这是一个数据入库前的处理需求,所以很明显我们可以借助es的pipeline来解决这个问题。

但核心的问题在于字符串转时间,然后获取时间的时间戳,pipeline默认是使用painless语法的,那就要去painless官方文档看看时间转换的方法了。

2. 解决

1、因为painless是类java语法的,所以我的第一反应是看看这个需求用java如何书写
java实现如下

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = sdf.parse("2021-01-11 00:00:00").getTime();

2、下面到painless官方文档中找找是否有SimpleDateFormat类
直接在share-api页面查找SimpleDateFormat

发现是有这个类的,并且也有parse方法
SimpleDateFormat
在这里插入图片描述
3、同时再看看Date类,很容易也找到了getTime方法
[Date](https://www.elastic.co/guide/en/elasticsearch/painless/7.13/painless-api-reference-shared-java-util.html#painless-api-reference-shared-Date
)
在这里插入图片描述
4、于是我们就可以书写pipeline了

PUT _ingest/pipeline/string_to_datelong
{
  "description": "",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss');
         ctx.date_long = sdf.parse(ctx.date).getTime(); 
        """
      }
    }
  ]
}

5、在索引中引用pipeline

PUT date_index
{
  "mappings": {
    "properties": {
      "date": {
        "type": "keyword"
      },
      "date_long": {
        "type": "long"
      }
    }
  },
  "settings": {
    "default_pipeline": "string_to_datelong"
  }
}

6、插入一条数据,看看效果

PUT date_index/_doc/1
{
  "date": "2021-01-01 00:00:00"
}

7、可以看到成功转换为时间戳,问题解决

GET date_index/_search

在这里插入图片描述

目录
相关文章
|
消息中间件 JSON NoSQL
从 ES Kafka Mongodb Restful ... 取到 json 之后
JSON 是一种广泛使用的数据交换格式,但其计算和处理能力有限。esProc SPL 是一款强大的开源计算引擎,能够高效解析 JSON 数据,并支持复杂的过滤、分组、连接等操作。它不仅兼容多种数据源,如 RESTful、ElasticSearch、MongoDB 和 Kafka,还提供了游标对象处理大数据流,支持与 Java 应用无缝集成,实现灵活的业务逻辑处理。
|
Web App开发 监控 Java
Logstash、Filebeat安装与数据同步(+ES安装讲解)
Logstash、Filebeat安装与数据同步(+ES安装讲解)
425 0
|
存储 关系型数据库 MySQL
【TiDB原理与实战详解】5、BR 物理备份恢复与Binlog 数据同步~学不会? 不存在的!
BR(Backup & Restore)是 TiDB 分布式备份恢复的命令行工具,适用于大数据量场景,支持常规备份恢复及大规模数据迁移。BR 通过向各 TiKV 节点下发命令执行备份或恢复操作,生成 SST 文件存储数据信息与 `backupmeta` 文件存储元信息。推荐部署配置包括在 PD 节点部署 BR 工具,使用万兆网卡等。本文介绍 BR 的工作原理、部署配置、使用限制及多种备份恢复方式,如全量备份、单库/单表备份、过滤备份及增量备份等。
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
1356 3
|
关系型数据库 MySQL 调度
【TiDB原理与实战详解】4、DM 迁移和TiCDC数据同步~学不会? 不存在的!
TiDB Data Migration (DM) 和 TiCDC 是两款用于数据库迁移和同步的强大工具。DM 支持将兼容 MySQL 协议的数据库(如 MySQL、MariaDB)的数据异步迁移到 TiDB 中,具备全量和增量数据传输能力,并能合并分库分表的数据。TiCDC 则专注于 TiDB 的增量同步,利用 TiKV 日志实现高可用性和水平扩展,支持多种下游系统和输出格式。两者均可通过 TiUP 工具进行部署与管理,简化了集群的安装、配置及任务管理过程。
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
1864 0
|
canal 关系型数据库 MySQL
四种常用的 MySQL 数据同步 ES 的方法
【2月更文挑战第16天】
4684 2
四种常用的 MySQL 数据同步 ES 的方法
|
消息中间件 算法 Java
面试官:Kafka和ES选主有什么区别?
Kafka 和 ES,作为大数据处理的中间件,分别用于流处理和全文检索。它们的选主(Kafka 的 Controller 和 ES 的 Master)都基于 Raft 算法实现一致性。Raft 算法通过选举确保分布式系统数据一致性,涉及领导者、追随者和候选人间的身份转换。当超过一半的节点投票给同一候选节点时,该节点成为新领导者。Kafka 和 ES 在此基础上可能有各自优化调整。更多关于 Raft 算法的详细流程和选举规则见原文。
290 2
|
SQL JSON DataWorks
DataWorks产品使用合集之DataWorks 数据集成任务中,将数据同步到 Elasticsearch(ES)中,并指定 NESTED 字段中的 properties 类型如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
242 0
|
canal 监控 关系型数据库
【技术选型】Mysql和ES数据同步方案汇总
【技术选型】Mysql和ES数据同步方案汇总
749 0
【技术选型】Mysql和ES数据同步方案汇总