从 MapReduceDocumentsChain 迁移
MapReduceDocumentsChain 实现了对(可能很长)文本的映射-归约(map-reduce)策略。该策略如下:
- 将文本分割成更小的文档;
- 对这些更小的文档应用一个处理过程;
- 将处理结果归约或整合为最终结果。
请注意,映射步骤通常是并行处理输入文档的。
在这种情况下,一个常见的处理过程是摘要生成,其中映射步骤对单个文档进行摘要,然后归约步骤生成这些摘要的摘要。
在归约步骤中,MapReduceDocumentsChain 支持对摘要进行递归“折叠”:输入将根据令牌限制进行分区,并为分区生成摘要。此步骤将重复进行,直到摘要的总长度在期望的限制内,从而能够对任意长度的文本进行摘要。这对于具有较小上下文窗口的模型特别有用。
LangGraph 支持 映射-归约 工作流,并为该问题带来了许多优势:
- LangGraph 允许对单个步骤(如连续摘要)进行流式传输,从而更好地控制执行;
- LangGraph 的 检查点 支持错误恢复、扩展人类在环工作流,以及更轻松地集成到对话应用程序中。
- 正如我们将在下面看到的,LangGraph 实现更容易扩展。
下面我们将通过一个简单的示例来说明 MapReduceDocumentsChain 和相应的 LangGraph 实现,然后使用一个更长的示例文本来演示递归归约步骤。
我们先加载一个聊天模型:
pip install -qU "langchain[google-genai]"
import getpass
import os
if not os.environ.get("GOOGLE_API_KEY"):
os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter API key for Google Gemini: ")
from langchain.chat_models import init_chat_model
llm = init_chat_model("gemini-2.0-flash", model_provider="google_genai")
基本示例(短文档)
让我们以下面的 3 个文档为例进行说明。
from langchain_core.documents import Document
documents = [
Document(page_content="Apples are red", metadata={"title": "apple_book"}),
Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
Document(page_content="Bananas are yelow", metadata={"title": "banana_book"}),
]
旧版
Details
下面我们展示使用 MapReduceDocumentsChain 的实现。我们为 map 和 reduce 步骤定义了 prompt 模板,为这些步骤实例化了单独的链,最后实例化 MapReduceDocumentsChain:
from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter
# Map
map_template = "Write a concise summary of the following: {docs}."
map_prompt = ChatPromptTemplate([("human", map_template)])
map_chain = LLMChain(llm=llm, prompt=map_prompt)
# Reduce
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
llm_chain=reduce_chain, document_variable_name="docs"
)
# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
# This is final chain that is called.
combine_documents_chain=combine_documents_chain,
# If documents exceed context for `StuffDocumentsChain`
collapse_documents_chain=combine_documents_chain,
# The maximum number of tokens to group documents into.
token_max=1000,
)
# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
# Map chain
llm_chain=map_chain,
# Reduce chain
reduce_documents_chain=reduce_documents_chain,
# The variable name in the llm_chain to put the documents in
document_variable_name="docs",
# Return the results of the map steps in the output
return_intermediate_steps=False,
)
result = map_reduce_chain.invoke(documents)
print(result["output_text"])
Fruits come in a variety of colors, with apples being red, blueberries being blue, and bananas being yellow.
在 LangSmith 追踪 中,我们观察到四次 LLM 调用:一次总结了三个输入文档中的每一个,一次总结了这些总结。
LangGraph
下面我们展示一个 LangGraph 实现,使用了与上面相同的提示模板。该图包含一个用于生成摘要的节点,该节点会映射到一系列输入文档。然后,该节点会流向第二个节点,用于生成最终摘要。
Details
我们需要安装 langgraph:
%pip install -qU langgraph
import operator
from typing import Annotated, List, TypedDict
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph
map_template = "Write a concise summary of the following: {context}."
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
map_prompt = ChatPromptTemplate([("human", map_template)])
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
map_chain = map_prompt | llm | StrOutputParser()
reduce_chain = reduce_prompt | llm | StrOutputParser()
# Graph components: define the components that will make up the graph
# This will be the overall state of the main graph.
# It will contain the input document contents, corresponding
# summaries, and a final summary.
class OverallState(TypedDict):
# Notice here we use the operator.add
# This is because we want combine all the summaries we generate
# from individual nodes back into one list - this is essentially
# the "reduce" part
contents: List[str]
summaries: Annotated[list, operator.add]
final_summary: str
# This will be the state of the node that we will "map" all
# documents to in order to generate summaries
class SummaryState(TypedDict):
content: str
# Here we generate a summary, given a document
async def generate_summary(state: SummaryState):
response = await map_chain.ainvoke(state["content"])
return {"summaries": [response]}
# Here we define the logic to map out over the documents
# We will use this an edge in the graph
def map_summaries(state: OverallState):
# We will return a list of `Send` objects
# Each `Send` object consists of the name of a node in the graph
# as well as the state to send to that node
return [
Send("generate_summary", {"content": content}) for content in state["contents"]
]
# Here we will generate the final summary
async def generate_final_summary(state: OverallState):
response = await reduce_chain.ainvoke(state["summaries"])
return {"final_summary": response}
# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("generate_final_summary", generate_final_summary)
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "generate_final_summary")
graph.add_edge("generate_final_summary", END)
app = graph.compile()
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
请注意,以流式模式调用グラフ可以让我们在执行过程中监控步骤并可能对其采取行动。
# Call the graph:
async for step in app.astream({"contents": [doc.page_content for doc in documents]}):
print(step)
{'generate_summary': {'summaries': ['Apples are typically red in color.']}}
{'generate_summary': {'summaries': ['Bananas are yellow in color.']}}
{'generate_summary': {'summaries': ['Blueberries are a type of fruit that are blue in color.']}}
{'generate_final_summary': {'final_summary': 'The main themes are the colors of different fruits: apples are red, blueberries are blue, and bananas are yellow.'}}
在LangSmith 追踪中,我们发现了与之前相同的四个 LLM 调用。
总结长篇文档
Map-reduce 流程在文本长度远超 LLM 上下文窗口时尤其有用。MapReduceDocumentsChain 支持对摘要进行递归“折叠”:输入会根据 token 限制进行分区,并为这些分区生成摘要。此步骤会重复进行,直到摘要的总长度在期望的限制范围内,从而实现对任意长度文本的摘要。
这个“折叠”步骤在 MapReduceDocumentsChain 中实现为一个 while 循环。我们可以用 Lilian Weng 的 LLM 驱动的自主代理 博客 文章(在 RAG 教程 和其他文档中也有介绍)来演示这个步骤。
首先,我们加载博文并将其分割成更小的“子文档”:
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter
loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
documents = loader.load()
text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=0
)
split_docs = text_splitter.split_documents(documents)
print(f"Generated {len(split_docs)} documents.")
USER_AGENT environment variable not set, consider setting it to identify your requests.
Created a chunk of size 1003, which is longer than the specified 1000
``````output
Generated 14 documents.
旧版
Details
我们可以像以前一样调用 MapReduceDocumentsChain:
result = map_reduce_chain.invoke(split_docs)
print(result["output_text"])
The article discusses the use of Large Language Models (LLMs) to power autonomous agents in various tasks, showcasing their capabilities in problem-solving beyond generating written content. Key components such as planning, memory optimization, and tool use are explored, with proof-of-concept demos like AutoGPT and GPT-Engineer demonstrating the potential of LLM-powered agents. Challenges include limitations in historical information retention and natural language interface reliability, while the potential of LLMs in enhancing reasoning, problem-solving, and planning proficiency for autonomous agents is highlighted. Overall, the article emphasizes the versatility and power of LLMs in creating intelligent agents for tasks like scientific discovery and experiment design.
请考虑上述调用的 LangSmith trace。在实例化我们的 ReduceDocumentsChain 时,我们将 token_max 设置为 1,000 个 token。这导致总共进行了 17 次 LLM 调用:
- 14 次调用用于总结文本分割器生成的 14 个子文档。
- 这产生了总计约 1,000 到 2,000 个 token 的摘要。由于我们将
token_max设置为 1,000,因此又有两次调用用于总结(或“折叠”)这些摘要。 - 最后一次调用用于生成两个“折叠”摘要的最终摘要。
LangGraph
Details
我们可以将 LangGraph 中最初的 map-reduce 实现扩展到实现相同的递归折叠步骤。我们进行以下更改:
- 向 state 添加一个
collapsed_summaries键以存储折叠后的摘要; - 更新最终的总结节点以总结折叠后的摘要;
- 添加一个
collapse_summaries节点,根据 token 长度(此处为 1,000 tokens,与之前相同)对文档列表进行分区,为每个分区生成摘要,并将结果存储在collapsed_summaries中。
我们从 collapse_summaries 添加一个条件边到自身以形成一个循环:如果折叠后的摘要总量超过 token_max,我们就重新运行该节点。
from typing import Literal
from langchain.chains.combine_documents.reduce import (
acollapse_docs,
split_list_of_docs,
)
def length_function(documents: List[Document]) -> int:
"""Get number of tokens for input contents."""
return sum(llm.get_num_tokens(doc.page_content) for doc in documents)
token_max = 1000
class OverallState(TypedDict):
contents: List[str]
summaries: Annotated[list, operator.add]
collapsed_summaries: List[Document] # add key for collapsed summaries
final_summary: str
# Add node to store summaries for collapsing
def collect_summaries(state: OverallState):
return {
"collapsed_summaries": [Document(summary) for summary in state["summaries"]]
}
# Modify final summary to read off collapsed summaries
async def generate_final_summary(state: OverallState):
response = await reduce_chain.ainvoke(state["collapsed_summaries"])
return {"final_summary": response}
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary) # same as before
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("generate_final_summary", generate_final_summary)
# Add node to collapse summaries
async def collapse_summaries(state: OverallState):
doc_lists = split_list_of_docs(
state["collapsed_summaries"], length_function, token_max
)
results = []
for doc_list in doc_lists:
results.append(await acollapse_docs(doc_list, reduce_chain.ainvoke))
return {"collapsed_summaries": results}
graph.add_node("collapse_summaries", collapse_summaries)
def should_collapse(
state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
num_tokens = length_function(state["collapsed_summaries"])
if num_tokens > token_max:
return "collapse_summaries"
else:
return "generate_final_summary"
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)
app = graph.compile()
LangGraph 允许绘制图结构,以帮助可视化其功能:
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
和之前一样,我们可以流式传输图以观察其步骤顺序。下面,我们只打印步骤的名称。
请注意,由于图中存在循环,为执行指定**recursion_limit** 会很有帮助。这类似于**ReduceDocumentsChain.token_max**,当超过指定限制时会引发特定错误。
async for step in app.astream(
{"contents": [doc.page_content for doc in split_docs]},
{"recursion_limit": 10},
):
print(list(step.keys()))
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['collect_summaries']
['collapse_summaries']
['generate_final_summary']
print(step)
{'generate_final_summary': {'final_summary': 'The summaries discuss the use of Large Language Models (LLMs) to power autonomous agents in various tasks such as problem-solving, planning, and tool use. Key components like planning, memory, and task decomposition are highlighted, along with challenges such as inefficient planning and hallucination. Techniques like Algorithm Distillation and Maximum Inner Product Search are explored for optimization, while frameworks like ReAct and Reflexion show improvements in knowledge-intensive tasks. The importance of accurate interpretation of user input and well-structured code for functional autonomy is emphasized, along with the potential of LLMs in prompting, reasoning, and emergent social behavior in simulation environments. Challenges in real-world scenarios and the use of LLMs with expert-designed tools for tasks like organic synthesis and drug discovery are also discussed.'}}
在对应的 LangSmith 追踪 中,我们可以看到与之前相同的 17 次 LLM 调用,这次它们被分组到了各自的节点下。
后续步骤
请查看 LangGraph 文档,了解有关使用 LangGraph 进行构建的详细信息,包括 此指南 中关于 LangGraph 中 map-reduce 机制的详细介绍。
有关更多基于 LLM 的摘要策略,请参阅 本教程。