3 Commits

Author SHA1 Message Date
d32520d83f Add output length constraint 2025-09-21 22:33:54 +08:00
afd170c451 Refine prompts 2025-09-21 22:12:21 +08:00
fd89745950 Add documentation 2025-09-21 01:16:33 +08:00
18 changed files with 382 additions and 21 deletions

View File

@@ -19,6 +19,7 @@
│ │ │ └── classifier_prompt.txt # prompt for classifying instructions as simple vs. complex
│ │ ├── ...
│ ├── generated_visualizations/ # latest generated py_tree visualization images
│ ├── generated_reasoning_content/ # latest reasoning-chain Markdown files (<plan_id>.md)
│ └── requirements.txt # Python dependencies for the backend service
├── tools/
@@ -26,7 +27,8 @@
│ ├── knowledge_base/ # [processed] .ndjson files generated by build_knowledge_base.py
│ ├── vector_store/ # [database] the final ChromaDB vector database
│ ├── build_knowledge_base.py # [step 1] converts raw data into natural-language knowledge
│ └── ingest.py # [step 2] ingests the natural-language knowledge into the vector database
│ ├── ingest.py # [step 2] ingests the natural-language knowledge into the vector database
│ └── test_llama_server.py # calls the local llama-server on port 8081 directly; supports --system / --system-file
├── / # ROS2 interface definitions (unchanged)
└── docs/
@@ -95,6 +97,10 @@
Generic API key: `OPENAI_API_KEY`
Reasoning-chain capture:
- `ENABLE_REASONING_CAPTURE`: whether the model is allowed to return raw output containing <think> so the reasoning chain can be captured; default: true.
- `REASONING_PREVIEW_LINES`: how many lines of the reasoning-chain preview are printed in the backend log; default: 20.
Example:
```bash
export CLASSIFIER_MODEL="qwen2.5-1.8b-instruct"
@@ -104,6 +110,10 @@ export CLASSIFIER_BASE_URL="http://$ORIN_IP:8081/v1"
export SIMPLE_BASE_URL="http://$ORIN_IP:8081/v1"
export COMPLEX_BASE_URL="http://$ORIN_IP:8081/v1"
export OPENAI_API_KEY="sk-no-key-required"
# Reasoning-chain capture (optional)
export ENABLE_REASONING_CAPTURE=true # already true by default; set to false to disable
export REASONING_PREVIEW_LINES=30 # adjust the number of preview lines in the log
```
### Testing simple mode
@@ -117,6 +127,22 @@ python test_api.py
Example inputs: "简单模式,起飞" ("simple mode, take off") or "起飞到10米" ("take off to 10 m"). The result is a simple JSON object (no `root`) containing `mode`, `action`, `plan_id`, and `visualization_url`; a request sketch follows below.
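A minimal sketch of such a request against the local backend, assuming it runs on port 8000 and exposes `/generate_plan` as `test_api.py` does; the request-body field name used here is an assumption, so check `test_api.py` for the exact payload key:
```python
# Minimal sketch: POST a simple instruction and inspect the simple-mode response.
import requests

resp = requests.post(
    "http://127.0.0.1:8000/generate_plan",
    json={"prompt": "起飞到10米"},  # hypothetical payload key; verify against test_api.py
    timeout=60,
)
resp.raise_for_status()
plan = resp.json()
# Simple-mode responses contain: mode, action, plan_id, visualization_url
print(plan["mode"], plan["action"]["name"], plan.get("visualization_url"))
```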
### Calling llama-server directly (bypassing the backend)
When you only need to test the local inference service on port 8081 (OpenAI-compatible API), you can use the bundled script:
```bash
python tools/test_llama_server.py \
--system-file backend_service/src/prompts/system_prompt.txt \
--user "起飞到10米然后降落" \
--base-url "http://127.0.0.1:8081/v1" \
--verbose
```
Notes:
- Supports `--system` or `--system-file` for a custom system prompt; `--system-file` takes precedence.
- Parses the OpenAI-style response by default; if the content includes `<think>` reasoning, it is shown in the output (depending on the model and server configuration). The sketch below shows how to separate that reasoning programmatically.
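For reference, the reasoning can be split from the JSON payload with the same regex the backend applies before parsing; a minimal sketch, where `content` is the assistant message content printed by the script:
```python
import re

def split_reasoning(content: str):
    """Return (reasoning_or_None, content_with_the_think_block_removed)."""
    match = re.search(r"<think>([\s\S]*?)</think>", content)
    reasoning = match.group(1).strip() if match else None
    cleaned = re.sub(r"<think>[\s\S]*?</think>", "", content).strip()
    return reasoning, cleaned
```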
---
## Workflow

View File

@@ -0,0 +1,43 @@
Okay, I now need to process the user's task instruction and generate a behavior-tree JSON that meets the requirements. First, I must read the user's instruction and the reference knowledge carefully to make sure I understand everything that is required.
The user's goal is to search for and lock onto the most dangerous balloon (red > blue > green), track it for 30 seconds, and then strike it. The reference knowledge mentions three locations; the user may want the search area to be near these locations, or the path may need to be planned around these coordinates.
First, I need to determine the drone's takeoff and landing point. The location coordinates in the reference knowledge could serve as the center of the search area. For example, the "学生宿舍" (student dormitory) mentioned by the user is at (5, 3, 2), but a suitable center point for the search may need to be chosen. The user did not explicitly specify a search area, so I may need to use search_pattern to cover these areas, or simply search around some center point.
Next, the task flow should be: take off → fly to the search area → search for the target → track once the target is detected → strike. Safety monitoring must also be included.
Based on the reference knowledge the user provided, the search area may need to be set around a particular center point. For example, the "跷跷板" (seesaw) mentioned by the user is at (x: 15, y: -8.5, z: 1.2); the z coordinate must be at least 1 m, and 1.2 already satisfies that, so it is fine. Another location could also be chosen as the search center. Since the user did not say explicitly, I may need to assume the search area is near these locations, or use a combined center point.
However, the user probably wants the drone to fly to a specific search zone first. The student dormitory from the reference knowledge could be a candidate area, but that needs confirmation. Alternatively, the user may want the drone to search around a single center point; for example, the center of search_pattern could be placed at a known location such as near the seesaw, or combine several points.
Since the user did not explicitly specify a search area, I will likely use a known location, such as the seesaw's coordinates, as the center of search_pattern, or pick some other reasonable center. For instance, if the search area is assumed to be where the seesaw is, the center coordinates are (15, -8.5, 1.2). The search area could also be a combination of multiple locations, but the user did not say so, so a single center point is probably the way to go.
Next, use search_pattern for the search, since the target's position is unknown. The parameters require a pattern_type such as spiral or grid; spiral covers a larger area, so choose spiral. The center point will be a known location, namely the seesaw's coordinates, since the user gave nothing more specific.
Then, once the target is detected, it must be tracked for 30 seconds before the strike. So the flow is: take off → fly to the search zone → search → detect → track → strike → land.
Safety monitoring must be included in the Parallel node: monitor battery and GPS, and return home in an emergency.
Now, build the behavior-tree structure:
The root node is a Parallel containing the main task and safety monitoring. The main task is a Sequence with takeoff, flight to the search zone, search, detection, tracking, strike, and landing. Safety monitoring is a Selector with battery and GPS conditions plus an emergency-handling Sequence.
For the search part, use search_pattern with pattern_type spiral, the center at the seesaw's coordinates (15, -8.5, 1.2), and a radius of about 50 m to cover a larger area. The target class is balloon; the stated priority is "red balloons are more dangerous than blue, which are more dangerous than green", but the description parameter should encode the target attribute compactly, e.g. "红>蓝>绿" (red > blue > green).
After the target is detected, use track_object with target_class balloon, description "红>蓝>绿", and track_time 30 seconds. Then strike_target, and finally assess battle damage.
All parameters must satisfy the constraints, e.g. z between 1 and 5000 m, radius between 5 and 1000 m, and speeds within reasonable ranges.
Now fill each step's parameters into the JSON. For example: take off to 2 m; the waypoint for the search zone is the seesaw's coordinates (15, -8.5, 1.2); z must be at least 1 m and 1.2 already is. So fly_to_waypoint gets x=15, y=-8.5, z=1.2, acceptance radius 2 m.
The search uses search_pattern with pattern_type spiral, center (15, -8.5, 1.2), and radius 50 m to cover the area. The target class is balloon, description "红>蓝>绿".
Once detected, use track_object with target_class balloon, description "红>蓝>绿", track_time 30 seconds, min_confidence 0.7, and safe_distance 15 m.
Then strike_target with target_class balloon, description "红>蓝>绿", count 1.
Finally, land.
For safety monitoring, set the battery threshold to 0.35 and GPS min_satellites to 8; the emergency-handling Sequence consists of emergency_return followed by land.
Now fill all of the steps' parameters into the JSON, making sure every rule is satisfied.
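The plan this reasoning converges on, summarized as an ordered list of action calls; this is a sketch of the main-task branch only, and the Parallel/Sequence/Selector node wrapping that the planner actually emits is omitted here:
```python
# Main-task actions derived from the reasoning above (safety-monitoring branch omitted).
main_task_actions = [
    {"name": "takeoff", "params": {"altitude": 2.0}},
    {"name": "fly_to_waypoint", "params": {"x": 15.0, "y": -8.5, "z": 1.2, "acceptance_radius": 2.0}},
    {"name": "search_pattern", "params": {"pattern_type": "spiral", "center_x": 15.0, "center_y": -8.5,
                                          "center_z": 1.2, "radius": 50.0,
                                          "target_class": "balloon", "description": "红>蓝>绿"}},
    {"name": "track_object", "params": {"target_class": "balloon", "description": "红>蓝>绿",
                                        "track_time": 30.0, "min_confidence": 0.7, "safe_distance": 15.0}},
    {"name": "strike_target", "params": {"target_class": "balloon", "description": "红>蓝>绿", "count": 1}},
    {"name": "battle_damage_assessment", "params": {"target_class": "balloon"}},
    {"name": "land", "params": {}},
]
```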

Binary file not shown.

Image preview changed (Before: 200 KiB, After: 233 KiB)

View File

@@ -1,11 +1,10 @@
You are a drone simple-instruction execution planner. Your task: when the user gives a "simple instruction" (one that can be completed by a single atomic action), output a strict JSON object.
You are a drone simple-instruction execution planner. Your task: output a strict JSON object.
Output requirements (must be followed):
- Output only a single JSON object; do not include any explanation or extra text.
- JSON structure:
{"mode":"simple","action":{"name":"<action_name>","params":{...}}}
- <action_name>, its parameter definitions, and its value ranges must be exactly consistent with the "complex mode" prompt (system_prompt.txt).
- In simple mode, do not include any behavior-tree structure or parallel safety monitoring; output only a single atomic action.
- Do not include any behavior-tree structure or parallel safety monitoring; output only a single atomic action.
Examples:
- "起飞到10米" (take off to 10 m) → {"mode":"simple","action":{"name":"takeoff","params":{"altitude":10.0}}}
@@ -21,13 +20,13 @@
{"name": "fly_to_waypoint", "description": "导航至一个指定坐标点。使用相对坐标系x,y,z单位为米。", "params": {"x": "float", "y": "float", "z": "float", "acceptance_radius": "float, 可选默认2.0"}},
{"name": "move_direction", "description": "按指定方向直线移动。方向可为绝对方位或相对机体朝向。", "params": {"direction": "string: north|south|east|west|forward|backward|left|right", "distance": "float[1,10000], 可选, 不指定则持续移动"}},
{"name": "orbit_around_point", "description": "以给定中心点为中心,等速圆周飞行指定圈数。", "params": {"center_x": "float", "center_y": "float", "center_z": "float", "radius": "float[5,1000]", "laps": "int[1,20]", "clockwise": "boolean, 可选, 默认true", "speed_mps": "float[0.5,15], 可选", "gimbal_lock": "boolean, 可选, 默认true"}},
{"name": "orbit_around_target", "description": "以目标为中心,等速圆周飞行指定圈数(需已有目标)。", "params": {"target_class": "string, 见复杂模式定义列表", "description": "string, 可选", "radius": "float[5,1000]", "laps": "int[1,20]", "clockwise": "boolean, 可选, 默认true", "speed_mps": "float[0.5,15], 可选", "gimbal_lock": "boolean, 可选, 默认true"}},
{"name": "orbit_around_target", "description": "以目标为中心,等速圆周飞行指定圈数(需已有目标)。", "params": {"target_class": "string, 取值同object_detect列表", "description": "string, 可选", "radius": "float[5,1000]", "laps": "int[1,20]", "clockwise": "boolean, 可选, 默认true", "speed_mps": "float[0.5,15], 可选", "gimbal_lock": "boolean, 可选, 默认true"}},
{"name": "loiter", "description": "在当前位置上空悬停一段时间或直到条件触发。", "params": {"duration": "float, 可选[1,600]", "until_condition": "string, 可选"}},
{"name": "object_detect", "description": "识别特定目标对象。", "params": {"target_class": "string, 见复杂模式定义列表", "description": "string, 可选", "count": "int, 可选, 默认1"}},
{"name": "object_detect", "description": "识别特定目标对象。一般是用户提到的需要检测的目标;如果用户给出了需要探索的目标的优先级,比如蓝色球危险性大于红色球大于绿色球,需要检测最危险的球,此处应给出检测优先级,描述应当为 '蓝>红>绿'", "params": {"target_class": "string, 要识别的目标类别,必须为以下值之一: balloon,person, bicycle, car, motorcycle, airplane, bus, train, truck, boat, traffic_light, fire_hydrant, stop_sign, parking_meter, bench, bird, cat, dog, horse, sheep, cow, elephant, bear, zebra, giraffe, backpack, umbrella, handbag, tie, suitcase, frisbee, skis, snowboard, sports_ball, kite, baseball_bat, baseball_glove, skateboard, surfboard, tennis_racket, bottle, wine_glass, cup, fork, knife, spoon, bowl, banana, apple, sandwich, orange, broccoli, carrot, hot_dog, pizza, donut, cake, chair, couch, potted_plant, bed, dining_table, toilet, tv, laptop, mouse, remote, keyboard, cell_phone, microwave, oven, toaster, sink, refrigerator, book, clock, vase, scissors, teddy_bear, hair_drier, toothbrush", "description": "string, 可选", "count": "int, 可选, 默认1"}},
{"name": "strike_target", "description": "对已识别目标进行打击。", "params": {"target_class": "string", "description": "string, 可选", "count": "int, 可选, 默认1"}},
{"name": "battle_damage_assessment", "description": "战损评估。", "params": {"target_class": "string", "assessment_time": "float[5-60], 默认15.0"}},
{"name": "search_pattern", "description": "按模式搜索。", "params": {"pattern_type": "string: spiral|grid", "center_x": "float", "center_y": "float", "center_z": "float", "radius": "float[5,1000]", "target_class": "string", "description": "string, 可选", "count": "int, 可选, 默认1"}},
{"name": "track_object", "description": "持续跟踪目标。", "params": {"target_class": "string, 见复杂模式定义列表", "description": "string, 可选", "track_time": "float[1,600], 默认30.0", "min_confidence": "float[0.5-1.0], 默认0.7", "safe_distance": "float[2-50], 默认10.0"}},
{"name": "track_object", "description": "持续跟踪目标。", "params": {"target_class": "string, 取值同object_detect列表", "description": "string, 可选", "track_time": "float[1,600], 默认30.0", "min_confidence": "float[0.5-1.0], 默认0.7", "safe_distance": "float[2-50], 默认10.0"}},
{"name": "deliver_payload", "description": "投放物资。", "params": {"payload_type": "string", "release_altitude": "float[2,100], 默认5.0"}},
{"name": "preflight_checks", "description": "飞行前系统自检。", "params": {"check_level": "string: basic|comprehensive"}},
{"name": "emergency_return", "description": "执行紧急返航程序。", "params": {"reason": "string"}}
@@ -53,7 +52,6 @@
- orbit_around_target.radius: [5, 1000]
- orbit_around_point/target.laps: [1, 20]
- orbit_around_point/target.speed_mps: [0.5, 15]
- Battery thresholds are the same as in complex mode (if relevant)
- If the reference knowledge provides coordinates, they must be used and clamped to the constraint ranges
—— Command conversion rules (orbit-type) ——

View File

@@ -85,8 +85,8 @@
"name": "object_detect",
"description": "在当前视野范围内识别特定目标对象。适用于定点检测,无人机应在目标大致位置悬停或保持稳定姿态。",
"params": {
"target_class": "string, 要识别的目标类别,必须为以下值之一: person, bicycle, car, motorcycle, airplane, bus, train, truck, boat, traffic_light, fire_hydrant, stop_sign, parking_meter, bench, bird, cat, dog, horse, sheep, cow, elephant, bear, zebra, giraffe, backpack, umbrella, handbag, tie, suitcase, frisbee, skis, snowboard, sports_ball, kite, baseball_bat, baseball_glove, skateboard, surfboard, tennis_racket, bottle, wine_glass, cup, fork, knife, spoon, bowl, banana, apple, sandwich, orange, broccoli, carrot, hot_dog, pizza, donut, cake, chair, couch, potted_plant, bed, dining_table, toilet, tv, laptop, mouse, remote, keyboard, cell_phone, microwave, oven, toaster, sink, refrigerator, book, clock, vase, scissors, teddy_bear, hair_drier, toothbrush",
"description": "string, 可选,目标属性描述(如颜色、状态等)",
"target_class": "string, 要识别的目标类别,必须为以下值之一: balloon,person, bicycle, car, motorcycle, airplane, bus, train, truck, boat, traffic_light, fire_hydrant, stop_sign, parking_meter, bench, bird, cat, dog, horse, sheep, cow, elephant, bear, zebra, giraffe, backpack, umbrella, handbag, tie, suitcase, frisbee, skis, snowboard, sports_ball, kite, baseball_bat, baseball_glove, skateboard, surfboard, tennis_racket, bottle, wine_glass, cup, fork, knife, spoon, bowl, banana, apple, sandwich, orange, broccoli, carrot, hot_dog, pizza, donut, cake, chair, couch, potted_plant, bed, dining_table, toilet, tv, laptop, mouse, remote, keyboard, cell_phone, microwave, oven, toaster, sink, refrigerator, book, clock, vase, scissors, teddy_bear, hair_drier, toothbrush",
"description": "string, 可选,目标属性描述(如颜色、状态等),一般是用户提到的需要检测的目标;如果用户给出了需要探索的目标的优先级,比如蓝色球危险性大于红色球大于绿色球,需要检测最危险的球,此处应给出检测优先级,描述应当为 '蓝>红>绿'",
"count": "int, 可选需要检测的目标个数默认1"
}
},

View File

@@ -201,7 +201,7 @@ def _generate_pytree_schema(allowed_actions: set, allowed_conditions: set) -> di
"sandwich", "orange", "broccoli", "carrot", "hot_dog", "pizza", "donut", "cake", "chair",
"couch", "potted_plant", "bed", "dining_table", "toilet", "tv", "laptop", "mouse", "remote",
"keyboard", "cell_phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book",
"clock", "vase", "scissors", "teddy_bear", "hair_drier", "toothbrush"
"clock", "vase", "scissors", "teddy_bear", "hair_drier", "toothbrush","balloon"
]
# Recursive node definitions
@@ -548,6 +548,16 @@ class PyTreeGenerator:
# Updated output directory for visualizations
self.vis_dir = os.path.abspath(os.path.join(self.base_dir, '..', 'generated_visualizations'))
os.makedirs(self.vis_dir, exist_ok=True)
# Reasoning content output directory (Markdown files)
self.reasoning_dir = os.path.abspath(os.path.join(self.base_dir, '..', 'generated_reasoning_content'))
os.makedirs(self.reasoning_dir, exist_ok=True)
# Controls whether the model may return raw output containing <think> (JSON format not enforced) so the reasoning chain can be extracted
self.enable_reasoning_capture = os.getenv("ENABLE_REASONING_CAPTURE", "true").lower() in ("1", "true", "yes")
# Maximum number of lines for the terminal preview
try:
self.reasoning_preview_lines = int(os.getenv("REASONING_PREVIEW_LINES", "20"))
except Exception:
self.reasoning_preview_lines = 20
# Load prompts: complex mode reuses the existing system_prompt.txt; simple mode and the classifier use their own prompts
self.complex_prompt = self._load_prompt("system_prompt.txt")
self.simple_prompt = self._load_prompt("simple_mode_prompt.txt")
@@ -564,6 +574,10 @@ class PyTreeGenerator:
self.simple_base_url = os.getenv("SIMPLE_BASE_URL", f"http://{self.orin_ip}:8081/v1")
self.complex_base_url = os.getenv("COMPLEX_BASE_URL", f"http://{self.orin_ip}:8081/v1")
self.api_key = os.getenv("OPENAI_API_KEY", "sk-no-key-required")
# Maximum output token counts are set directly in code, not via environment variables
self.classifier_max_tokens = 512
self.simple_max_tokens = 8192
self.complex_max_tokens = 8192
# Create separate clients for each use case
self.classifier_client = openai.OpenAI(api_key=self.api_key, base_url=self.classifier_base_url)
@@ -626,7 +640,8 @@ class PyTreeGenerator:
{"role": "user", "content": user_prompt}
],
temperature=0.0,
response_format={"type": "json_object"}
response_format={"type": "json_object"},
max_tokens=self.classifier_max_tokens
)
class_str = classifier_resp.choices[0].message.content
class_obj = json.loads(class_str)
@@ -661,21 +676,88 @@ class PyTreeGenerator:
# Route simple/complex requests to different models and prompts
client = self.simple_llm_client if mode == "simple" else self.complex_llm_client
model_name = self.simple_model if mode == "simple" else self.complex_model
response = client.chat.completions.create(
model=model_name,
messages=[
# Decide whether to force a JSON response based on whether reasoning capture is enabled
response_kwargs = {
"model": model_name,
"messages": [
{"role": "system", "content": use_prompt},
{"role": "user", "content": final_user_prompt}
],
temperature=0.1 if mode == "complex" else 0.0,
response_format={"type": "json_object"}
)
pytree_str = response.choices[0].message.content
"temperature": 0.1 if mode == "complex" else 0.0,
}
if not self.enable_reasoning_capture:
response_kwargs["response_format"] = {"type": "json_object"}
# Set the maximum output tokens based on the mode (configured directly in code)
response_kwargs["max_tokens"] = self.simple_max_tokens if mode == "simple" else self.complex_max_tokens
response = client.chat.completions.create(**response_kwargs)
# Handle an optional reasoning_content field if present
try:
msg = response.choices[0].message
msg_content = getattr(msg, "content", None)
msg_reasoning = getattr(msg, "reasoning_content", None)
except Exception:
msg = response.choices[0]["message"] if isinstance(response.choices[0], dict) else None
msg_content = (msg or {}).get("content") if isinstance(msg, dict) else None
msg_reasoning = (msg or {}).get("reasoning_content") if isinstance(msg, dict) else None
combined_text = ""
if isinstance(msg_reasoning, str) and msg_reasoning.strip():
# Wrap reasoning_content in <think> so it can be parsed uniformly
combined_text += f"<think>\n{msg_reasoning}\n</think>\n"
if isinstance(msg_content, str) and msg_content.strip():
combined_text += msg_content
pytree_str = combined_text if combined_text else (msg_content or "")
raw_full_text_for_logging = pytree_str # keep the full raw text (including <think>) so it can be printed in full on failure
# Extract the <think> reasoning-chain content (if present)
reasoning_text = None
try:
think_match = re.search(r"<think>([\s\S]*?)</think>", pytree_str)
if think_match:
reasoning_text = think_match.group(1).strip()
# Strip the reasoning text before attempting to parse the JSON
pytree_str = re.sub(r"<think>[\s\S]*?</think>", "", pytree_str).strip()
except Exception:
reasoning_text = None
# Catch JSON parse errors separately and print the raw response
try:
pytree_dict = json.loads(pytree_str)
except json.JSONDecodeError as e:
logging.error(f"❌ JSON解析失败{attempt + 1}/3 次)。原始响应如下:\n{pytree_str}")
logging.error(f"❌ JSON解析失败{attempt + 1}/3 次)。\n—— 完整原始文本(含<think>) ——\n{raw_full_text_for_logging}")
# 尝试打印响应对象的完整结构
try:
raw_response_dump = None
if hasattr(response, 'model_dump_json'):
raw_response_dump = response.model_dump_json(indent=2, exclude_none=False)
elif hasattr(response, 'dict'):
raw_response_dump = json.dumps(response.dict(), ensure_ascii=False, indent=2, default=str)
else:
# Fallback: expand choices and the key fields manually
safe_obj = {
"id": getattr(response, 'id', None),
"model": getattr(response, 'model', None),
"object": getattr(response, 'object', None),
"usage": getattr(response, 'usage', None),
"choices": [
{
"index": getattr(c, 'index', None),
"finish_reason": getattr(c, 'finish_reason', None),
"message": {
"role": getattr(getattr(c, 'message', None), 'role', None),
"content": getattr(getattr(c, 'message', None), 'content', None),
"reasoning_content": getattr(getattr(c, 'message', None), 'reasoning_content', None)
} if getattr(c, 'message', None) is not None else None
}
for c in getattr(response, 'choices', [])
] if hasattr(response, 'choices') else None
}
raw_response_dump = json.dumps(safe_obj, ensure_ascii=False, indent=2, default=str)
logging.error(f"—— 完整响应对象 ——\n{raw_response_dump}")
except Exception as dump_e:
try:
logging.error(f"响应对象转储失败repr如下\n{repr(response)}")
except Exception:
pass
continue
# Validate and return separately for simple and complex modes
@@ -702,6 +784,25 @@ class PyTreeGenerator:
pytree_dict['visualization_url'] = f"/static/{vis_filename}"
except Exception as e:
logging.warning(f"简单模式可视化失败: {e}")
# Save the reasoning chain (if any)
try:
if reasoning_text:
reasoning_path = os.path.join(self.reasoning_dir, "reasoning_content.md")
with open(reasoning_path, 'w', encoding='utf-8') as rf:
rf.write(reasoning_text)
logging.info(f"📝 Reasoning chain saved: {reasoning_path}")
# Preview at most N lines in the terminal
try:
lines = reasoning_text.splitlines()
preview = "\n".join(lines[: self.reasoning_preview_lines])
logging.info("🧠 Reasoning-chain preview (first %d lines):\n%s", self.reasoning_preview_lines, preview)
except Exception:
pass
else:
logging.info("No <think> reasoning-chain segment found in the model output. To capture it, set ENABLE_REASONING_CAPTURE=true to relax the enforced JSON response format.")
except Exception as e:
logging.warning(f"Failed to save the reasoning-chain Markdown: {e}")
return pytree_dict
# Complex-mode fallback: if the model mistakenly returns a simple structure, wrap it automatically into a behavior tree with safety monitoring
@@ -754,6 +855,25 @@ class PyTreeGenerator:
vis_path = os.path.join(self.vis_dir, vis_filename)
_visualize_pytree(pytree_dict['root'], os.path.splitext(vis_path)[0])
pytree_dict['visualization_url'] = f"/static/{vis_filename}"
# Save the reasoning chain (if any)
try:
if reasoning_text:
reasoning_path = os.path.join(self.reasoning_dir, "reasoning_content.md")
with open(reasoning_path, 'w', encoding='utf-8') as rf:
rf.write(reasoning_text)
logging.info(f"📝 Reasoning chain saved: {reasoning_path}")
# Preview at most N lines in the terminal
try:
lines = reasoning_text.splitlines()
preview = "\n".join(lines[: self.reasoning_preview_lines])
logging.info("🧠 Reasoning-chain preview (first %d lines):\n%s", self.reasoning_preview_lines, preview)
except Exception:
pass
else:
logging.info("No <think> reasoning-chain segment found in the model output. To capture it, set ENABLE_REASONING_CAPTURE=true to relax the enforced JSON response format.")
except Exception as e:
logging.warning(f"Failed to save the reasoning-chain Markdown: {e}")
return pytree_dict
else:
# Print the Pytree that failed validation for troubleshooting

View File

@@ -12,7 +12,7 @@ BASE_URL = "http://127.0.0.1:8000"
ENDPOINT = "/generate_plan"
# The user prompt we will send for the test
TEST_PROMPT = "飞到学生宿舍"
TEST_PROMPT = "已知目标检测红色气球危险性高于蓝色气球高于绿色气球飞往搜索区搜索并锁定危险性最高的气球对其跟踪30秒后进行打击操作"
def test_generate_plan():
"""

tools/test_llama_server.py Normal file
View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import argparse
from typing import Any, Dict
import requests
def build_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="调用本地 llama-server (OpenAI兼容) 进行推理,支持自定义系统/用户提示词"
)
parser.add_argument(
"--base-url",
default=os.getenv("SIMPLE_BASE_URL", "http://127.0.0.1:8081/v1"),
help="llama-server 的基础URL默认: http://127.0.0.1:8081/v1或环境变量 SIMPLE_BASE_URL",
)
parser.add_argument(
"--model",
default=os.getenv("SIMPLE_MODEL", "local-model"),
help="模型名称(默认: local-model或环境变量 SIMPLE_MODEL",
)
parser.add_argument(
"--system",
default="You are a helpful assistant.",
help="系统提示词system role",
)
parser.add_argument(
"--system-file",
default=None,
help="系统提示词文件路径txt若提供则覆盖 --system 的字符串",
)
parser.add_argument(
"--user",
default=None,
help="用户提示词user role若不传则从交互式输入读取",
)
parser.add_argument(
"--temperature",
type=float,
default=0.2,
help="采样温度(默认: 0.2",
)
parser.add_argument(
"--max-tokens",
type=int,
default=4096,
help="最大生成Token数默认: 4096",
)
parser.add_argument(
"--timeout",
type=float,
default=120.0,
help="HTTP超时时间秒默认: 120",
)
parser.add_argument(
"--verbose",
action="store_true",
help="打印完整返回JSON",
)
return parser.parse_args()
def call_llama_server(
base_url: str,
model: str,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: int,
timeout: float,
) -> Dict[str, Any]:
endpoint = base_url.rstrip("/") + "/chat/completions"
headers: Dict[str, str] = {"Content-Type": "application/json"}
# Compatible with proxies/services that require an API key (llama-server usually does not require one)
api_key = os.getenv("OPENAI_API_KEY")
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
payload: Dict[str, Any] = {
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": temperature,
"max_tokens": max_tokens,
}
resp = requests.post(endpoint, headers=headers, data=json.dumps(payload), timeout=timeout)
resp.raise_for_status()
return resp.json()
def main() -> None:
args = build_args()
user_prompt = args.user
if not user_prompt:
try:
user_prompt = input("请输入用户提示词: ")
except KeyboardInterrupt:
print("\n已取消。")
sys.exit(1)
# Resolve the system prompt: --system-file takes precedence
system_prompt = args.system
if args.system_file:
try:
with open(args.system_file, "r", encoding="utf-8") as f:
system_prompt = f.read()
except Exception as e:
print("\n❌ 读取系统提示词文件失败:")
print(str(e))
sys.exit(1)
try:
print("--- llama-server 推理 ---")
print(f"Base URL: {args.base_url}")
print(f"Model: {args.model}")
if args.system_file:
print(f"System(from file): {args.system_file}")
else:
print(f"System: {system_prompt}")
print(f"User: {user_prompt}")
data = call_llama_server(
base_url=args.base_url,
model=args.model,
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=args.temperature,
max_tokens=args.max_tokens,
timeout=args.timeout,
)
if args.verbose:
print("\n完整返回JSON:")
print(json.dumps(data, ensure_ascii=False, indent=2))
# Try to extract the assistant content in the OpenAI-compatible format
content = None
try:
content = data["choices"][0]["message"]["content"]
except Exception:
pass
if content is not None:
print("\n模型输出:")
print(content)
else:
# Fallback: print the raw response
print("\nCould not parse the content from the OpenAI-compatible fields; raw response follows:")
print(json.dumps(data, ensure_ascii=False))
except requests.exceptions.RequestException as e:
print("\n❌ 请求失败:请确认 llama-server 已在 8081 端口启动并可访问。")
print(f"详情: {e}")
sys.exit(2)
except Exception as e:
print("\n❌ 发生未预期的错误:")
print(str(e))
sys.exit(3)
if __name__ == "__main__":
main()

Binary file not shown.