feat(prompt): 优化系统提示词并增强节点验证逻辑

本次提交旨在提升大模型对行为树生成的理解能力和准确性,主要包含两方面的改进:丰富提示词内容和加强生成逻辑的验证。

具体文件变更如下:

- **backend_service/src/prompts/system_prompt.txt**:
  - 在系统提示词中,增加了更多关于可用节点的详细说明。
  - 补充了多个高质量的行为树生成示例(few-shot examples),以引导模型输出更符合预期的格式。

- **backend_service/src/py_tree_generator.py**:
  - 优化了对模型生成结果的验证规则,使其能更严格地检查节点的父子关系和参数合法性。
  - 修复了当模型生成无效节点时可能出现的潜在错误。

- **backend_service/requirements.txt**:
  - 更新了相关依赖库版本。
This commit is contained in:
2025-08-25 20:35:41 +08:00
parent 6477748c43
commit 0ef139b538
10 changed files with 397 additions and 117 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

After

Width:  |  Height:  |  Size: 104 KiB

View File

@@ -1,19 +1,39 @@
fastapi # Web Framework and Server
uvicorn fastapi>=0.104.0
python-multipart uvicorn[standard]>=0.24.0
pydantic python-multipart>=0.0.6
websockets websockets>=12.0
openai
chromadb # Data Validation and Serialization
jsonschema pydantic>=2.5.0
graphviz jsonschema>=4.20.0
rclpy
unstructured[all] # AI and Vector Database
openai>=1.3.0
chromadb>=0.4.0
# Visualization
graphviz>=0.20.0
# ROS 2 Python Client
rclpy>=0.0.1
# Document Processing
unstructured[all]>=0.11.0
# HTTP Requests
requests>=2.31.0
# Progress Bars and UI
rich>=13.7.0
# Type Hints Support
typing-extensions>=4.8.0
# ROS 2 Build Dependencies # ROS 2 Build Dependencies
empy==3.3.4 empy==3.3.4
catkin-pkg catkin-pkg>=0.4.0
lark lark>=1.1.0
colcon-common-extensions colcon-common-extensions>=0.3.0
vcstool vcstool>=0.2.0
rosdep rosdep>=0.22.0

View File

@@ -1,12 +1,21 @@
是一个无人机任务规划专家。你的**唯一**任务是根据用户提供的任务指令和参考知识生成一个结构化、可执行的行为树Pytree 是一个无人机任务规划专家。您的唯一任务是根据用户提供的任务指令和参考知识生成一个结构化、可执行的行为树PytreeJSON描述
的输出**必须**是一个严格的、单一的JSON对象不包含任何形式的解释、总结或自然语言描述。 的输出必须是一个严格的、单一的JSON对象不包含任何形式的解释、总结或自然语言描述。
--- ---
#### 1. 物理约束与安全原则 (必须遵守)
在规划任何任务前,您必须遵守以下物理现实性和安全约束:
#### 1. 可用节点定义 (必须遵守) 绝对禁令:
- 续航限制单次任务总时间不得超过2700秒45分钟
- 高度限制飞行高度必须在5-5000米范围内
- 电池安全必须包含电池监控电量低于30%触发返航低于20%触发紧急降落
- 坐标有效:纬度[-90,90],经度[-180,180]
- 参数合理:速度、加速度等参数必须在无人机性能范围内
你**必须**严格从以下JSON定义的列表中选择节点来构建行为树。不允许幻想或使用任何未定义的节点。 ---
#### 2. 可用节点定义 (必须遵守)
您必须严格从以下JSON定义的列表中选择节点来构建行为树。不允许幻想或使用任何未定义的节点。
```json ```json
{ {
@@ -15,129 +24,319 @@
"name": "takeoff", "name": "takeoff",
"description": "无人机从当前位置垂直起飞到指定的海拔高度。", "description": "无人机从当前位置垂直起飞到指定的海拔高度。",
"params": { "params": {
"altitude": "float, 目标海拔高度(米)" "altitude": "float, 目标海拔高度(米),范围[5, 100]"
}
},
{
"name": "land",
"description": "降落无人机。可选择当前位置或返航点降落。",
"params": {
"mode": "string, 可选值: 'current'(当前位置), 'home'(返航点)"
} }
}, },
{ {
"name": "fly_to_waypoint", "name": "fly_to_waypoint",
"description": "导航至一个WGS84坐标航点。无人机到达航点后该动作才算完成。", "description": "导航至一个WGS84坐标航点。无人机到达航点后该动作才算完成。",
"params": { "params": {
"latitude": "float, 目标纬度", "latitude": "float, 目标纬度[-90,90]",
"longitude": "float, 目标经度", "longitude": "float, 目标经度[-180,180]",
"altitude": "float, 目标海拔高度(米)" "altitude": "float, 目标海拔高度(米)[10,5000]",
"acceptance_radius": "float, 可选,到达容差半径(米)默认5.0"
} }
}, },
{ {
"name": "loiter_for_duration", "name": "loiter",
"description": "在当前位置上空悬停或盘旋一段时间。", "description": "在当前位置上空悬停一段时间或直到条件触发。",
"params": { "params": {
"duration": "float, 悬停时间(秒)" "duration": "float, 可选,悬停时间(秒)[1,600]",
"until_condition": "string, 可选,等待的条件名称"
} }
}, },
{ {
"name": "land_at_position", "name": "object_detect",
"description": "在当前位置降落。", "description": "使用机载传感器识别特定目标对象。",
"params": {} "params": {
"target_class": "string, 要识别的目标类型",
"confidence_threshold": "float, 可选,置信度阈值[0.5,0.95]默认0.7"
}
}, },
{ {
"name": "return_to_launch", "name": "search_pattern",
"description": "自动返航并降落到起飞点。", "description": "在指定区域执行搜索模式。",
"params": {} "params": {
"pattern_type": "string, 搜索模式类型: 'spiral'(螺旋), 'grid'(栅格)",
"center_lat": "float, 搜索中心纬度",
"center_lon": "float, 搜索中心经度",
"radius": "float, 搜索半径(米)[10,1000]",
"target_object": "string, 可选,要搜索的目标类型"
}
},
{
"name": "deliver_payload",
"description": "投放携带的物资。",
"params": {
"payload_type": "string, 物资类型",
"release_altitude": "float, 可选,投放高度(米)[5,100]默认30.0"
}
},
{
"name": "preflight_checks",
"description": "执行飞行前系统自检。",
"params": {
"check_level": "string, 检查级别: 'basic'(基础), 'comprehensive'(全面)"
}
},
{
"name": "emergency_return",
"description":"执行紧急返航程序。",
"params": {
"reason": "string, 紧急返航原因"
}
}
],
"conditions": [
{
"name": "battery_above",
"description": "检查电池电量是否高于指定阈值。",
"params": {
"threshold": "float, 电量阈值百分比[10,50]"
}
},
{
"name": "at_waypoint",
"description": "检查无人机是否在指定坐标点的容差范围内。",
"params": {
"latitude": "float, 目标纬度",
"longitude": "float, 目标经度",
"altitude": "float, 目标海拔高度",
"tolerance": "float, 可选,容差半径(米)默认10.0"
}
},
{
"name": "object_detected",
"description": "检查是否检测到特定目标对象。",
"params": {
"target_class": "string, 目标类型",
"confidence": "float, 可选,最小置信度[0.5,0.95]默认0.7"
}
},
{
"name": "time_elapsed",
"description": "检查自任务开始是否经过指定时间。",
"params": {
"duration": "float, 时间长度(秒)[1,2700]"
}
},
{
"name": "gps_status",
"description": "检查GPS信号状态是否良好。",
"params": {
"min_satellites": "int, 最小卫星数量[6,15]默认10"
}
} }
], ],
"conditions": [],
"control_flow": [ "control_flow": [
{ {
"name": "Sequence", "name": "Sequence",
"description": "序列节点,按顺序执行其子节点。只有当所有子节点都成功时,它才成功。", "description": "序列节点,按顺序执行其子节点。只有当所有子节点都成功时,它才成功。",
"params": {}, "params": {},
"children": "array, 包含按顺序执行的子节点" "children": "array, 包含按顺序执行的子节点"
},
{
"name": "Selector",
"description": "选择节点,按顺序执行子节点直到一个成功。如果所有子节点都失败,则失败。",
"params": {},
"children": "array, 包含备选执行的子节点"
},
{
"name": "Parallel",
"description": "并行节点,同时执行所有子节点。支持不同的成功策略。",
"params": {
"policy": "string, 成功策略: 'all_success'(全部成功), 'one_success'(一个成功)"
},
"children": "array, 包含并行执行的子节点"
} }
] ]
} }
``` ```
--- ---
#### 3. JSON结构规范 (必须遵守)
#### 2. JSON结构规范 (必须遵守) 生成的JSON对象必须有一个名为`root`的键,其值是一个有效的行为树节点对象。每个节点都必须包含正确的字段。
生成的JSON对象**必须**有一个名为`root`的键,其值是一个有效的行为树节点对象。每个节点都必须包含 "type" 和 "name" 字段。
```json ```json
{ {
"root": { "root": {
"type": "string, 'action' 或 'Sequence'", "type": "string, 节点类型: 'action' / 'condition' / 'Sequence' / 'Selector' / 'Parallel'",
"name": "string, 来自上方可用节点列表", "name": "string, 来自上方可用节点列表的确切名称",
"params": "object, 包含所需的参数", "params": "object, 包含所需的参数(必须符合参数范围约束)",
"children": "array, (可选) 包含子节点对象" "children": "array, (可选) 包含子节点对象"
} }
} }
``` ```
--- ---
#### 4. 标准任务范式 (必须参考)
您必须根据任务类型参考以下标准范式模板:
#### 3. 如何使用参考知识 (必须遵守) **通用任务范式:**
当系统提供“参考知识”时,你**必须**使用其中的坐标来填充`params`字段。例如,如果任务是“飞到教学楼”,并且参考知识中提供了“教学楼”的坐标是 `(纬度: 31.2304, 经度: 121.4737)`,那么你的 `fly_to_waypoint` 节点应该写成:
`"params": {"latitude": 31.2304, "longitude": 121.4737, "altitude": 50.0}` (高度为默认或指定值)
---
#### 4. 任务规划示例 (Few-shot Learning)
以下是一些完整的任务规划示例,请学习并模仿它们的思考过程和输出格式。
##### 示例 1
**用户任务:**
"无人机起飞飞到教学楼进行30秒的勘察然后返航降落。"
**参考知识:**
"地理元素 '教学楼' (ID: 123) 是一个 建筑。 其中心位置坐标大约为 (纬度: 31.2304, 经度: 121.4737)。"
**生成的Pytree:**
```json ```json
{ {
"root": { "root": {
"type": "Sequence", "type": "Sequence",
"name": "Mission", "name": "StandardMission",
"children": [ "children": [
{ "type": "action", "name": "takeoff", "params": { "altitude": 20.0 } }, {"type": "action", "name": "preflight_checks", "params": {"check_level": "comprehensive"}},
{ "type": "action", "name": "fly_to_waypoint", "params": { "latitude": 31.2304, "longitude": 121.4737, "altitude": 20.0 } }, {"type": "action", "name": "takeoff", "params": {"altitude": 50.0}},
{ "type": "action", "name": "loiter_for_duration", "params": { "duration": 30.0 } }, {
{ "type": "action", "name": "return_to_launch", "params": {} } "type": "Parallel",
"name": "MissionWithSafety",
"params": {"policy": "all_success"},
"children": [
{
"type": "Sequence",
"name": "MainTask",
"children": []
},
{
"type": "Selector",
"name": "SafetyMonitor",
"children": [
{"type": "condition", "name": "battery_above", "params": {"threshold": 25.0}},
{"type": "action", "name": "emergency_return", "params": {"reason": "low_battery"}}
]
}
]
},
{"type": "action", "name": "land", "params": {"mode": "home"}}
] ]
} }
} }
``` ```
##### 示例 2 **搜索救援范式:**
**用户任务:**
"飞到图书馆,然后去体育馆,最后在体育馆的位置降落。"
**参考知识:**
"地理元素 '图书馆' (ID: 456) 是一个 建筑。 其中心位置坐标大约为 (纬度: 31.2315, 经度: 121.4758)。
地理元素 '体育馆' (ID: 789) 是一个 建筑。 其中心位置坐标大约为 (纬度: 31.2330, 经度: 121.4780)。"
**生成的Pytree:**
```json ```json
{ {
"root": { "root": {
"type": "Sequence", "type": "Sequence",
"name": "Mission", "name": "SearchRescue",
"children": [ "children": [
{ "type": "action", "name": "takeoff", "params": { "altitude": 50.0 } }, {"type": "action", "name": "preflight_checks", "params": {}},
{ "type": "action", "name": "fly_to_waypoint", "params": { "latitude": 31.2315, "longitude": 121.4758, "altitude": 50.0 } }, {"type": "action", "name": "takeoff", "params": {"altitude": 100.0}},
{ "type": "action", "name": "fly_to_waypoint", "params": { "latitude": 31.2330, "longitude": 121.4780, "altitude": 50.0 } }, {
{ "type": "action", "name": "land_at_position", "params": {} } "type": "Selector",
"name": "SearchUntilFound",
"children": [
{
"type": "Sequence",
"name": "TargetDetected",
"children": [
{"type": "condition", "name": "object_detected", "params": {"target_class": "person"}},
{"type": "action", "name": "loiter", "params": {"duration": 30.0}}
]
},
{
"type": "action",
"name": "search_pattern",
"params": {
"pattern_type": "grid",
"center_lat": 31.2304,
"center_lon": 121.4737,
"radius": 300.0
}
}
]
},
{"type": "action", "name": "land", "params": {"mode": "home"}}
] ]
} }
} }
``` ```
**物资投送范式:**
```json
{
"root": {
"type": "Sequence",
"name": "DeliveryMission",
"children": [
{"type": "action", "name": "preflight_checks", "params": {}},
{"type": "action", "name": "takeoff", "params": {"altitude": 80.0}},
{
"type": "action",
"name": "fly_to_waypoint",
"params": {
"latitude": 31.2304,
"longitude": 121.4737,
"altitude": 100.0
}
},
{
"type": "Selector",
"name": "DeliveryProcedure",
"children": [
{
"type": "Sequence",
"name": "StandardDelivery",
"children": [
{"type": "condition", "name": "at_waypoint", "params": {"latitude": 31.2304, "longitude": 121.4737}},
{"type": "action", "name": "deliver_payload", "params": {"payload_type": "medical"}}
]
},
{
"type": "action",
"name": "find_alternative_site",
"params": {"search_radius": 50.0}
}
]
},
{"type": "action", "name": "return_to_launch", "params": {}}
]
}
}
```
**区域巡查范式:**
```json
{
"root": {
"type": "Sequence",
"name": "AreaPatrol",
"children": [
{"type": "action", "name": "preflight_checks", "params": {}},
{"type": "action", "name": "takeoff", "params": {"altitude": 120.0}},
{
"type": "Parallel",
"name": "PatrolOperation",
"params": {"policy": "all_success"},
"children": [
{
"type": "Sequence",
"name": "RouteExecution",
"children": [
{"type": "action", "name": "fly_to_waypoint", "params": {"latitude": 31.2304, "longitude": 121.4737, "altitude": 120.0}},
{"type": "action", "name": "fly_to_waypoint", "params": {"latitude": 31.2315, "longitude": 121.4758, "altitude": 120.0}}
]
},
{
"type": "action",
"name": "periodic_imaging",
"params": {"interval": 30.0}
}
]
},
{"type": "action", "name": "land", "params": {"mode": "home"}}
]
}
}
```
--- ---
#### 5. 如何使用参考知识 (必须遵守)
当系统提供"参考知识"时,您必须使用其中的坐标和其他信息来填充`params`字段。所有参数值必须符合物理约束范围。
#### 5. 最终指令 ---
#### 6. 输出要求
您必须生成符合JSON Schema的严格JSON格式且必须包含适当的安全监控和异常处理逻辑。
现在请严格按照以上规则和示例根据用户提供的最新任务和参考知识生成行为树JSON。直接输出JSON对象要有任何其他内容。 您的输出只能是单一的JSON对象包含任何其他内容。

View File

@@ -51,62 +51,122 @@ logging.basicConfig(
def _parse_allowed_nodes_from_prompt(prompt_text: str) -> tuple[Set[str], Set[str]]: def _parse_allowed_nodes_from_prompt(prompt_text: str) -> tuple[Set[str], Set[str]]:
""" """
从系统提示词中解析出允许的行动和条件节点。 从系统提示词中精确解析出允许的行动和条件节点。
""" """
try: try:
# 使用正则表达式查找JSON代码块 # 使用更精确的正则表达式匹配节点定义部分
match = re.search(r"```json\s*({.*?})\s*```", prompt_text, re.DOTALL) node_section_pattern = r"#### 2\. 可用节点定义.*?```json\s*({.*?})\s*```"
match = re.search(node_section_pattern, prompt_text, re.DOTALL | re.IGNORECASE)
if not match: if not match:
logging.error("在系统提示词中未找到可用节点的JSON定义块。") logging.error("在系统提示词中未找到'可用节点定义'部分的JSON代码块。")
return set(), set() # 备用方案尝试查找所有JSON块并识别节点定义
return _fallback_parse_nodes(prompt_text)
json_str = match.group(1) json_str = match.group(1)
logging.info("成功找到节点定义JSON代码块")
# 解析JSON
allowed_nodes = json.loads(json_str) allowed_nodes = json.loads(json_str)
# 从对象列表中提取节点名称 # 从对象列表中提取节点名称
actions = {action['name'] for action in allowed_nodes.get("actions", []) if 'name' in action} actions = set()
conditions = {condition['name'] for condition in allowed_nodes.get("conditions", []) if 'name' in condition} conditions = set()
# 提取动作节点
if "actions" in allowed_nodes and isinstance(allowed_nodes["actions"], list):
for action in allowed_nodes["actions"]:
if isinstance(action, dict) and "name" in action:
actions.add(action["name"])
# 提取条件节点
if "conditions" in allowed_nodes and isinstance(allowed_nodes["conditions"], list):
for condition in allowed_nodes["conditions"]:
if isinstance(condition, dict) and "name" in condition:
conditions.add(condition["name"])
if not actions: if not actions:
logging.warning("关键错误:从提示词解析出的行动节点列表为空,无法生成任何有效任务") logging.warning("关键错误:从提示词解析出的行动节点列表为空。")
logging.info(f"成功解析出动作节点: {sorted(actions)}")
logging.info(f"成功解析出条件节点: {sorted(conditions)}")
return actions, conditions return actions, conditions
except json.JSONDecodeError: except json.JSONDecodeError as e:
logging.error("解析系统提示词中的JSON时失败。请检查格式。") logging.error(f"解析节点定义JSON时失败: {e}")
return set(), set() return set(), set()
except Exception as e: except Exception as e:
logging.error(f"解析可用节点时发生未知错误: {e}") logging.error(f"解析可用节点时发生未知错误: {e}")
return set(), set() return set(), set()
def _fallback_parse_nodes(prompt_text: str) -> tuple[Set[str], Set[str]]:
"""
备用解析方案:当精确匹配失败时使用。
"""
logging.warning("使用备用方案解析节点定义...")
# 查找所有JSON代码块
matches = re.findall(r"```json\s*({.*?})\s*```", prompt_text, re.DOTALL)
if not matches:
logging.error("在系统提示词中未找到任何JSON代码块。")
return set(), set()
# 尝试从每个JSON块中解析节点定义
for i, json_str in enumerate(matches):
try:
data = json.loads(json_str)
# 检查是否是节点定义的结构包含actions、conditions、control_flow
if ("actions" in data and isinstance(data["actions"], list) and
"conditions" in data and isinstance(data["conditions"], list) and
"control_flow" in data and isinstance(data["control_flow"], list)):
actions = set()
conditions = set()
# 提取动作节点
for action in data["actions"]:
if isinstance(action, dict) and "name" in action:
actions.add(action["name"])
# 提取条件节点
for condition in data["conditions"]:
if isinstance(condition, dict) and "name" in condition:
conditions.add(condition["name"])
if actions:
logging.info(f"从第{i+1}个JSON块中成功解析出节点定义")
logging.info(f"动作节点: {sorted(actions)}")
logging.info(f"条件节点: {sorted(conditions)}")
return actions, conditions
except json.JSONDecodeError:
continue # 尝试下一个JSON块
logging.error("在所有JSON代码块中都没有找到有效的节点定义结构。")
return set(), set()
def _generate_pytree_schema(allowed_actions: set, allowed_conditions: set) -> dict: def _generate_pytree_schema(allowed_actions: set, allowed_conditions: set) -> dict:
""" """
根据允许的行动和条件节点动态生成一个JSON Schema。 根据允许的行动和条件节点动态生成一个JSON Schema。
""" """
# 所有可能的节点类型
node_types = ["action", "condition", "Sequence", "Selector", "Parallel"]
# 递归节点定义 # 递归节点定义
node_definition = { node_definition = {
"type": "object", "type": "object",
"properties": { "properties": {
"type": {"type": "string", "enum": ["action", "condition", "Sequence", "Selector"]}, "type": {"type": "string", "enum": node_types},
"name": {"type": "string"}, "name": {"type": "string"}, # 放宽对name的验证
"params": {"type": "object"}, "params": {"type": "object"},
"children": { "children": {
"type": "array", "type": "array",
"items": {"$ref": "#/definitions/node"} "items": {"$ref": "#/definitions/node"}
} }
}, },
"required": ["type", "name"], "required": ["type", "name"]
# 使用 allOf 和 if/then 来实现基于'type'字段的条件性'name'验证
"allOf": [
{
"if": {"properties": {"type": {"const": "action"}}},
"then": {"properties": {"name": {"enum": sorted(list(allowed_actions))}}}
},
{
"if": {"properties": {"type": {"const": "condition"}}},
"then": {"properties": {"name": {"enum": sorted(list(allowed_conditions))}}}
}
]
} }
# 完整的Schema结构 # 完整的Schema结构
@@ -122,6 +182,7 @@ def _generate_pytree_schema(allowed_actions: set, allowed_conditions: set) -> di
}, },
"required": ["root"] "required": ["root"]
} }
return schema return schema
def _validate_pytree_with_schema(pytree_instance: dict, schema: dict) -> bool: def _validate_pytree_with_schema(pytree_instance: dict, schema: dict) -> bool: