摘要:# api_key=("api_key_id", "api_key"),
为了使用Python连接和操作Elasticsearch,可以按照以下步骤进行:
1. 安装必要的库
使用elasticsearch库进行连接和操作,使用elasticsearch-dsl进行高级查询(可选):
bash
pip install elasticsearch elasticsearch-dsl
2. 连接到Elasticsearch
python
from elasticsearch import Elasticsearch
# 基本连接(无认证)
es = Elasticsearch(["http://localhost:9200"])
# 带有用户名和密码的连接
# es = Elasticsearch(
# "http://localhost:9200",
# http_auth=("username", "password")
# )
# Elasticsearch 8.x 使用SSL和API密钥(示例)
# es = Elasticsearch(
# "https://localhost:9200",
# api_key=("api_key_id", "api_key"),
# verify_certs=False # 测试环境可关闭证书验证
# )
# 检查连接
if es.ping:
print("成功连接到Elasticsearch")
else:
print("连接失败")
3. 创建索引
python
index_name = "my_index"
# 判断索引是否存在,不存在则创建
if not es.indices.exists(index=index_name):
es.indices.create(
index=index_name,
body={
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"title": {"type": "text"},
"content": {"type": "text"},
"date": {"type": "date"}
}
}
}
)
print(f"索引 {index_name} 创建成功")
else:
print(f"索引 {index_name} 已存在")
4. 插入文档
python
doc = {
"title": "示例文档",
"content": "这是用于Elasticsearch的示例内容。",
"date": "2023-10-01"
}
# 插入文档并指定ID
response = es.index(index=index_name, id=1, document=doc)
print(f"文档插入结果: {response['result']}") # 输出: created 或 updated
5. 查询文档
根据ID查询
python
try:
doc = es.get(index=index_name, id=1)
print(doc['_source'])
except Exception as e:
print(f"查询失败: {e}")
搜索文档
python
search_body = {
"query": {
"match": {
"content": "示例"
}
}
}
result = es.search(index=index_name, body=search_body)
print(f"找到 {result['hits']['total']['value']} 条记录:")
for hit in result['hits']['hits']:
print(hit['_source'])
6. 更新文档
python
update_body = {
"doc": {
"content": "更新后的内容。"
}
}
response = es.update(index=index_name, id=1, body=update_body)
print(f"更新结果: {response['result']}") # 输出: updated
7. 删除文档
python
response = es.delete(index=index_name, id=1)
print(f"删除结果: {response['result']}") # 输出: deleted
8. 批量操作
python
from elasticsearch import helpers
actions = [
{
"_index": index_name,
"_source": {
"title": f"文档 {i}",
"content": f"内容 {i}",
"date": "2023-10-01"
}
}
for i in range(10)
]
helpers.bulk(es, actions)
print("批量插入完成")
9. 删除索引
python
if es.indices.exists(index=index_name):
es.indices.delete(index=index_name)
print(f"索引 {index_name} 已删除")
10. 复杂查询示例(布尔查询和聚合)
python
search_body = {
"query": {
"bool": {
"must": [
{"match": {"content": "内容"}},
{"range": {"date": {"gte": "2023-01-01"}}}
]
}
},
"aggs": {
"内容统计": {
"terms": {"field": "title.keyword", "size": 10}
}
}
}
result = es.search(index=index_name, body=search_body)
print("聚合结果:")
for bucket in result['aggregations']['内容统计']['buckets']:
print(f"{bucket['key']}: {bucket['doc_count']}")
常见问题处理
版本兼容性:确保客户端版本与Elasticsearch版本对应,例如Elasticsearch 8.x使用elasticsearch>=8.0。认证失败:检查用户名、密码或API密钥是否正确,确认Elasticsearch的安全配置。SSL证书问题:在生产环境中应使用有效证书,测试时可临时禁用验证(verify_certs=False)。字段类型错误:确保插入的数据类型与映射定义一致,如日期格式需符合yyyy-MM-dd。通过以上步骤,您可以有效地使用Python操作Elasticsearch进行数据存储、检索和分析。
来源:老客数据一点号