Kafka
Kafka 是一个分布式消息系统,用于发布和订阅记录流。
此演示展示了如何使用 KafkaChatMessageHistory 在 Kafka 集群中存储和检索聊天消息。
需要运行一个 Kafka 集群来运行此演示。您可以遵循此说明在本地创建 Kafka 集群。
from langchain_community.chat_message_histories import KafkaChatMessageHistory
chat_session_id = "chat-message-history-kafka"
bootstrap_servers = "localhost:64797" # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally
history = KafkaChatMessageHistory(
chat_session_id,
bootstrap_servers,
)
API Reference:KafkaChatMessageHistory
KafkaChatMessageHistory 的可选参数:
ttl_ms:聊天消息的生存时间(毫秒)。partition:用于存储聊天消息的主题的分区数。replication_factor:用于存储聊天消息的主题的副本系数。
KafkaChatMessageHistory 在内部使用 Kafka 消费者来读取聊天消息,并能够持久化标记已消费的位置。它具有以下检索聊天消息的方法:
messages: 从上一条消息继续消费聊天消息。messages_from_beginning: 重置消费者到历史记录的开头并开始消费消息。可选参数:max_message_count: 要读取的最大消息数。max_time_sec: 要读取的最大时间(秒)。
messages_from_latest: 重置消费者到聊天历史记录的末尾并尝试消费消息。可选参数与上面相同。messages_from_last_consumed: 继续返回从最后一条已消费的消息开始的消息,类似于messages,但带有可选参数。
max_message_count 和 max_time_sec 用于避免在检索消息时无限期阻塞。
因此,messages 和其他检索消息的方法可能不会返回聊天历史中的所有消息。您需要指定 max_message_count 和 max_time_sec 以在单个批次中检索所有聊天历史。
添加消息并检索。
history.add_user_message("hi!")
history.add_ai_message("whats up?")
history.messages
[HumanMessage(content='hi!'), AIMessage(content='whats up?')]
再次调用 messages 会返回一个空列表,因为消费者位于聊天记录的末尾。
history.messages
[]
添加新消息并继续消费。
history.add_user_message("hi again!")
history.add_ai_message("whats up again?")
history.messages
[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]
重置消费者并从头开始读取:
history.messages_from_beginning()
[HumanMessage(content='hi again!'),
AIMessage(content='whats up again?'),
HumanMessage(content='hi!'),
AIMessage(content='whats up?')]
将消费者设置到聊天记录的末尾,添加几条新消息,然后进行消费:
history.messages_from_latest()
history.add_user_message("HI!")
history.add_ai_message("WHATS UP?")
history.messages
[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]