import os
import asyncio
from typing import List, Optional, Dict, Type
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.memory import ConversationBufferMemory
## --- 0. 配置和设置 ---
## 从 .env 文件加载 OPENAI_API_KEY。
load_dotenv()
## ChatOpenAI 客户端自动从环境中获取 API 密钥。
llm = ChatOpenAI(temperature=0.5, model="gpt-4o-mini")
## --- 1. 任务管理系统 ---
class Task(BaseModel):
"""表示系统中的单个任务。"""
id: str
description: str
priority: Optional[str] = None # P0, P1, P2
assigned_to: Optional[str] = None # 工作人员的名字
class SuperSimpleTaskManager:
"""一个高效且稳健的内存任务管理器。"""
def __init__(self):
# 使用字典实现 O(1) 查找、更新和删除。
self.tasks: Dict[str, Task] = {}
self.next_task_id = 1
def create_task(self, description: str) -> Task:
"""创建并存储一个新任务。"""
task_id = f"TASK-{self.next_task_id:03d}"
new_task = Task(id=task_id, description=description)
self.tasks[task_id] = new_task
self.next_task_id += 1
print(f"DEBUG: 任务已创建 - {task_id}: {description}")
return new_task
def update_task(self, task_id: str, **kwargs) -> Optional[Task]:
"""使用 Pydantic 的 model_copy 安全地更新任务。"""
task = self.tasks.get(task_id)
if task:
# 使用 model_copy 进行类型安全的更新。
update_data = {k: v for k, v in kwargs.items() if v is not None}
updated_task = task.model_copy(update=update_data)
self.tasks[task_id] = updated_task
print(f"DEBUG: 任务 {task_id} 已更新为 {update_data}")
return updated_task
print(f"DEBUG: 未找到任务 {task_id} 进行更新。")
return None
def list_all_tasks(self) -> str:
"""列出系统中当前的所有任务。"""
if not self.tasks:
return "系统中没有任务。"
task_strings = []
for task in self.tasks.values():
task_strings.append(
f"ID: {task.id}, 描述: '{task.description}', "
f"优先级: {task.priority or 'N/A'}, "
f"分配给: {task.assigned_to or 'N/A'}"
)
return "当前任务:\n" + "\n".join(task_strings)
task_manager = SuperSimpleTaskManager()
## --- 2. 项目管理智能体工具 ---
## 使用 Pydantic 模型作为工具参数以获得更好的验证和清晰度。
class CreateTaskArgs(BaseModel):
description: str = Field(description="任务的详细描述。")
class PriorityArgs(BaseModel):
task_id: str = Field(description="要更新的任务 ID,例如 'TASK-001'。")
priority: str = Field(description="要设置的优先级。必须是以下之一:'P0'、'P1'、'P2'。")
class AssignWorkerArgs(BaseModel):
task_id: str = Field(description="要更新的任务 ID,例如 'TASK-001'。")
worker_name: str = Field(description="要分配任务的工作人员的名字。")
def create_new_task_tool(description: str) -> str:
"""使用给定的描述创建一个新的项目任务。"""
task = task_manager.create_task(description)
return f"已创建任务 {task.id}: '{task.description}'。"
def assign_priority_to_task_tool(task_id: str, priority: str) -> str:
"""为给定的任务 ID 分配优先级(P0、P1、P2)。"""
if priority not in ["P0", "P1", "P2"]:
return "优先级无效。必须是 P0、P1 或 P2。"
task = task_manager.update_task(task_id, priority=priority)
return f"已为任务 {task.id} 分配优先级 {priority}。" if task else f"未找到任务 {task_id}。"
def assign_task_to_worker_tool(task_id: str, worker_name: str) -> str:
"""将任务分配给特定的工作人员。"""
task = task_manager.update_task(task_id, assigned_to=worker_name)
return f"已将任务 {task.id} 分配给 {worker_name}。" if task else f"未找到任务 {task_id}。"
## PM 智能体使用的所有工具
pm_tools = [
Tool(
name="create_new_task",
func=create_new_task_tool,
description="首先使用此工具创建一个新任务并获取其 ID。",
args_schema=CreateTaskArgs
),
Tool(
name="assign_priority_to_task",
func=assign_priority_to_task_tool,
description="使用此工具在创建任务后为其分配优先级。",
args_schema=PriorityArgs
),
Tool(
name="assign_task_to_worker",
func=assign_task_to_worker_tool,
description="使用此工具在创建任务后将其分配给特定的工作人员。",
args_schema=AssignWorkerArgs
),
Tool(
name="list_all_tasks",
func=task_manager.list_all_tasks,
description="使用此工具列出所有当前任务及其状态。"
),
]
## --- 3. 项目管理智能体定义 ---
pm_prompt_template = ChatPromptTemplate.from_messages([
("system", """你是一个专注的项目管理 LLM 智能体。你的目标是高效地管理项目任务。
当你收到新的任务请求时,遵循以下步骤:
1. 首先,使用 `create_new_task` 工具创建具有给定描述的任务。你必须首先执行此操作以获取 `task_id`。
2. 接下来,分析用户的请求以查看是否提到了优先级或受让人。
- 如果提到优先级(例如,"紧急"、"ASAP"、"关键"),将其映射到 P0。使用 `assign_priority_to_task`。
- 如果提到工作人员,使用 `assign_task_to_worker`。
3. 如果缺少任何信息(优先级、受让人),你必须做出合理的默认分配(例如,分配 P1 优先级并分配给 'Worker A')。
4. 一旦任务完全处理完毕,使用 `list_all_tasks` 显示最终状态。
可用的工作人员:'Worker A'、'Worker B'、'Review Team'
优先级级别:P0(最高)、P1(中等)、P2(最低)
"""),
("placeholder", "{chat_history}"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}")
])
## 创建智能体执行器
pm_agent = create_react_agent(llm, pm_tools, pm_prompt_template)
pm_agent_executor = AgentExecutor(
agent=pm_agent,
tools=pm_tools,
verbose=True,
handle_parsing_errors=True,
memory=ConversationBufferMemory(memory_key="chat_history", return_messages=True)
)
## --- 4. 简单交互流程 ---
async def run_simulation():
print("--- 项目管理模拟 ---")
# 场景 1:处理新的紧急功能请求
print("\n[用户请求] 我需要尽快实现一个新的登录系统。它应该分配给 Worker B。")
await pm_agent_executor.ainvoke({"input": "创建一个实现新登录系统的任务。这很紧急,应该分配给 Worker B。"})
print("\n" + "-"*60 + "\n")
# 场景 2:处理细节较少的不太紧急的内容更新
print("[用户请求] 我们需要审查营销网站内容。")
await pm_agent_executor.ainvoke({"input": "管理一个新任务:审查营销网站内容。"})
print("\n--- 模拟完成 ---")
## 运行模拟
if __name__ == "__main__":
asyncio.run(run_simulation())