ElasticSearch的scroll接口搜索示例


=Start=

缘由:

之前整理过一篇「ElasticSearch的搜索API学习整理」的文章,但里面介绍的主要是一些搜索语法、限制条件,并没有实际搞定最终匹配数量超过10000的数据获取办法,其实方法也很简单——使用scroll接口分批次取,这里简单记录一下,方便以后参考。

正文:

参考解答:
# 方法一:HTTP请求 scroll 接口

scroll是先做一次初始化搜索把所有符合搜索条件的结果缓存起来生成一个快照,然后持续地、批量地从快照里拉取数据直到没有数据剩下。而这时对索引数据的插入、删除、更新都不会影响遍历结果,因此scroll并不适合用来做实时搜索。

需要先执行一个初始化搜索请求,传递一个scroll参数来告诉 Elasticsearch缓存应该持续多长时间,在缓存持续时间内初始化搜索请求后对索引的修改不会反应到快照中。每次搜索请求后都会返回一个scrollId,是一个 64 位的字符串编码,后续会使用此scrollId来获取数据。scroll时间指的是本次数据处理所需要的时间,如果超过此时间,继续使用该scrollId搜索数据则会报错。

#!/usr/bin/env python
# coding=utf-8

import sys, time
import json
import requests

def main():
    elastic_url = 'http://localhost:9200/index_name/_search?scroll=2m'
    payload = {
        "size": 10,
        "query": {
            "bool" : {
                "must": {
                    "query_string": {
                       "query": "spend_time:>=1000",
                    }
                },
                "filter": {
                    "range" : {
                        "es_timestamp" : {
                            "gte": "2018/08/30 00:00:32 +0800",
                            "lte": "2018/08/31 00:00:32 +0800",
                        }
                    }
                }
            }
        }
    }

    r = requests.post(elastic_url, data=json.dumps(payload))
    resp_json = r.json()
    hits = resp_json['hits']['hits']
    _scroll_id = resp_json['_scroll_id']

    # 注意,这里的URL和上面的不一样,并没有 index_name 信息,对于新手来说千万注意!
    # scroll_api_url = 'http://localhost:9200/_search/scroll'
    scroll_api_url = 'http://localhost:9200/_search/scroll?filter_path=_shards,_scroll_id,hits.total,hits.hits._source'
    
    while hits:
        print "hits: {}".format(len(hits))
        print "_scroll_id: {}".format(_scroll_id)

        scroll_payload = {
            'scroll': '2m',
            'scroll_id': _scroll_id
        }
        scroll_r = requests.post(scroll_api_url, data=json.dumps(scroll_payload))
        resp_json = scroll_r.json()
        # print resp_json
        hits = resp_json['hits']['hits']
        _scroll_id = resp_json['_scroll_id']


if __name__ == '__main__':
    time_start = time.time()
    try:
        main()
    except KeyboardInterrupt:
        print 'Killed by user'
        sys.exit(0)
    print "Spend {0} seconds.\n".format(time.time() - time_start)

 

# 方法二:使用官方提供的库

 

参考链接:

=END=


《“ElasticSearch的scroll接口搜索示例”》 有 3 条评论

回复 hi 取消回复

您的电子邮箱地址不会被公开。 必填项已用*标注