从高层次上讲,sql agent智能体做的内容:
从数据库中获取可用表
确定哪些表与问题相关
获取相关表的架构
根据问题和架构中的信息生成查询
使用 LLM 仔细检查查询中是否存在常见错误
执行查询并返回结果
纠正数据库引擎发现的错误,直到查询成功
根据结果制定响应

定义状态(State)
LangGraph 的工作流需要定义一个状态对象,用于在节点之间传递信息。SQL Agent 的状态可能包括:
- 用户的自然语言问题。
- 生成的SQL查询。
- 查询结果。
- 最终答案。
from typing import TypedDict, List
class SQLAgentState(TypedDict):
user_query: str # 用户的自然语言问题
sql_query: str # 生成的SQL查询
query_result: List[dict] # 查询结果
final_answer: str # 最终答案
error: str # 错误信息(如果有)
(3) 定义工具(Tools)
SQL Agent 的核心是与数据库交互的工具,例如执行SQL查询、获取表结构等。

数据库连接
engine = create_engine("sqlite:///example.db") # 示例使用 SQLite 数据库
@tool
def execute_sql(sql_query: str) -> List[dict]:
"""执行SQL查询并返回结果"""
try:
with engine.connect() as connection:
result = connection.execute(text(sql_query))
# 将结果转为字典列表
columns = result.keys()
return [dict(zip(columns, row)) for row in result.fetchall()]
except Exception as e:
return [{"error": str(e)}]
@tool
def get_schema() -> str:
"""获取数据库表结构"""
try:
with engine.connect() as connection:
# 示例:获取所有表名
tables = connection.execute(text("SELECT name FROM sqlite_master WHERE type='table';")).fetchall()
schema_info = "Tables in database:\n"
for table in tables:
table_name = table[0]
schema_info += f"Table: {table_name}\n"
# 获取表的列信息
columns = connection.execute(text(f"PRAGMA table_info({table_name});")).fetchall()
schema_info += "Columns:\n"
for col in columns:
schema_info += f"- {col[1]} ({col[2]})\n"
return schema_info
except Exception as e:
return f"Error: {str(e)}"
(4) 定义SQL Agent
SQL Agent 使用语言模型将用户问题转换为SQL查询,并将查询结果转为自然语言答案。
定义提示模板
sql_prompt = PromptTemplate(
input_variables=["user_query", "schema"],
template="""
你是一个SQL专家。用户的问题是:{user_query}
数据库的表结构如下:
{schema}
根据用户问题,生成一个SQL查询语句。
只返回SQL查询语句,不包含其他说明。
"""
)
answer_prompt = PromptTemplate(
input_variables=["user_query", "query_result"],
template="""
用户的问题是:{user_query}
SQL查询结果如下:
{query_result}
根据查询结果,用自然语言回答用户的问题。
"""
)
# 初始化语言模型
llm = ChatOpenAI(model="gpt-4o", temperature=0) # 使用支持文本推理的模型
sql_generator = llm.bind_tools([execute_sql, get_schema])
answer_generator = llm.bind_tools([])
(5) 定义LangGraph工作流
- schema_node:获取数据库表结构。
- sql_node:生成SQL查询。
- execute_node:执行SQL查询。
- answer_node:生成最终答案。
定义图
workflow = StateGraph(SQLAgentState)
# 节点:获取表结构
def schema_node(state: SQLAgentState) -> SQLAgentState:
schema = get_schema()
state["schema"] = schema
return state
# 节点:生成SQL查询
def sql_node(state: SQLAgentState) -> SQLAgentState:
sql_query = sql_generator.invoke({
"user_query": state["user_query"],
"schema": state["schema"]
}).content
state["sql_query"] = sql_query
return state
# 节点:执行SQL查询
def execute_node(state: SQLAgentState) -> SQLAgentState:
result = execute_sql(state["sql_query"])
if "error" in result[0]:
state["error"] = result[0]["error"]
else:
state["query_result"] = result
return state
# 节点:生成最终答案
def answer_node(state: SQLAgentState) -> SQLAgentState:
if state.get("error"):
state["final_answer"] = f"执行SQL查询时出错:{state['error']}"
else:
final_answer = answer_generator.invoke({
"user_query": state["user_query"],
"query_result": state["query_result"]
}).content
state["final_answer"] = final_answer
return state
# 添加节点
workflow.add_node("schema", schema_node)
workflow.add_node("sql", sql_node)
workflow.add_node("execute", execute_node)
workflow.add_node("answer", answer_node)
# 添加边
workflow.set_entry_point("schema")
workflow.add_edge("schema", "sql")
workflow.add_edge("sql", "execute")
workflow.add_edge("execute", "answer")
workflow.add_edge("answer", END)
# 编译图
app = workflow.compile()
(6) 运行示例
- 初始化:连接数据库(如 SQLite),加载表结构。
- 生成SQL:Agent 根据用户问题和表结构,生成SQL查询。
- 执行查询:通过 SQLAlchemy 执行SQL查询,获取结果。
- 生成答案:将查询结果转为自然语言答案,返回给用户。
初始化状态
initial_state = {
"user_query": "有多少用户注册?",
"sql_query": "",
"query_result": [],
"final_answer": "",
"error": ""
}
# 运行工作流
result = app.invoke(initial_state)
print("最终答案:", result["final_answer"])
Paragoger衍生者AI训练营。发布者:稻草人,转载请注明出处:https://www.shxcj.com/archives/9656