1. Self-RAG-pinecone_movies 范式概述
展示了一个基于 LangGraph 的 Self-RAG(Self-Reflective Retrieval-Augmented Generation) 工作流程,专门针对电影领域的问答任务。它使用 Pinecone 作为向量数据库存储电影相关文档的嵌入,结合本地或云端 LLM 实现自反思的 RAG 系统。与 langgraph_self_rag 类似,通过动态评估文档相关性和生成结果质量来优化答案,但其特色在于:
- 特定领域:聚焦电影相关数据(如剧情、演员、导演等)。
- Pinecone 集成:使用 Pinecone 作为云端向量数据库,提供高效的向量检索。
- 自反思机制:通过 LLM 评估文档和生成结果,确保答案准确性。
Pinecone 是一个托管的矢量数据库服务,允许用户创建和管理自己的索引来存储矢量嵌入。
其核心范式可以总结为:
- 状态管理:通过 GraphState 管理问题、文档、生成结果等状态。
- 节点与边:将 Self-RAG 流程分解为检索、文档评估、生成和反思节点,通过条件边控制动态流程。
- 自反思:评估文档相关性和生成质量,动态调整检索或生成。
- Pinecone 向量检索:利用 Pinecone 的云端向量数据库进行高效文档检索。
要注意的是:
- 需要注册Pinecone的APIKey
- 需要在Pinecone网站上新建Index:sample-movies ,并配置对应内容
2. LangGraph 中 Self-RAG-pinecone_movies 的实现逻辑
在 LangGraph 的 examples/rag/langgraph_self_rag_pinecone_movies.ipynb

下图是原来的Self-RAG 的,没有区别的么。

3. LangGraph Self-RAG_pinecone_movies 示例代码解析
以下结合代码片段说明其工作原理。
3.1 定义图状态(GraphState)
作用:GraphState 定义了工作流程的状态,贯穿所有节点。
字段解析:
- question:用户输入的问题(如“哪部电影由 Christopher Nolan 导演?”)。
- generation:LLM 生成的答案。
- documents:从 Pinecone 检索到的电影相关文档。
- is_relevant:文档是否与问题相关(布尔值)。
- needs_reflection:生成结果是否需要校正(布尔值)。
- steps:记录执行步骤,便于调试。
与 langgraph_self_rag 相同:状态定义一致,两者共享 Self-RAG 的核心逻辑。
from typing import List
from typing_extensions import TypedDict
class GraphState(TypedDict):
question: str
generation: str
documents: List[str]
is_relevant: bool
needs_reflection: bool
steps: List[str]
3.2 创建索引
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
# use pinecone movies database
# Add to vectorDB
vectorstore = PineconeVectorStore(
embedding=OpenAIEmbeddings(),
index_name="sample-movies",
text_key="summary",
)
retriever = vectorstore.as_retriever()
docs = retriever.invoke("James Cameron")
for doc in docs:
print("# " + doc.metadata["title"])
print(doc.page_content)
print()
3.3 定义工作流节点
功能:使用 Pinecone 向量数据库检索与问题相关的电影文档。
逻辑:
- 将问题转换为嵌入向量(使用嵌入模型,如 sentence-transformers 或 OpenAI 嵌入)。
- 查询 Pinecone 索引,返回 top_k 个最相关的文档。
- 从 Pinecone 的元数据中提取文本内容。
输出:更新 documents 和 steps。
特点:使用 Pinecone 的云端向量存储,支持高效、可扩展的检索。
### Retrieval Grader
from langchain import hub
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
# Data model
class GradeDocuments(BaseModel):
"""Binary score for relevance check on retrieved documents."""
binary_score: str = Field(
description="Documents are relevant to the question, 'yes' or 'no'"
)
# https://smith.langchain.com/hub/efriis/self-rag-retrieval-grader
grade_prompt = hub.pull("efriis/self-rag-retrieval-grader")
# LLM with function call
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeDocuments)
retrieval_grader = grade_prompt | structured_llm_grader
# Test the retrieval grader
question = "movies starring jason momoa"
docs = retriever.invoke(question)
doc_txt = docs[0].page_content
print(doc_txt)
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
- 文档评分节点(gradeDocuments):
功能:使用 LLM 评估检索到的文档是否与问题相关。
逻辑:
- 对每个文档调用 LLM,生成相关性判断(relevant 或 irrelevant)。
- 保留相关文档,丢弃不相关文档。
- 设置 is_relevant 字段,表示是否有相关文档。
输出:更新 documents、is_relevant 和 steps。
与 langgraph_self_rag 一致:评估逻辑相同,但文档内容专注于电影领域。
def grade_documents(state):
""" Determines whether the retrieved documents are relevant to the question. Args: state (dict): The current graph state Returns: state (dict): Updates documents key with only filtered relevant documents """print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]
# Score each doc
filtered_docs = []
for d in documents:
score = retrieval_grader.invoke(
{"question": question, "document": d.page_content}
)
grade = score.binary_score
if grade == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
continue
return {"documents": filtered_docs, "question": question}
- 生成节点(generate):
功能:根据检索到的文档和问题生成答案。
逻辑:使用预定义的 RAG 提示模板调用 LLM。
输出:更新 generation 和 steps。
与 CRAG 相似:生成逻辑类似,但 Self-RAG 会在后续节点评估生成质量。
def generate(state):
""" Generate answer Args: state (dict): The current graph state Returns: state (dict): New key added to state, generation, that contains LLM generation """print("---GENERATE---")
question = state["question"]
documents = state["documents"]
# RAG generation
generation = rag_chain.invoke({"context": documents, "question": question})
return {"documents": documents, "question": question, "generation": generation}
- reflect_on_generation(生成结果反思):
功能:使用 LLM 评估生成结果的质量,检查是否包含幻觉或与文档不一致。
逻辑:
- LLM 比较生成结果与文档内容,判断是否需要校正(needs_reflection = True)。
- 评估可能基于预定义提示,如“生成内容是否准确引用了文档?”。
输出:更新 needs_reflection 和 steps。
与 CRAG 的区别:CRAG 没有生成结果反思节点,Self-RAG 的自反思是其核心特性。
def reflect_on_generation(state):
question = state["question"]
documents = state["documents"]
generation = state["generation"]
result = llm.invoke([...]) # 使用 LLM 评估生成结果
needs_reflection = result.content.lower() == "needs_reflection"
return {
"needs_reflection": needs_reflection,
"steps": state["steps"] + ["reflect_on_generation"]
}
- 决定生成
如果 is_relevant = True,进入生成阶段。
否则,重新检索文档。
def decide_to_generate(state):
""" Determines whether to generate an answer, or re-generate a question. Args: state (dict): The current graph state Returns: str: Binary decision for next node to call """print("---ASSESS GRADED DOCUMENTS---")
state["question"]
filtered_documents = state["documents"]
if not filtered_documents:
# All documents have been filtered check_relevance
# We will re-generate a new query
print(
"---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
)
return "transform_query"
else:
# We have relevant documents, so generate answer
print("---DECISION: GENERATE---")
return "generate"
3.4 构建图结构
流程逻辑:
from langgraph.graph import END, StateGraph, START
workflow = StateGraph(GraphState)
# Define the nodes
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae
workflow.add_node("transform_query", transform_query) # transform_query
# Build graph
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"transform_query": "transform_query",
"generate": "generate",
},
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": END,
"not useful": "transform_query",
},
)
# Compile
app = workflow.compile()
Self-RAG_pinecone_movies 设计体现了以下关键范式:
3.3 Pinecone 集成
- 云端向量存储:使用 Pinecone 提供高效、分布式的向量检索,适合大规模电影数据。
- 元数据支持:Pinecone 的元数据功能存储电影信息(如标题、导演、剧情),便于结构化检索。
- 可扩展性:Pinecone 的云端架构支持动态扩展,适合生产环境。
3.4 领域聚焦
- 电影领域:文档和生成内容专注于电影相关信息,优化了领域特定任务的准确性。
- 定制提示:提示模板可能针对电影问题优化(如“提取导演信息”或“总结剧情”)。
附上全部源代码
RA/SD 衍生者AI训练营。发布者:稻草人,转载请注明出处:https://www.shxcj.com/archives/9600