Reindex API — Elastic Stack 实战手册

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 将文档从源索引复制到目的地索引,称之为 Reindex。

970X90.png

· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》

· 加入创作人行列,一起交流碰撞,参与技术圈年度盛事吧

创作人:杨松柏

什么是 Reindex

将文档从源索引复制到目的地索引,称之为 Reindex。

在 Reindex 时可以进行数据的丰富、缩减以及字段的变更等。Reindex 可以简单的理解为 Scroll+Bulk_Insert。source 和 dest 都可以是已存在的索引、索引别名或数据流(Data Stream)。

此外,使用 Reindex 需要注意以下几点:

  • 源和目的不能相同,比如不能将数据流 Reindex 给它自身
  • 源索引的文档中 _source 字段必须开启。
  • Reindex不会复制源的 setting 和源所匹配的模板,因此在调用 _reindex 前,你需要设置好目的索引 (action.auto_create_index 为 false 或者 -.* 时)。
  • 目标索引的 mapping,主分片数,副本数等推荐提前配置。

Reindex的主要场景:

  • 集群升级:将数据从旧集群远程 Reindex 到新集群
  • 索引备份
  • 数据重构

前置要求


如果 Elasticsearch 集群配置了安全策略和权限策略,则进行 Reindex 必须拥有以下权限:

  • 读取源的数据流、索引、索引别名等索引级别权限
  • 对于目的数据流、索引、索引别名的写权限
  • 如果需要使用 Reindex API 自动创建数据流和索引,则必须拥有对目的数据流、索引、索引别名的 auto_configure、create_index 或者 manage 等索引级别权限。
  • 如果源为远程的集群,则 source.remote.user 用户必须拥有集群监控权限,和读取源索引、源索引别名、源数据流的权限。

如果 Reindex 的源为远程集群,必须在当前集群的请求节点 elasticsearch.yml 文件配置远程白名单 reindex.remote.whitelist。

自动创建数据流,需要提前配置好数据流的匹配索引模板,

详情可参看 Set up a data stream: https://www.elastic.co/guide/en/elasticsearch/reference/7.11/set-up-a-data-stream.html

API 介绍

RESTful API

POST /_reindex

Query parameters

refresh

可选参数,枚举类型 (true,false,wait_for),默认值为 false。

如果设置为 true, Elasticsearch 刷新受当前操作影响的数据,能够被立即搜索(即立即刷新,但是会对 Elasticsearch 的性能有一定的影响)。如果为 wait_for,则等待刷新以使当前操作对搜索可见,等待时间为默认为 1s(index.refresh_interval)。如果为 false,本次请求不执行刷新。

timeout

可选参数,时间值(time units),默认值为 1 分钟;每个索引周期中等待索引自动创建、动态映射更新,和等待活跃健康分片等的时间。该参数可以确保 Elasticsearch 在失败之前,基本等待的超时时间。实际等待时间可能更长,特别是在发生多个等待时。

wait_for_active_shards

可选参数,参数类型 string,默认值为 1(即只要一个分片处于活跃就可以执行该操作)。在执行 Reindex 之前索引必须处于活动状态的分片副本数,可以设置为 all 或者小于 number_of_replicas+1 的任何正整数,比如你的索引主分片数目为 3,副本设置为 2,那么可以设置的最大正整数为 3,即副本份数加 1 (主分片)。

#因为实操集群只有三个节点,如下索引将会出现副本分片无法分配,
#index.routing.allocation.total_shards_per_node
#控制每个该索引只允许每个节点分配一个分片
PUT reindex_index-name-2
{
  "settings" :{
    "index" :{
      "number_of_shards" : "3",
      "number_of_replicas" : "2"
    },
    "index.routing.allocation.total_shards_per_node":1
  }
}
#插入一条数据
PUT reindex_index-name-1/_bulk
{ "index":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }

#重建索引
POST _reindex?wait_for_active_shards=2&timeout=5s
{
  "source": {
    "index": "reindex_index-name-1"
  },
  "dest": {
    "index": "reindex_index-name-2"
  }
}

由于reindex_index-name-2只有主分片分配成功,所以上面的_reindex将失败

{
  "took" : 5002,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 0,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [
    {
      "index" : "reindex_index-name-2",
      "type" : "_doc",
      "id" : "U_i8VnkBYYHWy1KlBJjc",
      "cause" : {
        "type" : "unavailable_shards_exception",
        "reason" : "[reindex_index-name-2][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [5s], request: [BulkShardRequest [[reindex_index-name-2][0]] containing [index {[reindex_index-name-2][_doc][U_i8VnkBYYHWy1KlBJjc], source[{ \"@timestamp\": \"2099-05-06T16:21:15.000Z\", \"message\": \"192.0.2.42 - - [06/May/2099:16:21:15 +0000] \\\"GET /images/bg.jpg HTTP/1.0\\\" 200 24736\" }]}]]"
      },
      "status" : 503
    }
  ]
}

wait_for_completion

可选参数,参数类型Boolean,默认为true。如果为true,则请求为阻塞同步方式,请求会等到操作完成才返回。

requests_per_second

可选参数,参数类型integer,默认为 -1(不进行限制);限制请求的每秒子请求数。

require_alias

可选参数,参数类型Boolean,默认为true。如果为truedest.index必须为索引别名。

scroll

可选参数,参数类型为时间类型( time units),指定滚动搜索时索引的一致视图应保持多长时间。

slices

可选参数,参数类型integer,默认值为1(不切分成多个子任务);该参数表示将一个任务切分成多少个子任务,并行执行。

max_docs

可选参数,参数类型integer,默认值为对应索引的所有文档;要处理的最大文档数。

Request Body

conflicts

可选参数,参数类型枚举类型,默认为abort;设置为proceed,即使发生文档冲突也继续reindexing

source

  • index

必填参数,参数类型 string;值可以为数据流、索引名字、索引别名,如果源有多个,也可以接受逗号分隔的数据源数组。

  • max_docs

    可选参数,参数类型integer;要被重新索引的最大文档数目。

  • query

    可选参数,参数类型查询对象(query object),按查询 DSL 过滤需要重新索引的文档。

  • remote

    remote 的子参数可接受如下:

参数 是否必填 类型 说明
host string 索引 pattern 名源索引所在远程 ES 集群中任一节点地址;如果是需要从远程 集群复制数据,则该参数必填。
username string 与远程主机进行身份验证的用户名;当远程集群需要认证时必填。
password string 与远程主机进行身份验证的密码;当远程集群需要认证时必填。
socket_timeout 时间类型 默认为 30 秒;远程套接字读取超时。
connect_timeout 时间类型 默认为 30 秒;远程连接超时时间。
  • size

可选参数,参数类型 integer; 每批要索引的文档数(批处理),在远程索引时确保批处理能够放在堆上缓冲区,缓冲区的默认大小为100 MB。

  • slice

slice的子参数可接受如下:

参数 是否必填 类型 说明
id integer 进行手动切片时的,设置的切片 id
max integer 切片总数。
  • sort

可选参数,参数类型 list;以逗号分隔的 : 对列表(比如name:desc),用于在获取源索引文档时,按照 sort 中字段值的排序要求进行排序。通常与max_docs参数结合使用,以控制哪些文档需要被重新索引。

注意:sort 参数在 7.6 版本已经被标注弃用,不建议在 Reindex 中进行排序。 Reindex 中的排序不能保证按顺序索引文档,并阻止 Reindex 的进一步发展,如恢复能力和性能改进。如果与结合使用max_docs,请考虑改为使用查询过滤器。

  • _source

可选参数,参数类型string,默认值为 true。该参数可以用于选择文档中哪些字段需要进行重新索引。如果设置为 true,将会重索引文档中的所有字段。

dest

  • index

必填参数,参数类型 string;该参数表示目的地的表,值可以为数据流、索引名字、索引别名。

  • version_type

可选参数,参数类型枚举;用于索引操作的版本控制;枚举值包括:internalexternalexternal_gtexternal_gte

详情参看 Version types : https://www.elastic.co/guide/en/elasticsearch/reference/7.11/docs-index_.html#index-version-types
  • op_type

可选参数,参数类型枚举,默认为 index,枚举值包括:indexcreate;如果设置为create,则目标索引不存在该文档就创建(可用于reindex续传补偿)。注意:如果dest是数据流,必须设置为create,因为数据流只做append

  • type

可选参数,参数类型string,默认值为_doc;被重建索引的文档中文档类型;注意:该参数在 Elasticsearch 6 版本中已经标记弃用,已经没有任何实际意义。

script

  • source

可选参数,参数类型 string;重新索引时用于更新文档 source 或元数据的脚本 .

  • lang

可选参数,参数类型枚举;支持的脚本语言:painless, expression, mustache, java

更多脚本语言,请参考 Scripting: https://www.elastic.co/guide/en/elasticsearch/reference/7.11/modules-scripting.html

Response Body

执行_reindex时的,响应体参数释意:

字段 类型 说明
took integer 整个操作花费的总毫秒数
timed_out Boolean 如果在重新索引期间出现的任何请求超时,则此标志设置为 true。
total integer 成功处理的文档数
updated integer 已成功更新的文档数,即重新索引的文档,在 dest 索引中存在具有相同 ID 的文档,并且更新成功的
created integer 成功创建的文档数
deleted integer 成功删除的文档数
batches integer 由重新索引回调的滚动响应数
noops integer 由于重新索引的脚本为 ctx.op 返回 noop 值而被忽略的文档数
version_conflicts integer 重新索引命中的版本冲突数
retries integer 重索引尝试的重试次数;bulk 是重试的批量操作数,search 是重试的搜索操作数
throttled_millis integer 请求休眠以符合 requests_per_second
requests_per_second integer 在重新索引期间每秒有效执行的请求数
throttled_until_millis integer 此字段在 _reindex 响应中应始终等于零;该参数只有在使用任务 API(Task API)时才有意义,在任务 API 中,它指示下次再次执行限制请求的时间,以便符合每秒的请求数
failures 数组 如果进程中有任何不可恢复的错误,则返回失败数组。如果数组不为空,那么请求会因为这些失败而中止。重新索引是使用批处理实现的,任何失败都会导致整个进程中止,但当前批处理中的所有失败都会收集到数组中。你可以使用 conflicts 参数,避免因为版本冲突而中止重建索引 Reindex 的一些技巧

异步执行 Reindex

如果请求的查询参数wait_for_completion设置为false,Elasticsearch 将会执行一些预检查,然后发起一个task,来运行你的 Reindex 任务,并立即返回你一个taskid,然后你可以通过这个taskid,去查看任务的运行结果,运行结果记录在系统索引.tasks;如果任务执行完成,你可以删除掉该文档,以使 Elasticsearch 释放空间。

#异步执行 reindex 任务
POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "reindex_index-name-1"
  },
  "dest": {
    "index": "reindex_index-name-2"
  }
}

#返回立即返回的任务 id
{
  "task" : "ydZx8i8HQBe69T4vbYm30g:20987804"
}

#查看任务的运行情况
GET _tasks/ydZx8i8HQBe69T4vbYm30g:20987804
#response返回结果
{
  "completed" : true,
  "task" : {
    "node" : "ydZx8i8HQBe69T4vbYm30g",
    "id" : 20987804,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    "status" : {
      "total" : 2,
      "updated" : 0,
      "created" : 2,
      "deleted" : 0,
      "batches" : 1,
      "version_conflicts" : 0,
      "noops" : 0,
      "retries" : {
        "bulk" : 0,
        "search" : 0
      },
      "throttled_millis" : 0,
      "requests_per_second" : -1.0,
      "throttled_until_millis" : 0
    },
    "description" : "reindex from [reindex_index-name-1] to [reindex_index-name-2][_doc]",
    "start_time_in_millis" : 1620539345400,
    "running_time_in_nanos" : 84854825,
    "cancellable" : true,
    "headers" : { }
  },
  "response" : {
    "took" : 82,
    "timed_out" : false,
    "total" : 2,
    "updated" : 0,
    "created" : 2,
    "deleted" : 0,
    "batches" : 1,
    "version_conflicts" : 0,
    "noops" : 0,
    "retries" : {
      "bulk" : 0,
      "search" : 0
    },
    "throttled" : "0s",
    "throttled_millis" : 0,
    "requests_per_second" : -1.0,
    "throttled_until" : "0s",
    "throttled_until_millis" : 0,
    "failures" : [ ]
  }
}

#实际任务运行结果会记录在 .tasks 索引
GET  .tasks/_doc/ydZx8i8HQBe69T4vbYm30g:20987804
#返回值如下
{
  "_index" : ".tasks",
  "_type" : "_doc",
  "_id" : "ydZx8i8HQBe69T4vbYm30g:20987804",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "completed" : true,
    "task" : {
      "node" : "ydZx8i8HQBe69T4vbYm30g",
      "id" : 20987804,
      "type" : "transport",
      "action" : "indices:data/write/reindex",
      "status" : {
        "total" : 2,
        "updated" : 0,
        "created" : 2,
        "deleted" : 0,
        "batches" : 1,
        "version_conflicts" : 0,
        "noops" : 0,
        "retries" : {
          "bulk" : 0,
          "search" : 0
        },
        "throttled_millis" : 0,
        "requests_per_second" : -1.0,
        "throttled_until_millis" : 0
      },
      "description" : "reindex from [reindex_index-name-1] to [reindex_index-name-2][_doc]",
      "start_time_in_millis" : 1620539345400,
      "running_time_in_nanos" : 84854825,
      "cancellable" : true,
      "headers" : { }
    },
    "response" : {
      "took" : 82,
      "timed_out" : false,
      "total" : 2,
      "updated" : 0,
      "created" : 2,
      "deleted" : 0,
      "batches" : 1,
      "version_conflicts" : 0,
      "noops" : 0,
      "retries" : {
        "bulk" : 0,
        "search" : 0
      },
      "throttled" : "0s",
      "throttled_millis" : 0,
      "requests_per_second" : -1.0,
      "throttled_until" : "0s",
      "throttled_until_millis" : 0,
      "failures" : [ ]
    }
  }
}

多源重建索引

如果有许多源需要重新索引,通常最好一次 Reindex 一个源的索引,而不是使用glob模式来选取多个源。这样如果出现任何的错误,你可以删除有问题部分,然后选择特定的源重新索引(destop_type可以设置为create只重索引缺失的文档);另外一个好处,你可以并行运行这些reindex任务。

#!/bin/bash
for index in i1 i2 i3 i4 i5; do
  curl -H Content-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'{
    "source": {
      "index": "'$index'"
    },
    "dest": {
      "index": "'$index'-reindexed"
    }
  }'
done

对 Reindex 限流

设置requests_per_second为任意的正十进制数(如 1.4,6,...1000等),以限制批量操作_reindex索引的速率。通过在每个批处理中,设置等待时间来限制请求;可以通过设置 requests_per_second=-1,来关闭限流操作。

限流是通过在每个批处理之间设置等待时间,因此 _reindex 在内部使用 scroll 的超时时间,应当将这个等待时间考虑进去。等待时间=批大小/requests_per_second - 批写入耗时;默认情况下,批处理大小为 1000,因此如果 requests_per_second 设置为500:

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - 0.5 seconds = 1.5 seconds

由于批处理是作为单个_bulk请求发出的,因此较大的批处理大小,会导致 Elasticsearch 创建许多请求,然后等待一段时间,再开始下一组请求;这种情况可能会造成 Elasticsearch 周期性的抖动。

动态调整限流

可以使用_rethrottle API 在正在运行的重新索引上更改requests_per_second的值:

POST _reindex/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

taskid可以通过 task API 进行获取。重新调整requests_per_second,如果是加快查询速度则可以立即生效,如果是降低查询速度,则需要在完成当前批处理后生效,这样可以避免 scroll 超时。

切片

Reindex 支持切片 scroll 以并行化重新索引过程,从而提高 Reindex 的效率。

注意:如果源索引是在远程的 Elasticsearch 集群,是不支持手动或自动切片的。

手动切片

通过为每个请求提供切片 ID 和切片总数。

示例如下:

POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

可以通过以下方式验证此功能

#避免还没有形成 segments,文档不可见
GET _refresh
#查看文档的个数
GET my-new-index-000001/_count
#或者
POST my-new-index-000001/_search?size=0&filter_path=hits.total

返回结果如下

{
  "hits": {
    "total" : {
        "value": 120,
        "relation": "eq"
    }
  }
}

自动切片

还可以使用 Sliced scroll 基于文档_id进行切片,让_reindex自动并行化;通过设定 slices 参数的值来实现。

示例如下

POST _reindex?slices=5&refresh
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

可以通过以下方式验证此功能

POST my-new-index-000001/_search?size=0&filter_path=hits.total

返回结果如下

{
  "hits": {
    "total" : {
        "value": 120,
        "relation": "eq"
    }
  }
}

设置slicesauto会让 Elasticsearch 选择要使用的切片数。此设置将每个分片使用一个切片,直到达到某个限制。如果有多个源,它将基于分片数量最少的索引或 Backing index 确定切片数。

POST _reindex?slices=auto&refresh
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}
# 由源码可知,slices 实际被修改为0
 if (slicesString.equals(AbstractBulkByScrollRequest.AUTO_SLICES_VALUE)) {
            return AbstractBulkByScrollRequest.AUTO_SLICES;
        }
public static final int AUTO_SLICES = 0;
public static final String AUTO_SLICES_VALUE = "auto";

_reindex 中添加 slices,将会自动完成上面手动切片创建的请求;自动切片创建的子请求有些不一样的特征:

  1. 你可以使用 Tasks APIs 查看这些子请求,这些子请求是带有slices请求任务的”子”任务。
  2. 获取带有参数slices请求的任务状态,将只返回包含已完成切片的状态
  3. 可以分别对这些子请求,进行任务取消或者重新调节限速
  4. 重新调速带有slices参数的请求,将会按比例调速它的子任务。
  5. 取消带有slices参数的请求,将会取消它的所有子任务。
  6. 由于切片的性质,可能会每个切片的文档并不均匀,会出现某些切片某些切片可能比其他切片大;但是所有文档都会被划分到某个切片中。
  7. slices 请求如果含有requests_per_secondmax_docs,将会按比例分配给每个子请求。结合上面关于分布不均匀的观点,将max_docs与切片一起使用可能会出现满足条件的max_docs文档不被重新索引。
  8. 尽管这些快照几乎都是在同一时间获取,但是每个子请求,可能获取的源快照会有稍微的不同。

合理选择切片数目

自动切片,设置slicesauto,将为大多数索引选择一个合理切片数目。如果手动切片或以其他方式调整自动切片,应当明白以下几个点:

  1. slices的数目等于索引的数目时,查询性能是最优的。设置 slices高于分片数通常不会提高效率,反而会增加开销(CPU,磁盘 IO 等)。
  2. 索引性能会在可用资源与切片数之间线性地缩放
  3. 查询或索引性能在运行时是否占主导地位,取决于重新索引的文档和集群资源。

重新索引的路由

默认情况下,如果_reindex看到一个带有路由的文档,则路由将被保留,除非它被脚本更改。可以在dest的 JSON 体内上重新设置routing,从而改变之前的路由值,routing的可取值如下:

keep

keep为默认值,将为每个匹配项发送的批量请求的路由,设置为匹配项旧的路由(就保持旧的路由方式)。

discard

将发送的批量请求的路由设置为null

=

将发送的批量请求的路由设置为=之后的值

示例,使用以下请求将source_index中公司名称为cat的所有文档复制到source_index且路由设置为cat

POST _reindex
{
  "source": {
    "index": "source_index",
    "query": {
      "match": {
        "company": "cat"
      }
    }
  },
  "dest": {
    "index": "dest_index",
    "routing": "=cat"
  }
}

默认情况下,_reindex使用滚动批处理1000。您可以使用元素中的size字段更改批处理大小:

POST _reindex
{
  "source": {
    "index": "source_index",
    "size": 100
  },
  "dest": {
    "index": "dest_index",
    "routing": "=cat"
  }
}

重索引使用预处理 Pipeline

重索引也可以使用ingest pipeline的特性,来富化数据;示列如下:

POST _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
     "pipeline": "some_ingest_pipeline" #提前定义的 pipeline
  }
}

实战示例

基于查询重新索引文档

可以通过在source中添加查询条件,对有需要的文档进行重新索引;

例如,复制user.id值为kimchy文档到my-new-index-000001

POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "query": {
      "term": {
        "user.id": "kimchy"
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

基于 max_docs 重新索引文档

通过在请求体中设置max_docs参数,控制重建索引的文档的个数。

例如:从my-index-000001复制一个文档到my-new-index-000001

POST _reindex
{
  "max_docs": 1,
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

基于多源重新索引

sourceindex属性值可以是一个 list;这样可以允许复制多个源的文档到目的数据流或索引,但是需要注意多个源的文档中字段的类型必须一致。

例如复制my-index-000001my-index-000002索引的文档:

POST _reindex
{
  "source": {
    "index": ["my-index-000001", "my-index-000002"]
  },
  "dest": {
    "index": "my-new-index-000002"
  }
}

选择字段重新索引

只重新索引每个文档筛选的字段;

例如,以下请求仅重新索引每个文档的user.id_doc字段:

POST _reindex
{
  "source": {
    "index": "my-index-000001",
    "_source": ["user.id", "_doc"]
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

通过重新索引修改文档中字段名字

_reindex可用于复制源索引文档,在写入目的索引之前,重命名字段;假设my-index-000001索引有以下文档:

POST my-index-000001/_doc/1?refresh
{
  "text": "words words",
  "flag": "foo"
}

但是你想把字段名flag替换成tag,处理手段如下(当然也可以用 ingest pipeline):

POST _reindex
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001"
  },
  "script": {
    "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
  }
}

现在获取新索引文档

GET my-new-index-000001/_doc/1

返回值如下:

{
  "found": true,
  "_id": "1",
  "_index": "my-new-index-000001",
  "_type": "_doc",
  "_version": 1,
  "_seq_no": 44,
  "_primary_term": 1,
  "_source": {
    "text": "words words",
    "tag": "foo"
  }
}

重新索引每日索引

_reindex结合 Painless 脚本来重新索引每日索引,将新模板应用于现有文档。假设你有如下索引并包含下列文档

PUT metricbeat-2021.05.10/_doc/1?refresh
{"system.cpu.idle.pct": 0.908}

PUT metricbeat-2021.05.11/_doc/1?refresh
{"system.cpu.idle.pct": 0.105}

通配metricbeat-*索引的新模板,已经加载到 Elasticsearch 中,但是该模板只会对新建的索引生效。Painless 可用于重新索引现有文档,并应用新模板。下面的脚本从索引名中提取日期,并创建一个新索引,新索引名添加 -1。metricbeat-2021.05.10所有的数据将会被重建到metricbeat-2021.05.10-1

POST _reindex
{
  "source": {
    "index": "metricbeat-*"
  },
  "dest": {
    "index": "metricbeat"
  },
  "script": {
    "lang": "painless",
    "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
  }
}

之前metricbeat索引的数据,都能从新的索引从获取到:

GET metricbeat-2021.05.10-1/_doc/1
GET metricbeat-2021.05.11-1/_doc/1

提取源的随机子集

_reindex 可用于提取源的随机子集以进行测试:

POST _reindex
{
  "max_docs": 10,
  "source": {
    "index": "my-index-000001",
    "query": {
      "function_score" : {
        "random_score" : {},
        "min_score" : 0.9    #备注1
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

可能需要根据从源中提取数据的相对数量,来调整min_score的值

重新索引时修改文档

_update_by_query一样,_reindex支持使用 script 修改文档;不同的是,_reindex中使用脚本可以修改文档的元数据。

此示例增加了源文档的版本:

POST _reindex
{
  "source": {
    "index": "my-index-000001"
  },
  "dest": {
    "index": "my-new-index-000001",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
}

_update_by_query一样,你可以设置ctx.op更改在dest上执行的操作:

noop

如果决定不必在目标中为文档重新索引,则需要在脚本中设置 ctx.op=“noop”。响应主体中的noop计数器将报告不做任何的操作。

delete

如果必须从目标(dest)中删除文档,则需要在脚本中设置 ctx.op=“delete”。删除将在响应正文中的已删除计数器中报告 。

设置ctx.op为其他任何值都将返回错误,就像设置中的其他任何字段一样ctx

同时还可以更改一些索引元信息,但是谨慎操作:

  • _id
  • _index
  • _version
  • _routing

如果将_version设置为null或将其从ctx映射中清除,就像不在索引请求中发送版本一样; 则无论目标上的版本或_reindex请求中使用的版本类型如何,都会导致目标中的文档被覆盖。

远程重新索引

重新索引支持从远程 Elasticsearch 复制数据:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "my-index-000001",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "my-new-index-000001"
  }
}

Host 参数必须包含 scheme,host,port (如:https://otherhost:9200)或者代理路径(https://otherhost:9200/proxy)。

_reindex 需要基本授权认证链接远程 Elasticsearch 集群,才需要用户名和密码参数;使用基本身份验证时请确保使用 协议,否则密码将以纯文本形式发送。

如果是 reindex 远程集群的数据,则必须在当前集群的某个节点(请求发送到的那个节点,即协调节点)配置白名单,在 elasticsearch.yml 文件中添加 reindex.remote.whitelist 属性,该属性的值为请求远程集群节点的 host:port,可以用逗号分隔配置多个,也可以使用通配符方式;

示例如下:

reindex.remote.whitelist: "otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"

此外在做远程reindex时,需要注意集群之前的版本兼容问题;Elasticsearch 不支持跨大版本的向前兼容,如不能从 7.x 群集重新索引到 6.x 群集

从远程服务器重新索引时使用堆内缓冲区,默认最大为 100mb;如果远程索引的文档非常大,那么批的 size 就应该设置的小一点。

如下代码块设置 batch size 为10:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200"
    },
    "index": "source",
    "size": 10,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

连接远程 Elasticsearch 集群,可以通过socket_timeoutconnect_timeout,分别设置socket读取超时时间和链接超时时间,在一定程度上保证 Reindex 的稳定性(网络延迟问题),两者的默认值均为 30s。

如下示例分别设置socket读取超时为 1 分钟和连接超时时间 10s:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

配置 SSL 参数

从远程集群reindex支持配置 ssl;这些参数是无法在_reindex请求体配置;必须在elasticsearch.yml文件中指定,但在 Elasticsearch 密钥库中添加的安全设置除外,可支持的 ssl 参数。

配置如下:

参数 描述
reindex.ssl.certificate_authorities 应当信任的 PEM 编码证书文件的路径列表;但不能同时指定 reindex.ssl.certificate_authorities和 reindex.ssl.truststore.path
reindex.ssl.truststore.path 要信任的证书的 Java Keystore 文件的路径;该密钥库可以采用”JKS”或“PKCS#12”格式,但不能同时指定 reindex.ssl.certificate_authorities 和 reindex.ssl.truststore.path
reindex.ssl.truststore.password 信任库的密码(reindex.ssl.truststore.path)。此设置不能用于reindex.ssl.truststore.secure_password
reindex.ssl.truststore.secure_password 信任库的密码(reindex.ssl.truststore.path)。此设置不能用于reindex.ssl.truststore.password
reindex.ssl.truststore.type 信任库的类型(reindex.ssl.truststore.path)。必须为jks或PKCS12。如果信任库路径以“ .p12”,“.pfx”或“ pkcs12”结尾,则此设置默认为PKCS12。否则,默认为jks。
reindex.ssl.verification_mode 表示防止中间人攻击和证书伪造的验证类型。其中一个 full(验证主机名和证书路径), certificate (验证证书路径,而不是主机名)或none(不执行任何验证-这是在生产环境中强烈反对)。默认为 full
reindex.ssl.certificate 指定用于HTTP客户端身份验证的 PEM 编码证书(或证书链)的路径(如果远程集群需要)。此设置 reindex.ssl.key 还需要设置。您不能同时指定 reindex.ssl.certificate和reindex.ssl.keystore.path。
reindex.ssl.key 指定与用于客户端身份验证(reindex.ssl.certificate)的证书相关联的 PEM编码的私钥的路径。您不能同时指定reindex.ssl.key 和 reindex.ssl.keystore.path。
reindex.ssl.key_passphrase 指定用于 reindex.ssl.key 加密 PEM 编码的私钥(reindex.ssl.key)的密码。不能与一起使用reindex.ssl.secure_key_passphrase
reindex.ssl.secure_key_passphrase 指定用于 reindex.ssl.key 加密 PEM 编码的私钥(reindex.ssl.key)的密码。不能与一起使用reindex.ssl.key_passphrase
reindex.ssl.keystore.path 指定密钥库的路径,该密钥库包含用于HTTP客户端身份验证的私钥和证书(如果远程集群需要)。该密钥库可以采用“ JKS”或“ PKCS#12”格式。您不能同时指定 reindex.ssl.key 和 reindex.ssl.keystore.path
reindex.ssl.keystore.type 密钥库的类型(reindex.ssl.keystore.path)。必须为 jks 或 PKCS12。如果密钥库路径以“ .p12”,“.pfx” 或 “ pkcs12” 结尾,则此设置默认为 PKCS12。否则,默认为jks
reindex.ssl.keystore.password 密钥库(reindex.ssl.keystore.path)的密码。此设置不能用于reindex.ssl.keystore.secure_password
reindex.ssl.keystore.secure_password 密钥库(reindex.ssl.keystore.path)的密码。此设置不能用于reindex.ssl.keystore.password
reindex.ssl.keystore.key_password 密钥库(reindex.ssl.keystore.path)中密钥的密码。默认为密钥库密码。此设置不能用于 reindex.ssl.keystore.secure_key_password
reindex.ssl.keystore.secure_key_password 密钥库(reindex.ssl.keystore.path)中密钥的密码。默认为密钥库密码。此设置不能用于 reindex.ssl.keystore.key_password
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
18天前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
165 3
|
18天前
|
监控 前端开发 JavaScript
实战篇:商品API接口在跨平台销售中的有效运用与案例解析
随着电子商务的蓬勃发展,企业为了扩大市场覆盖面,经常需要在多个在线平台上展示和销售产品。然而,手工管理多个平台的库存、价格、商品描述等信息既耗时又容易出错。商品API接口在这一背景下显得尤为重要,它能够帮助企业在不同的销售平台之间实现商品信息的高效同步和管理。本文将通过具体的淘宝API接口使用案例,展示如何在跨平台销售中有效利用商品API接口,以及如何通过代码实现数据的统一管理。
|
18天前
|
数据采集 JSON API
如何实现高效率超简洁的实时数据采集?——Python实战电商数据采集API接口
你是否曾为获取重要数据而感到困扰?是否因为数据封锁而无法获取所需信息?是否因为数据格式混乱而头疼?现在,所有这些问题都可以迎刃而解。让我为大家介绍一款强大的数据采集API接口。
|
18天前
|
JavaScript API
【vue实战项目】通用管理系统:api封装、404页
【vue实战项目】通用管理系统:api封装、404页
41 3
|
18天前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
375 0
|
18天前
|
安全 大数据 API
elasticsearch|大数据|elasticsearch的api部分实战操作以及用户和密码的管理
elasticsearch|大数据|elasticsearch的api部分实战操作以及用户和密码的管理
94 0
|
16天前
|
自然语言处理 Java API
Java 8的Stream API和Optional类:概念与实战应用
【5月更文挑战第17天】Java 8引入了许多重要的新特性,其中Stream API和Optional类是最引人注目的两个。这些特性不仅简化了集合操作,还提供了更好的方式来处理可能为空的情况,从而提高了代码的健壮性和可读性。
42 7
|
18天前
|
JSON API 数据处理
【Swift开发专栏】Swift中的RESTful API集成实战
【4月更文挑战第30天】本文探讨了在Swift中集成RESTful API的方法,涉及RESTful API的基础概念,如HTTP方法和设计原则,以及Swift的网络请求技术,如`URLSession`、`Alamofire`和`SwiftyJSON`。此外,还强调了数据处理、错误管理和异步操作的重要性。通过合理利用这些工具和策略,开发者能实现高效、稳定的API集成,提升应用性能和用户体验。
|
13天前
|
Java 程序员 API
Java 8 Lambda 表达式和Stream API:概念、优势和实战应用
【5月更文挑战第20天】在Java 8中,Lambda 表达式和Stream API是两个非常强大的特性,它们显著改变了Java程序员处理数据和编写代码的方式。本篇技术文章将深入探讨这些特性的概念、优点,并提供实战示例,帮助理解如何有效地利用这些工具来编写更简洁、更高效的代码。
33 6
|
18天前
|
存储 缓存 监控
利用Python和Flask构建RESTful API的实战指南
在当今的软件开发中,RESTful API已成为前后端分离架构中的核心组件。本文将带你走进实战,通过Python的Flask框架,一步步构建出高效、安全的RESTful API。我们将从项目初始化、路由设置、数据验证、错误处理到API文档生成,全方位地探讨如何构建RESTful API,并给出一些实用的最佳实践和优化建议。