Elasticsearch 提供了 Bulk API,可以一次性发送多个操作(如索引、更新、删除)到 Elasticsearch 集群,从而实现高效的批量写入。这可以显著减少网络开销和提高写入性能。
示例代码:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# 连接到Elasticsearch集群
es = Elasticsearch(['localhost:9200'])
# 准备要写入的数据
data_to_insert = [
{"_index": "my_index", "_id": 1, "_source": {"field1": "value1"}},
{"_index": "my_index", "_id": 2, "_source": {"field1": "value2"}},
# ... 添加更多数据
]
# 使用bulk方法批量写入数据
success, failed = bulk(es, data_to_insert)
print(f"成功写入文档数:{success}, 失败数:{failed}")
Elasticsearch Parallel Bulk API 是对 Bulk API 的改进,它允许在多个线程或进程中并行地执行批量写入操作,从而进一步提高写入性能。
示例代码:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
import multiprocessing
# 连接到Elasticsearch集群
es = Elasticsearch(['localhost:9200'])
# 准备要写入的数据
data_to_insert = [
{"_index": "my_index", "_id": 1, "_source": {"field1": "value1"}},
{"_index": "my_index", "_id": 2, "_source": {"field1": "value2"}},
# ... 添加更多数据
]
# 使用parallel_bulk方法并行写入数据
def generate_actions(data):
for item in data:
yield {
"_index": item["_index"],
"_id": item["_id"],
"_source": item["_source"]
}
num_processes = multiprocessing.cpu_count() # 使用所有可用核心
success, failed = parallel_bulk(es, generate_actions(data_to_insert), num_processes=num_processes)
print(f"成功写入文档数:{success}, 失败数:{failed}")
Elasticsearch Ingest Node 是 Elasticsearch 的一个功能,允许你在写入数据之前对数据进行预处理。你可以定义一个包含多个处理步骤的管道,并将数据发送到 Elasticsearch 集群,集群会按照定义的管道进行数据处理和写入。
示例代码:
在 Elasticsearch 中定义一个 Ingest 管道:
PUT _ingest/pipeline/my_pipeline
{
"description": "My custom pipeline",
"processors": [
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
}
]
}
使用定义的管道进行数据写入:
from elasticsearch import Elasticsearch
# 连接到Elasticsearch集群
es = Elasticsearch(['localhost:9200'])
# 准备要写入的数据
data_to_insert = [
{"field1": "value1"},
{"field1": "value2"},
# ... 添加更多数据
]
# 使用管道将数据写入Elasticsearch
for data in data_to_insert:
es.index(index='my_index', pipeline='my_pipeline', body=data)
通过使用 Ingest Node 管道,你可以在写入数据时进行各种预处理操作,如添加时间戳、修改字段等,从而根据需求灵活地定制数据写入过程。
这些方式可以帮助你高效地将大量数据写入 Elasticsearch,选择适合你应用需求的方式可以在性能和灵活性之间做出权衡。