Skip to main content
Open In ColabOpen on GitHub

Elasticsearch

Elasticsearch 是一个分布式的、RESTful 的搜索和分析引擎,能够同时执行向量搜索和词汇搜索。它构建在 Apache Lucene 库之上。

本笔记展示了如何使用与 Elasticsearch 向量存储相关的功能。

设置

要使用 Elasticsearch 向量搜索,您必须安装 langchain-elasticsearch 包。

%pip install -qU langchain-elasticsearch

凭据

有两种主要方法可以设置 Elasticsearch 实例以供配合使用:

  1. Elastic Cloud:Elastic Cloud 是一项托管的 Elasticsearch 服务。注册即可获得免费试用

    若要连接到不需要登录凭证的 Elasticsearch 实例(启动已启用安全性的 docker 实例),请将 Elasticsearch URL 和索引名称与嵌入对象一起传递给构造函数。

  2. 本地安装 Elasticsearch:通过在本地运行 Elasticsearch 来开始使用。最简单的方法是使用官方 Elasticsearch Docker 镜像。有关更多信息,请参阅Elasticsearch Docker 文档

通过 Docker 运行 Elasticsearch

示例:运行一个禁用了安全性的单节点 Elasticsearch 实例。不建议在生产环境中使用。

%docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1

启用认证运行

为了生产环境的运行安全,我们建议启用安全设置。您可以使用 es_api_keyes_useres_password 参数来连接并提供登录凭证。

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
from langchain_elasticsearch import ElasticsearchStore

elastic_vector_search = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="langchain_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
API Reference:ElasticsearchStore

如何获取默认“elastic”用户的密码?

要获取默认“elastic”用户的 Elastic Cloud 密码:

  1. 登录 Elastic Cloud 控制台 https://cloud.elastic.co
  2. 转到“Security”>“Users”
  3. 找到“elastic”用户并点击“Edit”
  4. 点击“Reset password”
  5. 按照提示重置密码

如何获取 API 密钥?

要获取 API 密钥:

  1. 登录 Elastic Cloud 控制台 https://cloud.elastic.co
  2. 打开 Kibana 并转到 Stack Management > API Keys
  3. 点击“Create API key”
  4. 输入 API 密钥的名称并点击“Create”
  5. 复制 API 密钥并将其粘贴到 api_key 参数中

Elastic Cloud

要连接到 Elastic Cloud 上的 Elasticsearch 实例,您可以使用 es_cloud_id 参数或 es_url

elastic_vector_search = ElasticsearchStore(
es_cloud_id="<cloud_id>",
index_name="test_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)

如果你想获得一流的模型调用自动化追踪,你也可以通过取消下面一行的注释来设置你的 LangSmith API 密钥:

# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"

初始化

Elasticsearch 在本地 localhost:9200 上运行,使用了 Docker。有关如何从 Elastic Cloud 连接到 Elasticsearch 的更多详细信息,请参阅上面的 带身份验证的运行

from langchain_elasticsearch import ElasticsearchStore

vector_store = ElasticsearchStore(
"langchain-demo", embedding=embeddings, es_url="http://localhost:9201"
)
API Reference:ElasticsearchStore

管理向量存储

向向量存储添加条目

from uuid import uuid4

from langchain_core.documents import Document

document_1 = Document(
page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
metadata={"source": "tweet"},
)

document_2 = Document(
page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
metadata={"source": "news"},
)

document_3 = Document(
page_content="Building an exciting new project with LangChain - come check it out!",
metadata={"source": "tweet"},
)

document_4 = Document(
page_content="Robbers broke into the city bank and stole $1 million in cash.",
metadata={"source": "news"},
)

document_5 = Document(
page_content="Wow! That was an amazing movie. I can't wait to see it again.",
metadata={"source": "tweet"},
)

document_6 = Document(
page_content="Is the new iPhone worth the price? Read this review to find out.",
metadata={"source": "website"},
)

document_7 = Document(
page_content="The top 10 soccer players in the world right now.",
metadata={"source": "website"},
)

document_8 = Document(
page_content="LangGraph is the best framework for building stateful, agentic applications!",
metadata={"source": "tweet"},
)

document_9 = Document(
page_content="The stock market is down 500 points today due to fears of a recession.",
metadata={"source": "news"},
)

document_10 = Document(
page_content="I have a bad feeling I am going to get deleted :(",
metadata={"source": "tweet"},
)

documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]

vector_store.add_documents(documents=documents, ids=uuids)
API Reference:Document
['21cca03c-9089-42d2-b41c-3d156be2b519',
'a6ceb967-b552-4802-bb06-c0e95fce386e',
'3a35fac4-e5f0-493b-bee0-9143b41aedae',
'176da099-66b1-4d6a-811b-dfdfe0808d30',
'ecfa1a30-3c97-408b-80c0-5c43d68bf5ff',
'c0f08baa-e70b-4f83-b387-c6e0a0f36f73',
'489b2c9c-1925-43e1-bcf0-0fa94cf1cbc4',
'408c6503-9ba4-49fd-b1cc-95584cd914c5',
'5248c899-16d5-4377-a9e9-736ca443ad4f',
'ca182769-c4fc-4e25-8f0a-8dd0a525955c']

从向量库中删除条目

vector_store.delete(ids=[uuids[-1]])
True

查询向量存储

创建向量存储并将相关文档添加进去后,您很可能希望在链或代理运行时查询它。这些示例还展示了如何在搜索时使用过滤。

直接查询

相似性搜索

可以按以下方式执行带有元数据过滤的简单相似性搜索:

results = vector_store.similarity_search(
query="LangChain provides abstractions to make working with LLMs easy",
k=2,
filter=[{"term": {"metadata.source.keyword": "tweet"}}],
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
* Building an exciting new project with LangChain - come check it out! [{'source': 'tweet'}]
* LangGraph is the best framework for building stateful, agentic applications! [{'source': 'tweet'}]

相似性搜索与评分

如果你想执行相似性搜索并获得相应的评分,可以运行:

results = vector_store.similarity_search_with_score(
query="Will it be hot tomorrow",
k=1,
filter=[{"term": {"metadata.source.keyword": "news"}}],
)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.765887] The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees. [{'source': 'news'}]

查询并转换为检索器

您还可以将向量存储转换为检索器,以便在链中使用更加方便。

retriever = vector_store.as_retriever(
search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.2}
)
retriever.invoke("Stealing from the bank is a crime")
[Document(metadata={'source': 'news'}, page_content='Robbers broke into the city bank and stole $1 million in cash.'),
Document(metadata={'source': 'news'}, page_content='The stock market is down 500 points today due to fears of a recession.'),
Document(metadata={'source': 'website'}, page_content='Is the new iPhone worth the price? Read this review to find out.'),
Document(metadata={'source': 'tweet'}, page_content='Building an exciting new project with LangChain - come check it out!')]

距离相似性算法

Elasticsearch 支持以下向量距离相似性算法:

  • cosine
  • euclidean
  • dot_product

cosine 相似性算法是默认的。

您可以通过 similarity 参数指定所需的相似性算法。

注意:根据检索策略的不同,相似性算法在查询时无法更改。它需要在为字段创建索引映射时进行设置。如果需要更改相似性算法,您需要删除索引并使用正确的 distance_strategy 重新创建它。

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
distance_strategy="COSINE",
# distance_strategy="EUCLIDEAN_DISTANCE"
# distance_strategy="DOT_PRODUCT"
)

检索策略

Elasticsearch 在支持广泛的检索策略方面,相对于其他仅支持向量的数据库具有巨大的优势。在本 Notebook 中,我们将配置 ElasticsearchStore 来支持一些最常见的检索策略。

默认情况下,ElasticsearchStore 使用 DenseVectorStrategy(在 0.2.0 版本之前称为 ApproxRetrievalStrategy)。

DenseVectorStrategy

这将返回与查询向量最相似的前 k 个向量。k 参数在初始化 ElasticsearchStore 时设置。默认值为 10。

from langchain_elasticsearch import DenseVectorStrategy

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorStrategy(),
)

docs = db.similarity_search(
query="What did the president say about Ketanji Brown Jackson?", k=10
)
API Reference:DenseVectorStrategy

示例:密集向量和关键字搜索的混合检索

本示例将展示如何配置 ElasticsearchStore 以执行混合检索,结合使用近似语义搜索和基于关键字的搜索。

我们使用 RRF 来平衡来自不同检索方法的两个分数。

要启用混合检索,我们需要在 DenseVectorStrategy 构造函数中设置 hybrid=True

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorStrategy(hybrid=True),
)

当启用混合模式后,执行的查询将是近似语义搜索和基于关键字的搜索的组合。

它将使用 rrf(Reciprocal Rank Fusion)来平衡来自不同检索方法的两个分数。

注意:RRF 要求 Elasticsearch 8.9.0 或更高版本。

{
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"bool": {
"filter": [],
"must": [{"match": {"text": {"query": "foo"}}}],
}
},
},
},
{
"knn": {
"field": "vector",
"filter": [],
"k": 1,
"num_candidates": 50,
"query_vector": [1.0, ..., 0.0],
},
},
]
}
}
}

示例:在 Elasticsearch 中使用 Embedding Model 进行密集向量搜索

本示例将展示如何配置 ElasticsearchStore 以在 Elasticsearch 中使用已部署的嵌入模型进行密集向量检索。

要使用此功能,请通过 query_model_id 参数在 DenseVectorStrategy 构造函数中指定 model_id

注意: 这要求模型已部署并在 Elasticsearch ML 节点上运行。有关如何使用 eland 部署模型的说明,请参阅 笔记本示例

DENSE_SELF_DEPLOYED_INDEX_NAME = "test-dense-self-deployed"

# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
es_cloud_id="<your cloud id>",
es_user="elastic",
es_password="<your password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)

# Setup a Ingest Pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
id="test_pipeline",
processors=[
{
"inference": {
"model_id": "sentence-transformers__all-minilm-l6-v2",
"field_map": {"query_field": "text_field"},
"target_field": "vector_query_field",
}
}
],
)

# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
index=DENSE_SELF_DEPLOYED_INDEX_NAME,
mappings={
"properties": {
"text_field": {"type": "text"},
"vector_query_field": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "l2_norm",
}
}
},
}
},
settings={"index": {"default_pipeline": "test_pipeline"}},
)

db.from_texts(
["hello world"],
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)

# Perform search
db.similarity_search("hello world", k=10)

SparseVectorStrategy (ELSER)

此策略使用 Elasticsearch 的稀疏向量检索来检索 top-k 结果。目前我们仅支持我们自己的“ELSER”嵌入模型。

注意: 这要求 ELSER 模型已在 Elasticsearch ml 节点中部署并运行。

要使用此策略,请在 ElasticsearchStore 构造函数中指定 SparseVectorStrategy(0.2.0 版本之前称为 SparseVectorRetrievalStrategy)。您需要提供模型 ID。

from langchain_elasticsearch import SparseVectorStrategy

# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
docs,
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name="test-elser",
strategy=SparseVectorStrategy(model_id=".elser_model_2"),
)

db.client.indices.refresh(index="test-elser")

results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])
API Reference:SparseVectorStrategy

DenseVectorScriptScoreStrategy

此策略使用 Elasticsearch 的 script score 查询来执行精确向量检索(也称为暴力搜索),以检索 top-k 结果。(在 0.2.0 版本之前,此策略被称为 ExactRetrievalStrategy。)

要使用此策略,请在 ElasticsearchStore 构造函数中指定 DenseVectorScriptScoreStrategy

from langchain_elasticsearch import SparseVectorStrategy

db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorScriptScoreStrategy(),
)
API Reference:SparseVectorStrategy

BM25Strategy

最后,您可以使用全文本关键词搜索。

要使用此功能,请在 ElasticsearchStore 构造函数中指定 BM25Strategy

from langchain_elasticsearch import BM25Strategy

db = ElasticsearchStore.from_documents(
docs,
es_url="http://localhost:9200",
index_name="test",
strategy=BM25Strategy(),
)
API Reference:BM25Strategy

BM25RetrievalStrategy

此策略允许用户执行纯 BM25 搜索,无需向量搜索。

要使用此策略,请在 ElasticsearchStore 构造函数中指定 BM25RetrievalStrategy

请注意,在下面的示例中,没有指定 embedding 选项,这表明搜索是在不使用 embeddings 的情况下进行的。

from langchain_elasticsearch import ElasticsearchStore

db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)

db.add_texts(
["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)

results = db.similarity_search(query="foo", k=10)
print(results)
API Reference:ElasticsearchStore

自定义查询

通过搜索中的 custom_query 参数,您可以调整用于从 Elasticsearch 检索文档的查询。如果您想使用更复杂的查询来支持字段的线性加权,这将非常有用。

# Example of a custom query thats just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
"""Custom query to be used in Elasticsearch.
Args:
query_body (dict): Elasticsearch query body.
query (str): Query string.
Returns:
dict: Elasticsearch query body.
"""
print("Query Retriever created by the retrieval strategy:")
print(query_body)
print()

new_query_body = {"query": {"match": {"text": query}}}

print("Query thats actually used in Elasticsearch:")
print(new_query_body)
print()

return new_query_body


results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
custom_query=custom_query,
)
print("Results:")
print(results[0])

自定义文档构建器

通过搜索中的 doc_builder 参数,您可以调整使用从 ElasticSearch 检索的数据构建文档的方式。如果您拥有不是使用 Langchain 创建的索引,这将特别有用。

from typing import Dict

from langchain_core.documents import Document


def custom_document_builder(hit: Dict) -> Document:
src = hit.get("_source", {})
return Document(
page_content=src.get("content", "Missing content!"),
metadata={
"page_number": src.get("page_number", -1),
"original_filename": src.get("original_filename", "Missing filename!"),
},
)


results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
API Reference:Document

用于检索增强生成的使用方法

有关如何将此向量存储用于检索增强生成(RAG)的指南,请参阅以下部分:

FAQ

问题:在将文档索引到 Elasticsearch 时,我遇到了超时错误。如何解决?

一个可能的原因是您的文档索引到 Elasticsearch 所需的时间较长。ElasticsearchStore 使用 Elasticsearch bulk API,该 API 具有一些默认设置,您可以进行调整以减少超时错误的发生几率。

当您使用 SparseVectorRetrievalStrategy 时,这也是一个不错的选择。

默认值如下:

  • chunk_size: 500
  • max_chunk_bytes: 100MB

要调整这些设置,您可以将 chunk_sizemax_chunk_bytes 参数传递给 ElasticsearchStore 的 add_texts 方法。

    vector_store.add_texts(
texts,
bulk_kwargs={
"chunk_size": 50,
"max_chunk_bytes": 200000000
}
)

升级到 ElasticsearchStore

如果你已经在基于 Langchain 的项目中使用了 Elasticsearch,你可能正在使用已弃用的旧实现:ElasticVectorSearchElasticKNNSearch。我们引入了一个名为 ElasticsearchStore 的新实现,它更灵活、更易于使用。本指南将引导你完成升级到新实现的过程。

有什么新内容?

新的实现现在合并为一个名为 ElasticsearchStore 的类,可以通过策略用于近似稠密向量、精确稠密向量、稀疏向量 (ELSER)、BM25 检索和混合检索。

我正在使用 ElasticKNNSearch

旧实现:


from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch

db = ElasticKNNSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)

新实现:


from langchain_elasticsearch import ElasticsearchStore, DenseVectorStrategy

db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
# 如果你使用 model_id
# strategy=DenseVectorStrategy(model_id="test_model")
# 如果你使用混合搜索
# strategy=DenseVectorStrategy(hybrid=True)
)

我正在使用 ElasticVectorSearch

旧实现:


from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch

db = ElasticVectorSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)

API Reference:ElasticVectorSearch

新实现:


from langchain_elasticsearch import ElasticsearchStore, DenseVectorScriptScoreStrategy

db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
strategy=DenseVectorScriptScoreStrategy()
)

db.client.indices.delete(
index="test-metadata, test-elser, test-basic",
ignore_unavailable=True,
allow_no_indices=True,
)

API 参考

如需了解 ElasticSearchStore 所有功能和配置的详细文档,请参阅 API 参考:https://python.langchain.com/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html