ElasticsearchRetriever
Elasticsearch 是一个分布式、RESTful 的搜索和分析引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,具有 HTTP Web 接口和无模式的 JSON 文档。它支持关键词搜索、向量搜索、混合搜索和复杂过滤。
ElasticsearchRetriever 是一个通用的包装器,通过 Query DSL 实现对所有 Elasticsearch 功能的灵活访问。对于大多数用例,其他类 (ElasticsearchStore、ElasticsearchEmbeddings 等) 应该已经足够,但如果它们不能满足你的需求,你可以使用 ElasticsearchRetriever。
本指南将帮助你开始使用 Elasticsearch retriever。有关 ElasticsearchRetriever 所有功能和配置的详细文档,请参阅 API 参考。
集成详情
| Retriever | Self-host | Cloud offering | Package |
|---|---|---|---|
| ElasticsearchRetriever | ✅ | ✅ | langchain_elasticsearch |
设置
设置 Elasticsearch 实例主要有两种方式:
-
Elastic Cloud:Elastic Cloud 是一个托管的 Elasticsearch 服务。注册一个 免费试用。 若要连接到不需要登录凭据的 Elasticsearch 实例(在 启用了安全的情况下启动 docker 实例),请将 Elasticsearch URL 和索引名称与 embedding 对象一起传递给构造函数。
-
本地安装 Elasticsearch:通过在本地运行 Elasticsearch 来入门。最简单的方法是使用官方 Elasticsearch Docker 镜像。有关更多信息,请参阅 Elasticsearch Docker 文档。
如果你想从单个查询中获得自动化跟踪,你也可以通过取消下面一行的注释来设置你的 LangSmith API 密钥:
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"
安装
此检索器位于 langchain-elasticsearch 包中。为方便演示,我们还将安装 langchain-community 来生成文本embeddings。
%pip install -qU langchain-community langchain-elasticsearch
from typing import Any, Dict, Iterable
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from langchain_community.embeddings import DeterministicFakeEmbedding
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_elasticsearch import ElasticsearchRetriever
配置
在此处定义与 Elasticsearch 的连接。在此示例中,我们使用本地运行的实例。或者,您也可以在 Elastic Cloud 上注册一个账户并开始 免费试用。
es_url = "http://localhost:9200"
es_client = Elasticsearch(hosts=[es_url])
es_client.info()
为了进行向量搜索,我们将仅出于说明目的而使用随机嵌入。对于实际用例,请选择 LangChain Embeddings 类中的一个。
embeddings = DeterministicFakeEmbedding(size=3)
定义示例数据
index_name = "test-langchain-retriever"
text_field = "text"
dense_vector_field = "fake_embedding"
num_characters_field = "num_characters"
texts = [
"foo",
"bar",
"world",
"hello world",
"hello",
"foo bar",
"bla bla foo",
]
索引数据
通常,当用户已经在 Elasticsearch 索引中拥有数据时,他们会使用 ElasticsearchRetriever。这里我们索引了一些示例文本文件。如果您已经使用 ElasticsearchStore.from_documents 等方式创建了索引,那也是可以的。
def create_index(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
num_characters_field: str,
):
es_client.indices.create(
index=index_name,
mappings={
"properties": {
text_field: {"type": "text"},
dense_vector_field: {"type": "dense_vector"},
num_characters_field: {"type": "integer"},
}
},
)
def index_data(
es_client: Elasticsearch,
index_name: str,
text_field: str,
dense_vector_field: str,
embeddings: Embeddings,
texts: Iterable[str],
refresh: bool = True,
) -> None:
create_index(
es_client, index_name, text_field, dense_vector_field, num_characters_field
)
vectors = embeddings.embed_documents(list(texts))
requests = [
{
"_op_type": "index",
"_index": index_name,
"_id": i,
text_field: text,
dense_vector_field: vector,
num_characters_field: len(text),
}
for i, (text, vector) in enumerate(zip(texts, vectors))
]
bulk(es_client, requests)
if refresh:
es_client.indices.refresh(index=index_name)
return len(requests)
index_data(es_client, index_name, text_field, dense_vector_field, embeddings, texts)
7
实例化
向量搜索
本示例中使用伪造的 embedding 进行密集向量检索。
def vector_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
}
vector_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=vector_query,
content_field=text_field,
url=es_url,
)
vector_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 1.0, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 0.6770179, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 0.4816144, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 0.46853775, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.2086992, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}})]
BM25
传统的关键词匹配。
def bm25_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: search_query,
},
},
}
bm25_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=bm25_query,
content_field=text_field,
url=es_url,
)
bm25_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
混合搜索
将向量搜索与 BM25 搜索相结合,并使用 互逆文档排名融合 (RRF) 来合并结果集。
def hybrid_query(search_query: str) -> Dict:
vector = embeddings.embed_query(search_query) # same embeddings as for indexing
return {
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"match": {
text_field: search_query,
}
}
}
},
{
"knn": {
"field": dense_vector_field,
"query_vector": vector,
"k": 5,
"num_candidates": 10,
}
},
]
}
}
}
hybrid_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=hybrid_query,
content_field=text_field,
url=es_url,
)
hybrid_retriever.invoke("foo")
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.9711467, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.6025789, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
模糊匹配
支持容错的关键词匹配。
def fuzzy_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: {
"query": search_query,
"fuzziness": "AUTO",
}
},
},
}
fuzzy_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=fuzzy_query,
content_field=text_field,
url=es_url,
)
fuzzy_retriever.invoke("fox") # note the character tolernace
[Document(page_content='foo', metadata={'_index': 'test-langchain-index', '_id': '0', '_score': 0.6474311, '_source': {'fake_embedding': [-2.336764233933763, 0.27510289545940503, -0.7957597268194339], 'num_characters': 3}}),
Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 0.49580228, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='bla bla foo', metadata={'_index': 'test-langchain-index', '_id': '6', '_score': 0.40171927, '_source': {'fake_embedding': [1.7365927060137358, -0.5230400847844948, 0.7978339724186192], 'num_characters': 11}})]
复杂筛选
对不同字段进行筛选组合。
def filter_query_func(search_query: str) -> Dict:
return {
"query": {
"bool": {
"must": [
{"range": {num_characters_field: {"gte": 5}}},
],
"must_not": [
{"prefix": {text_field: "bla"}},
],
"should": [
{"match": {text_field: search_query}},
],
}
}
}
filtering_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
content_field=text_field,
url=es_url,
)
filtering_retriever.invoke("foo")
[Document(page_content='foo bar', metadata={'_index': 'test-langchain-index', '_id': '5', '_score': 1.7437035, '_source': {'fake_embedding': [0.2533670476638539, 0.08100381646160418, 0.7763644080870179], 'num_characters': 7}}),
Document(page_content='world', metadata={'_index': 'test-langchain-index', '_id': '2', '_score': 1.0, '_source': {'fake_embedding': [-0.7041151202179595, -1.4652961969276497, -0.25786766898672847], 'num_characters': 5}}),
Document(page_content='hello world', metadata={'_index': 'test-langchain-index', '_id': '3', '_score': 1.0, '_source': {'fake_embedding': [0.42728413221815387, -1.1889908285425348, -1.445433230084671], 'num_characters': 11}}),
Document(page_content='hello', metadata={'_index': 'test-langchain-index', '_id': '4', '_score': 1.0, '_source': {'fake_embedding': [-0.28560441330564046, 0.9958894823084921, 1.5489829880195058], 'num_characters': 5}})]
请注意,查询匹配位于顶部。通过过滤的其他文档也包含在结果集中,但它们的得分均相同。
自定义文档映射器
可以通过自定义函数将 Elasticsearch 的结果(命中项)映射到 LangChain 文档。
def num_characters_mapper(hit: Dict[str, Any]) -> Document:
num_chars = hit["_source"][num_characters_field]
content = hit["_source"][text_field]
return Document(
page_content=f"This document has {num_chars} characters",
metadata={"text_content": content},
)
custom_mapped_retriever = ElasticsearchRetriever.from_es_params(
index_name=index_name,
body_func=filter_query_func,
document_mapper=num_characters_mapper,
url=es_url,
)
custom_mapped_retriever.invoke("foo")
[Document(page_content='This document has 7 characters', metadata={'text_content': 'foo bar'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'world'}),
Document(page_content='This document has 11 characters', metadata={'text_content': 'hello world'}),
Document(page_content='This document has 5 characters', metadata={'text_content': 'hello'})]
用法
遵循上面的示例,我们使用 .invoke 来发出单个查询。因为检索器是可运行的(Runnables),所以我们也可以使用 Runnable 接口 中的任何方法,例如 .batch。
在链中使用
我们还可以将检索器整合到 链 中,以构建更大的应用程序,例如一个简单的 RAG 应用程序。出于演示目的,我们也实例化了一个 OpenAI 聊天模型。
%pip install -qU langchain-openai
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template(
"""Answer the question based only on the context provided.
Context: {context}
Question: {question}"""
)
llm = ChatOpenAI(model="gpt-4o-mini")
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
chain = (
{"context": vector_retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
chain.invoke("what is foo?")
API 参考
有关 ElasticsearchRetriever 所有功能和配置的详细文档,请访问 API 参考。
Related
- Retriever conceptual guide
- Retriever how-to guides