实现在 Elasticsearch 中查询一万条以上数据的方式有以下几种:
Scroll API 允许您在一系列连续的查询中检索大量数据。它通过在初始查询中返回一个 scroll ID,然后您可以使用这个 scroll ID 来获取下一批结果,直到没有更多结果为止。
示例代码:
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
query = {
"query": {
"match_all": {}
}
}
scroll_time = "5m" # Scroll的时间窗口
# 初始查询
initial_response = es.search(index="your_index", body=query, scroll=scroll_time)
scroll_id = initial_response['_scroll_id']
scroll_size = initial_response['hits']['total']['value']
# 滚动获取数据
while scroll_size > 0:
response = es.scroll(scroll_id=scroll_id, scroll=scroll_time)
# 处理数据
for hit in response['hits']['hits']:
print(hit)
scroll_id = response['_scroll_id']
scroll_size = len(response['hits']['hits'])
Search After 方式使用上一次查询结果中的最后一条记录来获取下一批数据。这种方式适用于已排序字段的情况。
示例代码:
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
query = {
"query": {
"match_all": {}
},
"size": 1000,
"sort": ["_doc"] # 假设使用默认的文档ID排序
}
response = es.search(index="your_index", body=query)
hits = response['hits']['hits']
while hits:
last_hit = hits[-1]
last_sort_values = last_hit['sort']
query["search_after"] = last_sort_values
response = es.search(index="your_index", body=query)
hits = response['hits']['hits']
# 处理数据
for hit in hits:
print(hit)
虽然不是最有效的方式,但您仍然可以使用标准的分页查询来检索大量数据。
示例代码:
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
query = {
"query": {
"match_all": {}
},
"size": 1000,
"from": 0
}
page = 1
page_size = 1000
response = es.search(index="your_index", body=query)
hits = response['hits']['hits']
while hits:
# 处理数据
for hit in hits:
print(hit)
# 获取下一页数据
query["from"] = page * page_size
response = es.search(index="your_index", body=query)
hits = response['hits']['hits']
page += 1
请注意,Scroll API 在处理大量数据时通常是最有效的选择,因为它避免了一些分页查询可能出现的性能问题。