Amazon Neptune with SPARQL

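Amazon Neptune is a high-performance graph analytics and serverless database that provides superior scalability and availability.

This example shows a QA chain that queries Resource Description Framework (RDF) data in an Amazon Neptune graph database using the SPARQL query language and returns a human-readable response.

SPARQL is the standard query language for RDF graphs.

This example uses the NeptuneRdfGraph class, which connects to the Neptune database and loads its schema. create_neptune_sparql_qa_chain connects the graph and the LLM so that you can ask natural-language questions.

This notebook demonstrates an example using organizational data.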

Requirements for running this notebook:

  • A Neptune 1.2.x cluster accessible from this notebook
  • A kernel with Python 3.9 or higher
  • For Bedrock access, ensure the IAM role has this policy:
{
"Action": [
"bedrock:ListFoundationModels",
"bedrock:InvokeModel"
],
"Resource": "*",
"Effect": "Allow"
}
  • An S3 bucket for staging the sample data. The bucket should be in the same account and region as Neptune.
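The Bedrock permissions listed above are a single policy statement. As a minimal sketch, assuming you want a standalone policy document to attach to the role, the statement can be wrapped in the usual `Version` and `Statement` envelope. The variable names here are illustrative only.

```python
import json

# The statement from the requirements list, unchanged.
bedrock_statement = {
    "Action": ["bedrock:ListFoundationModels", "bedrock:InvokeModel"],
    "Resource": "*",
    "Effect": "Allow",
}

# A standalone IAM policy document wraps statements in a Version/Statement envelope.
policy_document = {
    "Version": "2012-10-17",
    "Statement": [bedrock_statement],
}

policy_json = json.dumps(policy_document, indent=2)
print(policy_json)
```

The printed JSON can be pasted into the IAM console or saved to a file. Narrowing `Resource` to specific model ARNs is a reasonable hardening step, but the source only specifies `"*"`.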

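Setting up

Seed the W3C organizational data

Seed the W3C organizational data, which consists of the W3C org ontology plus some instances.

You will need an S3 bucket in the same region and account as the Neptune cluster. Set STAGE_BUCKET to the name of that bucket.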

STAGE_BUCKET = "<bucket-name>"
%%bash  -s "$STAGE_BUCKET"

rm -rf data
mkdir -p data
cd data
echo getting org ontology and sample org instances
wget http://www.w3.org/ns/org.ttl
wget https://raw.githubusercontent.com/aws-samples/amazon-neptune-ontology-example-blog/main/data/example_org.ttl

echo Copying org ttl to S3
aws s3 cp org.ttl s3://$1/org.ttl
aws s3 cp example_org.ttl s3://$1/example_org.ttl

We will use the %load magic command from the graph-notebook package to insert the W3C data into the Neptune graph. Before running %load, use %%graph_notebook_config to set the graph connection parameters.

!pip install --upgrade --quiet graph-notebook
%load_ext graph_notebook.magics
%%graph_notebook_config
{
"host": "<neptune-endpoint>",
"neptune_service": "neptune-db",
"port": 8182,
"auth_mode": "<[DEFAULT|IAM]>",
"load_from_s3_arn": "<neptune-cluster-load-role-arn>",
"ssl": true,
"aws_region": "<region>"
}

Bulk-load the org Turtle (ttl) files, both the ontology and the instances.

%load -s s3://{STAGE_BUCKET} -f turtle --store-to loadres --run
%load_status {loadres['payload']['loadId']} --errors --details

Set up the chain

!pip install --upgrade --quiet langchain-aws

** Restart the kernel **

Prepare an example

EXAMPLES = """

<question>
Find organizations.
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName where {{
?org rdfs:label ?orgName .
}}
</sparql>

<question>
Find sites of an organization
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName ?siteName where {{
?org rdfs:label ?orgName .
?org org:hasSite/rdfs:label ?siteName .
}}
</sparql>

<question>
Find suborganizations of an organization
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName ?subName where {{
?org rdfs:label ?orgName .
?org org:hasSubOrganization/rdfs:label ?subName .
}}
</sparql>

<question>
Find organizational units of an organization
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?org ?orgName ?unitName where {{
?org rdfs:label ?orgName .
?org org:hasUnit/rdfs:label ?unitName .
}}
</sparql>

<question>
Find members of an organization. Also find their manager, or the member they report to.
</question>

<sparql>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX org: <http://www.w3.org/ns/org#>
PREFIX foaf: <http://xmlns.com/foaf/0.1/>

select * where {{
?person rdf:type foaf:Person .
?person org:memberOf ?org .
OPTIONAL {{ ?person foaf:firstName ?firstName . }}
OPTIONAL {{ ?person foaf:family_name ?lastName . }}
OPTIONAL {{ ?person org:reportsTo ?manager . }}
}}
</sparql>


<question>
Find change events, such as mergers and acquisitions, of an organization
</question>

<sparql>
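PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX org: <http://www.w3.org/ns/org#>

select ?event ?origOrg ?resultingOrg where {{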
?org rdfs:label ?orgName .
?event rdf:type org:ChangeEvent .
?event org:originalOrganization ?origOrg .
?event org:resultingOrganization ?resultingOrg .
}}
</sparql>

"""
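The doubled braces in EXAMPLES look like escapes for a Python-format-style prompt template, where single braces mark placeholders. The sketch below illustrates that behaviour with plain `str.format`. The template text and the `{prompt}` placeholder are assumptions for illustration, not the chain's actual prompt.

```python
# One example in the doubled-brace style used by EXAMPLES.
example = """
select ?org ?orgName where {{
    ?org rdfs:label ?orgName .
}}
"""

# Hypothetical template: the example text becomes part of a format string
# that also has a real placeholder named {prompt}.
template = "Examples:" + example + "Question: {prompt}"

# Formatting fills the placeholder and collapses each {{ or }} to a single brace.
rendered = template.format(prompt="Find organizations.")
print(rendered)
```

If the braces were not doubled, `format` would try to treat the SPARQL group pattern as a placeholder and fail, which is the likely reason the examples are written this way.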

Create the Neptune Database RDF Graph

from langchain_aws.graphs import NeptuneRdfGraph

host = "<your host>"
port = 8182 # change if different
region = "us-east-1" # change if different
graph = NeptuneRdfGraph(host=host, port=port, use_iam_auth=True, region_name=region)

# Optionally, change the schema
# elems = graph.get_schema_elements
# change elems ...
# graph.load_schema(elems)
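For orientation, Neptune serves SPARQL over HTTPS at the `/sparql` path of the cluster endpoint. The following is a rough sketch of what a raw query request looks like, built with the standard library only and never sent. The helper name `build_sparql_request` and the placeholder host are hypothetical. With IAM authentication enabled, a real request would also need SigV4 signing, which is not shown here.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_sparql_request(host: str, port: int, query: str) -> Request:
    """Build (but do not send) a POST request for a Neptune SPARQL endpoint."""
    url = f"https://{host}:{port}/sparql"
    # The query travels as a form-encoded "query" field in the request body.
    body = urlencode({"query": query}).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_sparql_request(
    "example-host.invalid",  # placeholder host, not a real cluster
    8182,
    "SELECT (COUNT(*) AS ?n) WHERE { ?s ?p ?o }",
)
print(request.full_url)
print(request.data.decode("utf-8"))
```

In the notebook itself, NeptuneRdfGraph handles the connection for you; the sketch is only meant to show what the host and port settings refer to.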

Using the Neptune SPARQL QA Chain

This QA chain queries the Neptune graph database using SPARQL and returns a human-readable response.

from langchain_aws import ChatBedrockConverse
from langchain_aws.chains import create_neptune_sparql_qa_chain

MODEL_ID = "anthropic.claude-3-5-sonnet-20241022-v2:0"
llm = ChatBedrockConverse(
    model_id=MODEL_ID,
    temperature=0,
)

chain = create_neptune_sparql_qa_chain(
    llm=llm,
    graph=graph,
    examples=EXAMPLES,
)

result = chain.invoke("How many organizations are in the graph?")
print(result["result"].content)
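Conceptually, a question-answering chain of this kind has three steps: turn the question into SPARQL, run the query, and phrase the rows as an answer. The sketch below shows that flow with offline stand-ins. Every function name and the sample rows are hypothetical, and this is not the implementation of create_neptune_sparql_qa_chain.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


def answer_question(
    question: str,
    generate_sparql: Callable[[str], str],
    run_query: Callable[[str], List[Row]],
    summarize: Callable[[str, List[Row]], str],
) -> str:
    """Translate a question to SPARQL, run it, then phrase the rows as an answer."""
    sparql = generate_sparql(question)
    rows = run_query(sparql)
    return summarize(question, rows)


# Stand-ins for the LLM and the graph so the sketch runs offline.
def fake_generate_sparql(question: str) -> str:
    return "SELECT ?org WHERE { ?org a <http://www.w3.org/ns/org#Organization> }"


def fake_run_query(sparql: str) -> List[Row]:
    # Made-up rows, only to exercise the flow.
    return [{"org": "urn:example:org1"}, {"org": "urn:example:org2"}]


def fake_summarize(question: str, rows: List[Row]) -> str:
    return f"The query returned {len(rows)} row(s)."


answer = answer_question(
    "How many organizations are in the graph?",
    fake_generate_sparql,
    fake_run_query,
    fake_summarize,
)
print(answer)
```

In the real chain, the first and last steps are LLM calls and the middle step is a query against Neptune, so the quality of the answer depends on both the generated SPARQL and the loaded schema.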

Here are a few more prompts to try on the graph data that was ingested.

result = chain.invoke("Are there any mergers or acquisitions?")
print(result["result"].content)
result = chain.invoke("Find organizations.")
print(result["result"].content)
result = chain.invoke("Find sites of MegaSystems or MegaFinancial.")
print(result["result"].content)
result = chain.invoke("Find a member who is a manager of one or more members.")
print(result["result"].content)
result = chain.invoke("Find five members and their managers.")
print(result["result"].content)
result = chain.invoke(
    "Find org units or suborganizations of The Mega Group. What are the sites of those units?"
)
print(result["result"].content)

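Adding Message History

The Neptune SPARQL QA chain can be wrapped by RunnableWithMessageHistory. This adds message history to the chain, allowing us to create a chatbot that retains conversation state across multiple invocations.

To start, we need a way to store and load the message history. For this purpose, each session is created as an instance of InMemoryChatMessageHistory and stored in a dictionary for repeated access.

(Also see: https://python.langchain.com/docs/versions/migrating_memory/chat_history/#chatmessagehistory)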

from langchain_core.chat_history import InMemoryChatMessageHistory

chats_by_session_id = {}


def get_chat_history(session_id: str) -> InMemoryChatMessageHistory:
    chat_history = chats_by_session_id.get(session_id)
    if chat_history is None:
        chat_history = InMemoryChatMessageHistory()
        chats_by_session_id[session_id] = chat_history
    return chat_history
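The per-session dictionary pattern can be tried without any LangChain dependency. In this minimal sketch, `FakeHistory` is a hypothetical stand-in for InMemoryChatMessageHistory that only stores strings, and the session ids are made up. It shows that repeated lookups with the same id return the same history while other ids stay isolated.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeHistory:
    """Stand-in for a chat history object; only stores message strings."""

    messages: List[str] = field(default_factory=list)


histories: Dict[str, FakeHistory] = {}


def get_history(session_id: str) -> FakeHistory:
    # Create the history on first use, then return the stored one.
    return histories.setdefault(session_id, FakeHistory())


get_history("session-a").messages.append("How many org units are there?")
get_history("session-a").messages.append("List the sites for each unit.")
get_history("session-b").messages.append("Find organizations.")

print(len(get_history("session-a").messages))
print(len(get_history("session-b").messages))
```

Because the store is a plain in-process dictionary, histories are lost when the kernel restarts; a persistent store would be needed for anything beyond a notebook session.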

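Now, the QA chain and the message history store can be used to create the new RunnableWithMessageHistory. Note that we must set query as the input key to match the format expected by the base chain.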

from langchain_core.runnables.history import RunnableWithMessageHistory

runnable_with_history = RunnableWithMessageHistory(
    chain,
    get_chat_history,
    input_messages_key="query",
)

Before invoking the chain, a unique session_id needs to be generated for the conversation that the new InMemoryChatMessageHistory will remember.

import uuid

session_id = str(uuid.uuid4())  # get_chat_history is annotated to take a str

Finally, invoke the message-history-enabled chain with the session_id.

result = runnable_with_history.invoke(
    {"query": "How many org units or suborganizations does The Mega Group have?"},
    config={"configurable": {"session_id": session_id}},
)
print(result["result"].content)

As the chain continues to be invoked with the same session_id, responses are returned in the context of previous queries in the conversation.

result = runnable_with_history.invoke(
    {"query": "List the sites for each of the units."},
    config={"configurable": {"session_id": session_id}},
)
print(result["result"].content)