【Elastic Engineering】Elasticsearch: Reindex 接口

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Elasticsearch: Reindex 接口

作者:刘晓国


在我们开发的过程中,我们有很多时候需要用到 Reindex 接口。它可以帮我们把数据从一个 index 到另外一个 index 进行重新 reindex。这个对于特别适用于我们在修改我们数据的 mapping 后,需要重新把数据从现有的 index 转到新的 index 建立新的索引,这是因为我们不能修改现有的 index 的 mapping 一旦已经定下来了。在接下来的介绍中,我们将学习如何使用 reindex 接口。


image.png


为了能够使用 reindex 接口,我们必须满足一下的条件:


_source 选项对所有的源 index 文档是启动的,也即源 index 的 source 是已经被存储的

reindex 不是帮我们尝试设置好目的地 index。它不拷贝源 index 的设置到目的地的 index 里去。你应该在做 reindex 之前把目的地的源的 index 设置好,这其中包括 mapping, shard 数目,replica 等

下面,我们来一个具体的例子,比如建立一个 blogs 的 index。

PUT twitter2/_doc/1
{
  "user" : "双榆树-张三",
  "message" : "今儿天气不错啊,出去转转去",
  "uid" : 2,
  "age" : 20,
  "city" : "北京",
  "province" : "北京",
  "country" : "中国",
  "address" : "中国北京市海淀区",
  "location" : {
    "lat" : "39.970718",
    "lon" : "116.325747"
  }
}

上面的命令让我们建立了一个叫做 twitter2 的 index,并同时帮我们生产了一个如下的 mapping:

GET /twitter2/_mapping

显示的结果是:

{
  "twitter2" : {
    "mappings" : {
      "properties" : {
        "address" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "age" : {
          "type" : "long"
        },
        "city" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "country" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "location" : {
          "properties" : {
            "lat" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "lon" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "message" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "province" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "uid" : {
          "type" : "long"
        },
        "user" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

显然根据我们之前的练习 “开始使用Elasticsearch (2)”,系统帮我们生产的 location 数据类型是不对的,我们必须进行修改。一种办法是删除现有的 twitter2 索引,让后修改它的 mapping,再重新索引所有的数据。这对于一个两个文档还是可以的,但是如果已经有很多的数据了,这个方法并不可取。另外一种方式,是建立一个完全新的 index,使用新的 mapping 进行 reindex。下面我们展示如何使用这种方法。


创建一个新的 twitter3 的 index,使用如下的 mapping:

PUT twitter3
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "address": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "age": {
        "type": "long"
      },
      "city": {
        "type": "text"
      },
      "country": {
        "type": "text"
      },
      "location": {
        "type": "geo_point"
      },
      "message": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "province": {
        "type": "text"
      },
      "uid": {
        "type": "long"
      },
      "user": {
        "type": "text"
      }
    }
  }
}

这里我们我们修改了 location 及其它的一些数据项的数据类型。运行上面的指令,我们就可以创建一个完全新的 twitter3 的 index。我们可以通过如下的命令来进行 reindex:

POST _reindex
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3"
  }
}

显示的结果是:

{
  "took" : 52,
  "timed_out" : false,
  "total" : 1,
  "updated" : 0,
  "created" : 1,
  "deleted" : 0,
  "batches" : 1,
  "version_conflicts" : 0,
  "noops" : 0,
  "retries" : {
    "bulk" : 0,
    "search" : 0
  },
  "throttled_millis" : 0,
  "requests_per_second" : -1.0,
  "throttled_until_millis" : 0,
  "failures" : [ ]
}

我们可以通过如下的命令来检查我们的 twitter3 是否已经有新的数据:

GET /twitter3/_search

显示的结果是:

{
  "took" : 100,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "twitter3",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "user" : "双榆树-张三",
          "message" : "今儿天气不错啊,出去转转去",
          "uid" : 2,
          "age" : 20,
          "city" : "北京",
          "province" : "北京",
          "country" : "中国",
          "address" : "中国北京市海淀区",
          "location" : {
            "lat" : "39.970718",
            "lon" : "116.325747"
          }
        }
      }
    ]
  }
}

显然我们的数据已经从 twitter2 到 twitter3,并且它的数据类型已经是完全符合我们要求的数据类型。


在执行 reindex 的时候,为了提高效率,我们可以通过设置 slices 参数来并行处理。在通常的情况下,我们可以设置 slices为 auto Elasticsearchimage.png 自动选择 slices 的数目来进行加速。Elasticsearch 会依据你电脑中的 core 的数量来进行决定。我们可以使用如下的格式:

POST _reindex?slices=auto
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3"
  }
}


Reindex 执行


Reindex 是一个时间点的副本

就像上面返回的结果显示的那样,它是以 batch(批量)的方式来执行的。默认的批量大小为 1000

你也可以只拷贝源 index 其中的一部分数据

        -  通过加入 query 到 source 中


        -  通过定义 max_docs 参数


比如:

POST _reindex
{
  "max_docs": 100,
  "source": {
    "index": "twitter2",
    "query": {
      "match": {
        "city": "北京"
      }
    }
  },
  "dest": {
    "index": "twitter3"
  }
}

这里,我们定义最多不超过 100 个文档,同时,我们只拷贝来自“北京”的 twitter 记录。


设置 op_type to create 将导致 _reindex 仅在目标索引中创建缺少的文档。 所有现有文档都会导致版本冲突,比如:

POST _reindex
{
  "source": {
    "index": "twitter2"
  },
  "dest": {
    "index": "twitter3",
    "op_type": "create"
  }
}

如果我们之前已经做过 reindex,那么我们可以看到如下的结果:

{
  "took": 2,
  "timed_out": false,
  "total": 1,
  "updated": 0,
  "created": 0,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 1,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1,
  "throttled_until_millis": 0,
  "failures": [
    {
      "index": "twitter3",
      "type": "_doc",
      "id": "1",
      "cause": {
        "type": "version_conflict_engine_exception",
        "reason": "[1]: version conflict, document already exists (current version [5])",
        "index_uuid": "ffz2LNIIQqqDx211R5f4fQ",
        "shard": "0",
        "index": "twitter3"
      },
      "status": 409
    }
  ]
}

它表明我们之前的文档 id 为1的有版本上的冲突。


默认情况下,版本冲突会中止 _reindex 进程。 “conflict” 请求 body 参数可用于指示 _reindex 继续处理版本冲突的下一个文档。 请务必注意,其他错误类型的处理不受 “conflict” 参数的影响。 当 “conflict”:在请求正文中设置 “proceed” 时, _reindex 进程将继续发生版本冲突并返回遇到的版本冲突计数:

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}


Throttling


重新索引大量文档可能会使你的群集泛滥甚至崩溃。requests_per_second 限制索引操作速率。

POST _reindex?requests_per_second=500 
{
  "source": {
    "index": "blogs",
    "size": 500
  },
  "dest": {
    "index": "blogs_fixed"
  }
}


运用 index 别名来进行 reindex


我们可以通过如下的方法来实现从一个 index 到另外一个 index 的数据转移:


PUT test     
PUT test_2   
POST /_aliases
{
    "actions" : [
        { "add":  { "index": "test_2", "alias": "test" } },
        { "remove_index": { "index": "test" } }  
    ]
}

在上面的例子中,假如我们地添加了一个叫做 test 的 index,而 test_2 是我们想要的。我们直接可以通过上面的方法吧 test 中的数据交换到 test_2 中,并同时把 test 索引删除。


从远处进行 reindex


_reindex 也支持从一个远处的 Elasticsearch 的服务器进行 reindex,它的语法为:

POST _reindex
{
  "source": {
    "index": "blogs",
    "remote": {
      "host": "http://remote_cluster_node1:9200",
      "username": "USERNAME",
      "password": "PASSWORD"
    }
  },
  "dest": {
    "index": "blogs"
  }
}

这里显示它从一个在 http://remote_cluster_node1:9200 的服务器来拷贝文件从一个 index 到另外一个 index。我们必须注意的是:remote hosts 必须很清楚地在 elasticsearch.yml 这个配置文件中的 reindex.remote.whitelist 中进行定义。


Update by Query


虽然这个不在我们的 reindex 介绍范围,但是在有些情况下,我们可以可以通过 _update_by_query API 来让我们轻松地更新一个字段的值:

POST blogs_fixed/_update_by_query
{
  "query": {
    "match": {
      "category.keyword": ""
    }
  },
  "script": {
    "source": """
       ctx._source['category'] = "None"
     """
  }
}

在上面,把 category.keyword 项为空的所有文档的 category 通过脚本设置为默认的 "None" 字符串。它和 reindex 的作用相似。


为 mapping 添加新的 mulit-field


假设我们要向 twitter_new 索引的 mapping 添加一个多字段(multi-field)


具体来说,假设我们要用新的方法分析 “content” 字段

PUT new_new/_mapping
{
  "properties": {
    "content": {
      "type": "text",
      "fields": {
        "english": {
          "type": "text",
          "analyzer": "english"
        }
      }
    }
  }
}

在上面我们为 content 字段添加了一个新的 english 字段,并且它的 analyzer 为 english。


由于 mapping 已经发生改变,但是索引中已经有的文档没有这个新的字段 english,如果这个时候我们进行如下的搜索,将不会找到任何的结果:

GET twitter_new/_search
{
  "query": {
    "match": {
      "content.english": "performance tips"
    }
  }
}

那么我们该如何使得索引中现有的文档都有 content.english 这个字段呢?运行 _update_by_query 以拥有现有文档选择新的 “content.english” 字段:

POST twitter_new/_update_by_query

当我们完成上面的请求后,然后再执行如下的操作,将会在 twitter_new 索引中搜索到想要的文档:

GET twitter_new/_search
{
  "query": {
    "match": {
      "content.english": "performance tips"
    }
  }
}


参考:


【1】https://www.elastic.co/guide/en/elasticsearch/reference/7.3/docs-reindex.html


相关实践学习
利用Elasticsearch实现地理位置查询
本实验将分别介绍如何使用Elasticsearch7.10版本进行全文检索、多语言检索和地理位置查询三个Elasticsearch基础检索子场景的实现。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
Java 流计算
这个错误是因为Elasticsearch的Emitter没有实现Serializable接口
【2月更文挑战第18天】这个错误是因为Elasticsearch的Emitter没有实现Serializable接口
53 3
|
2月前
|
JSON 监控 Java
Elasticsearch 8.X reindex 源码剖析及提速指南
Elasticsearch 8.X reindex 源码剖析及提速指南
39 0
|
2月前
|
运维 架构师 搜索推荐
7 年+积累、 Elastic 创始人Shay Banon 等 15 位专家推荐的 Elasticsearch 8.X新书已上线...
7 年+积累、 Elastic 创始人Shay Banon 等 15 位专家推荐的 Elasticsearch 8.X新书已上线...
39 4
|
2月前
|
存储 安全 数据处理
Elastic 中国开发者大会2023最新干货——Elasticsearch 7、8 新功能一网打尽
Elastic 中国开发者大会2023最新干货——Elasticsearch 7、8 新功能一网打尽
31 0
|
7月前
|
搜索推荐 索引
Elasticsearch elastic io 100%,但磁盘的iops和吞吐量没爆没啥原因吗?
Elasticsearch elastic io 100%,但磁盘的iops和吞吐量没爆没啥原因吗?
108 3
|
7月前
|
存储 缓存 监控
Elasticsearch elastic io 100%,但磁盘的iops和吞吐量没爆没啥原因吗?
Elasticsearch elastic io 100%,但磁盘的iops和吞吐量没爆没啥原因吗?
102 2
|
9月前
|
存储 自然语言处理 监控
ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案
ElasticSearch第三讲:ES详解 - Elastic Stack生态和场景方案
100 0
|
数据采集 数据可视化 搜索推荐
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(上)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(上)
228 0
|
存储 安全 数据可视化
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(中)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(中)
163 0
|
测试技术 Apache
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(下)
带你读《Elastic Stack 实战手册》之3:——3.1.1.从 Elasticsearch 到 Elastic Stack(下)
157 0

热门文章

最新文章

相关产品

  • 检索分析服务 Elasticsearch版