Elastic实战:logstash将kafka数据同步到es时,如何将字符串型时间字段转换为时间戳

简介: 今天群里有同学问如何将字符串型的时间字段转换为long类型的时间戳。特记录下供后续参考。原问题: > 我接收数据方传过来的数据,其中有个时间类型是字符串类型,格式为:yyyy-MM-dd hh:mm:ss,我需要转成时间戳保存,我按照网上的方法试了好多种都无法成功转换。> 数据方把数据发到kafka,我用logstash读kafka,经过处理存到es

0. 引言

今天群里有同学问如何将字符串型的时间字段转换为long类型的时间戳。特记录下供后续参考。
原问题:

我接收数据方传过来的数据,其中有个时间类型是字符串类型,格式为:yyyy-MM-dd hh:mm:ss,我需要转成时间戳保存,我按照网上的方法试了好多种都无法成功转换。
数据方把数据发到kafka,我用logstash读kafka,经过处理存到es

1. 思路

看到这个问题,首先的反应过来的是这是一个数据入库前的处理需求,所以很明显我们可以借助es的pipeline来解决这个问题。

但核心的问题在于字符串转时间,然后获取时间的时间戳,pipeline默认是使用painless语法的,那就要去painless官方文档看看时间转换的方法了。

2. 解决

1、因为painless是类java语法的,所以我的第一反应是看看这个需求用java如何书写
java实现如下

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timestamp = sdf.parse("2021-01-11 00:00:00").getTime();

2、下面到painless官方文档中找找是否有SimpleDateFormat类
直接在share-api页面查找SimpleDateFormat

发现是有这个类的,并且也有parse方法
SimpleDateFormat
在这里插入图片描述
3、同时再看看Date类,很容易也找到了getTime方法
[Date](https://www.elastic.co/guide/en/elasticsearch/painless/7.13/painless-api-reference-shared-java-util.html#painless-api-reference-shared-Date
)
在这里插入图片描述
4、于是我们就可以书写pipeline了

PUT _ingest/pipeline/string_to_datelong
{
  "description": "",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss');
         ctx.date_long = sdf.parse(ctx.date).getTime(); 
        """
      }
    }
  ]
}

5、在索引中引用pipeline

PUT date_index
{
  "mappings": {
    "properties": {
      "date": {
        "type": "keyword"
      },
      "date_long": {
        "type": "long"
      }
    }
  },
  "settings": {
    "default_pipeline": "string_to_datelong"
  }
}

6、插入一条数据,看看效果

PUT date_index/_doc/1
{
  "date": "2021-01-01 00:00:00"
}

7、可以看到成功转换为时间戳,问题解决

GET date_index/_search

在这里插入图片描述

目录
相关文章
|
4月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
264 7
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
556 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
234 12
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
746 5
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
215 3
|
存储 关系型数据库 MySQL
【TiDB原理与实战详解】5、BR 物理备份恢复与Binlog 数据同步~学不会? 不存在的!
BR(Backup & Restore)是 TiDB 分布式备份恢复的命令行工具,适用于大数据量场景,支持常规备份恢复及大规模数据迁移。BR 通过向各 TiKV 节点下发命令执行备份或恢复操作,生成 SST 文件存储数据信息与 `backupmeta` 文件存储元信息。推荐部署配置包括在 PD 节点部署 BR 工具,使用万兆网卡等。本文介绍 BR 的工作原理、部署配置、使用限制及多种备份恢复方式,如全量备份、单库/单表备份、过滤备份及增量备份等。
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
255 8
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
375 7