新增测试方法,优化系统提示词

This commit is contained in:
2025-09-08 09:21:00 +08:00
parent 9bbfd9186e
commit 161389fa1e
5 changed files with 925 additions and 190 deletions

View File

@@ -12,7 +12,7 @@ BASE_URL = "http://127.0.0.1:8000"
ENDPOINT = "/generate_plan"
# The user prompt we will send for the test
TEST_PROMPT = "起飞后移动到跷跷板上方查找(搜索/检测)行人"
TEST_PROMPT = "起飞后移动到学生宿舍上方搜索蓝色车辆,并进行打击"
def test_generate_plan():
"""

View File

@@ -0,0 +1,13 @@
起飞后移动到学生宿舍上方降落
起飞后移动到学生宿舍上方查找蓝色的车
起飞后移动到学生宿舍上方寻找蓝色的车
起飞后移动到学生宿舍上方检测蓝色的车
飞到学生宿舍上方查找蓝色的车
飞到学生宿舍上方查找蓝色车辆并进行打击
起飞后移动到学生宿舍上方搜索蓝色车辆,并进行打击
起飞到学生宿舍上方搜索被困人员,并为被困人员投递救援物资
飞到学生宿舍上方搜索方圆10米范围内的蓝色车辆
飞到学生宿舍上方搜索半径为10米区域范围内的蓝色车辆
起飞到学生宿舍搜索有没有被困人员,然后抛洒救援物资

View File

@@ -0,0 +1,389 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import json
import csv
import time
from datetime import datetime
import os
import re
# --- Configuration ---
BASE_URL = "http://127.0.0.1:8000"
ENDPOINT = "/generate_plan"
INSTRUCTIONS_FILE = "instructions.txt"
RESULTS_CSV = "test_results.csv"
SUMMARY_CSV = "test_summary.csv"
LOG_FILE = "api_test_log.txt"
# 测试参数
TESTS_PER_INSTRUCTION = 10
MAX_RETRIES = 3
RETRY_DELAY = 2
# 添加调试模式
DEBUG = True
def debug_print(message):
"""调试输出"""
if DEBUG:
print(f"🐛 DEBUG: {message}")
def check_safety_monitoring(node):
"""简化安全监控检查"""
has_battery = False
has_emergency = False
def check_node(current_node):
nonlocal has_battery, has_emergency
# 检查电池相关条件
if (current_node.get('type') == 'condition' and
'battery' in str(current_node.get('name', '')).lower()):
has_battery = True
# 检查紧急动作
if (current_node.get('type') == 'action' and
any(keyword in str(current_node.get('name', '')).lower()
for keyword in ['emergency', 'safe', 'land'])):
has_emergency = True
for child in current_node.get('children', []):
check_node(child)
check_node(node)
return has_battery or has_emergency # 放宽要求
def check_leaf_nodes(node, depth=0, max_depth=50):
"""检查节点结构"""
if depth > max_depth:
return True # 不因深度限制而失败
# 动作和条件节点不应该有子节点
if node.get('type') in ['action', 'condition']:
return 'children' not in node or not node['children']
# 控制流节点应该有子节点
if node.get('type') in ['Sequence', 'Selector', 'Parallel']:
if 'children' not in node or not node['children']:
return False
# 递归检查
for child in node.get('children', []):
if not check_leaf_nodes(child, depth + 1, max_depth):
return False
return True
def send_api_request(prompt, instruction_idx, run_number):
"""发送API请求并返回结果"""
url = BASE_URL + ENDPOINT
payload = {"user_prompt": prompt}
headers = {"Content-Type": "application/json"}
for attempt in range(MAX_RETRIES):
try:
debug_print(f"指令 {instruction_idx}-{run_number} 尝试 {attempt + 1}")
start_time = time.time()
response = requests.post(url, data=json.dumps(payload), headers=headers, timeout=60) # 增加超时
response_time = time.time() - start_time
# 首先检查HTTP状态
response.raise_for_status()
# 尝试解析JSON
try:
data = response.json()
except json.JSONDecodeError as e:
debug_print(f"JSON解析失败: {e}, 响应文本: {response.text[:200]}")
raise
root_node = data.get('root', {})
# 基本验证 - 放宽要求
validation_checks = {
"is_dict": isinstance(data, dict),
"has_root": "root" in data,
"root_has_children": bool(root_node.get('children')),
"has_plan_id": "plan_id" in data,
"has_visualization_url": "visualization_url" in data,
}
# 可选的高级验证
advanced_checks = {
"leaf_nodes_valid": check_leaf_nodes(root_node),
"has_safety": check_safety_monitoring(root_node)
}
# 合并验证结果
validation_checks.update(advanced_checks)
# 统计无效节点但不作为失败条件
invalid_actions = []
invalid_conditions = []
def collect_nodes(current_node):
if current_node.get('type') == 'action':
action_name = current_node.get('name', '')
if action_name not in ['deliver_payload', 'emergency_return', 'fly_to_waypoint',
'land', 'loiter', 'object_detect', 'preflight_checks',
'search_pattern', 'strike_target', 'battle_damage_assessment', 'takeoff']:
invalid_actions.append(action_name)
elif current_node.get('type') == 'condition':
condition_name = current_node.get('name', '')
if condition_name not in ['battery_above', 'at_waypoint', 'object_detected',
'target_destroyed', 'time_elapsed', 'gps_status']:
invalid_conditions.append(condition_name)
for child in current_node.get('children', []):
collect_nodes(child)
collect_nodes(root_node)
# 主要检查基本验证,高级验证作为警告
success = all(validation_checks[k] for k in ["is_dict", "has_root", "root_has_children",
"has_plan_id", "has_visualization_url"])
debug_print(f"验证结果: 成功={success}, 基本验证通过={all(validation_checks.values())}")
return {
"success": success,
"data": data,
"validation_checks": validation_checks,
"response_time": response_time,
"invalid_actions": invalid_actions,
"invalid_conditions": invalid_conditions,
"error": None,
"attempts": attempt + 1,
"http_status": response.status_code
}
except requests.exceptions.RequestException as e:
error_msg = f"请求失败: {e}"
debug_print(f"请求异常: {error_msg}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
continue
return {
"success": False,
"data": None,
"validation_checks": {},
"response_time": 0,
"invalid_actions": [],
"invalid_conditions": [],
"error": error_msg,
"attempts": attempt + 1,
"http_status": getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None
}
except Exception as e:
error_msg = f"未知错误: {e}"
debug_print(f"未知错误: {error_msg}")
return {
"success": False,
"data": None,
"validation_checks": {},
"response_time": 0,
"invalid_actions": [],
"invalid_conditions": [],
"error": error_msg,
"attempts": attempt + 1,
"http_status": None
}
def read_instructions(filename):
"""读取指令列表"""
instructions = []
try:
with open(filename, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()
if line and not line.startswith('#'):
instructions.append(line)
return instructions
except Exception as e:
print(f"❌ 读取指令文件时出错: {e}")
return []
def write_log_entry(log_file, instruction_idx, run_number, prompt, result):
"""写入详细日志"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"\n{'='*80}\n")
f.write(f"指令 #{instruction_idx} - 运行 #{run_number} - {timestamp}\n")
f.write(f"HTTP状态: {result.get('http_status', 'N/A')}\n")
f.write(f"指令: {prompt}\n")
f.write(f"尝试次数: {result['attempts']}\n")
f.write(f"响应时间: {result['response_time']:.2f}\n")
f.write(f"结果: {'✅ 成功' if result['success'] else '❌ 失败'}\n")
if result['success']:
f.write("验证结果:\n")
for check_name, check_result in result['validation_checks'].items():
f.write(f" {check_name}: {'' if check_result else ''}\n")
if result['invalid_actions']:
f.write(f"⚠️ 无效动作节点: {result['invalid_actions']}\n")
if result['invalid_conditions']:
f.write(f"⚠️ 无效条件节点: {result['invalid_conditions']}\n")
else:
f.write(f"错误信息: {result['error']}\n")
def generate_summary_report(instructions, results_summary):
"""
生成统计摘要报告(修复除零错误)
"""
try:
with open(SUMMARY_CSV, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['instruction_index', 'instruction', 'total_runs', 'successful_runs',
'success_rate', 'avg_response_time', 'min_response_time',
'max_response_time', 'total_response_time']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for i, instruction in enumerate(instructions):
summary = results_summary[i]
success_count = summary['success_count']
# 防止除零错误
avg_time = "N/A"
min_time = "N/A"
max_time = "N/A"
if success_count > 0:
avg_time = f"{summary['total_response_time'] / success_count:.2f}s"
min_time = f"{summary['min_response_time']:.2f}s"
max_time = f"{summary['max_response_time']:.2f}s"
writer.writerow({
'instruction_index': i + 1,
'instruction': instruction,
'total_runs': TESTS_PER_INSTRUCTION,
'successful_runs': success_count,
'success_rate': f"{(success_count / TESTS_PER_INSTRUCTION * 100):.2f}%",
'avg_response_time': avg_time,
'min_response_time': min_time,
'max_response_time': max_time,
'total_response_time': f"{summary['total_response_time']:.2f}s"
})
print(f"📊 统计摘要已保存至: {SUMMARY_CSV}")
except Exception as e:
print(f"❌ 保存统计摘要时出错: {e}")
def main():
"""主测试函数"""
print("🚀 开始批量API测试")
print(f"每个指令测试 {TESTS_PER_INSTRUCTION}")
instructions = read_instructions(INSTRUCTIONS_FILE)
if not instructions:
return
print(f"找到 {len(instructions)} 条指令")
# 初始化统计
results_summary = [{
'success_count': 0,
'total_response_time': 0,
'min_response_time': float('inf'),
'max_response_time': 0,
'http_statuses': []
} for _ in instructions]
detailed_results = []
# 执行测试
for instruction_idx, prompt in enumerate(instructions, 1):
print(f"\n{'='*60}")
print(f"📋 测试指令 {instruction_idx}/{len(instructions)}")
print(f"指令: {prompt[:80]}{'...' if len(prompt) > 80 else ''}")
print(f"{'='*60}")
for run_number in range(1, TESTS_PER_INSTRUCTION + 1):
print(f" 运行 {run_number}/{TESTS_PER_INSTRUCTION}...", end=" ", flush=True)
result = send_api_request(prompt, instruction_idx, run_number)
write_log_entry(LOG_FILE, instruction_idx, run_number, prompt, result)
# 记录结果
detailed_result = {
"instruction_index": instruction_idx,
"instruction": prompt,
"run_number": run_number,
"success": result["success"],
"attempts": result["attempts"],
"response_time": result["response_time"],
"http_status": result.get("http_status"),
"error": result["error"] or "",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
detailed_results.append(detailed_result)
# 更新统计
idx = instruction_idx - 1
if result["success"]:
results_summary[idx]['success_count'] += 1
results_summary[idx]['total_response_time'] += result['response_time']
results_summary[idx]['min_response_time'] = min(
results_summary[idx]['min_response_time'], result['response_time']
)
results_summary[idx]['max_response_time'] = max(
results_summary[idx]['max_response_time'], result['response_time']
)
print(f"✅ 成功 ({result['response_time']:.1f}s)")
else:
print(f"❌ 失败 (HTTP: {result.get('http_status', 'N/A')})")
# 记录HTTP状态
if 'http_status' in result:
results_summary[idx]['http_statuses'].append(result['http_status'])
time.sleep(1) # 避免服务器过载
# 生成详细结果CSV
try:
with open(RESULTS_CSV, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['instruction_index', 'instruction', 'run_number', 'success',
'attempts', 'response_time', 'plan_id', 'error', 'timestamp']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for result in detailed_results:
writer.writerow(result)
print(f"\n📊 详细结果已保存至: {RESULTS_CSV}")
except Exception as e:
print(f"❌ 保存详细结果时出错: {e}")
# 生成统计摘要
generate_summary_report(instructions, results_summary)
# 打印最终统计
print(f"\n{'='*60}")
print("📈 最终测试统计")
print(f"{'='*60}")
print(f"总测试次数: {total_tests}")
print(f"成功次数: {total_successful}")
print(f"失败次数: {total_tests - total_successful}")
print(f"总成功率: {(total_successful / total_tests * 100):.2f}%")
# 打印每个指令的统计
print(f"\n📋 每个指令的统计:")
for i, (instruction, summary) in enumerate(zip(instructions, results_summary), 1):
success_rate = (summary['success_count'] / TESTS_PER_INSTRUCTION * 100)
avg_time = summary['total_response_time'] / summary['success_count'] if summary['success_count'] > 0 else 0
print(f" 指令 {i}: {success_rate:.1f}% 成功 ({summary['success_count']}/{TESTS_PER_INSTRUCTION}), "
f"平均时间: {avg_time:.2f}s")
print(f"\n📁 输出文件:")
print(f"详细日志: {LOG_FILE}")
print(f"详细结果: {RESULTS_CSV}")
print(f"统计摘要: {SUMMARY_CSV}")
if __name__ == "__main__":
main()