feat: 完善批量可视化测试验证逻辑(仅提交业务代码)

This commit is contained in:
lulijing
2025-12-09 13:20:39 +08:00
parent c08cdfb339
commit 60f156f5e6
129 changed files with 21380 additions and 1984 deletions

View File

@@ -0,0 +1,269 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从API测试日志中提取JSON响应并批量可视化
"""
import json
import os
import re
import logging
import platform
import random
import html
from typing import Dict, List, Tuple
from collections import defaultdict
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def sanitize_filename(text: str) -> str:
"""将文本转换为安全的文件名"""
# 移除或替换不安全的字符
text = re.sub(r'[<>:"/\\|?*]', '_', text)
# 限制长度
if len(text) > 100:
text = text[:100]
return text
def _pick_zh_font():
"""选择合适的中文字体"""
sys = platform.system()
if sys == "Windows":
return "Microsoft YaHei"
elif sys == "Darwin":
return "PingFang SC"
else:
return "Noto Sans CJK SC"
def _add_nodes_and_edges(node: dict, dot, parent_id: str | None = None) -> str:
"""递归辅助函数,用于添加节点和边。"""
try:
from graphviz import Digraph
except ImportError:
logging.critical("错误未安装graphviz库。请运行: pip install graphviz")
return ""
current_id = f"{id(node)}_{random.randint(1000, 9999)}"
# 准备节点标签HTML-like正确换行与转义
name = html.escape(str(node.get('name', '')))
ntype = html.escape(str(node.get('type', '')))
label_parts = [f"<B>{name}</B> <FONT POINT-SIZE='10'><I>({ntype})</I></FONT>"]
# 格式化参数显示
params = node.get('params') or {}
if params:
params_lines = []
for key, value in params.items():
k = html.escape(str(key))
if isinstance(value, float):
value_str = f"{value:.2f}".rstrip('0').rstrip('.')
else:
value_str = str(value)
v = html.escape(value_str)
params_lines.append(f"{k}: {v}")
params_text = "<BR ALIGN='LEFT'/>".join(params_lines)
label_parts.append(f"<FONT POINT-SIZE='9' COLOR='#555555'>{params_text}</FONT>")
node_label = f"<{'<BR/>'.join(label_parts)}>"
# 根据类型设置节点样式和颜色
node_type = (node.get('type') or '').lower()
shape = 'ellipse'
style = 'filled'
fillcolor = '#e6e6e6' # 默认灰色填充
border_color = '#666666' # 默认描边色
if node_type == 'action':
shape = 'box'
style = 'rounded,filled'
fillcolor = "#cde4ff" # 浅蓝
elif node_type == 'condition':
shape = 'diamond'
style = 'filled'
fillcolor = "#fff2cc" # 浅黄
elif node_type == 'sequence':
shape = 'ellipse'
style = 'filled'
fillcolor = '#d5e8d4' # 绿色
elif node_type == 'selector':
shape = 'ellipse'
style = 'filled'
fillcolor = '#ffe6cc' # 橙色
elif node_type == 'parallel':
shape = 'ellipse'
style = 'filled'
fillcolor = '#e1d5e7' # 紫色
# 特别标记安全相关节点
if node.get('name') in ['battery_above', 'gps_status', 'SafetyMonitor']:
border_color = '#ff0000' # 红色边框突出显示安全节点
style = 'filled,bold' # 加粗
dot.node(current_id, label=node_label, shape=shape, style=style, fillcolor=fillcolor, color=border_color)
# 连接父节点
if parent_id:
dot.edge(parent_id, current_id)
# 递归处理子节点
children = node.get("children", [])
if not children:
return current_id
# 记录所有子节点的ID
child_ids = []
# 正确的递归连接:每个子节点都连接到当前节点
for child in children:
child_id = _add_nodes_and_edges(child, dot, current_id)
child_ids.append(child_id)
# 子节点同级排列(横向排布,更直观地表现同层)
if len(child_ids) > 1:
with dot.subgraph(name=f"rank_{current_id}") as s:
s.attr(rank='same')
for cid in child_ids:
s.node(cid)
return current_id
def _visualize_pytree(node: Dict, file_path: str):
"""
使用Graphviz将Pytree字典可视化并保存到指定路径。
"""
try:
from graphviz import Digraph
except ImportError:
logging.critical("错误未安装graphviz库。请运行: pip install graphviz")
return
fontname = _pick_zh_font()
dot = Digraph('Pytree', comment='Drone Mission Plan')
dot.attr(rankdir='TB', label='Drone Mission Plan', fontsize='20', fontname=fontname)
dot.attr('node', shape='box', style='rounded,filled', fontname=fontname)
dot.attr('edge', fontname=fontname)
_add_nodes_and_edges(node, dot)
try:
# 确保输出目录存在,并避免生成 .png.png
base_path, ext = os.path.splitext(file_path)
render_path = base_path if ext.lower() == '.png' else file_path
out_dir = os.path.dirname(render_path)
if out_dir and not os.path.exists(out_dir):
os.makedirs(out_dir, exist_ok=True)
# 保存为 .png 文件,并自动删除源码 .gv 文件
output_path = dot.render(render_path, format='png', cleanup=True, view=False)
logging.info(f"✅ 可视化成功: {output_path}")
except Exception as e:
logging.error(f"❌ 生成可视化图形失败: {e}")
def parse_log_file(log_file_path: str) -> Dict[str, List[Dict]]:
"""
解析日志文件提取原始指令和完整API响应JSON
返回: {原始指令: [JSON响应列表]}
"""
with open(log_file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 按分隔符分割条目
entries = re.split(r'={80,}', content)
results = defaultdict(list)
for entry in entries:
if not entry.strip():
continue
# 提取原始指令
instruction_match = re.search(r'原始指令:\s*(.+)', entry)
if not instruction_match:
continue
original_instruction = instruction_match.group(1).strip()
# 提取完整API响应JSON
json_match = re.search(r'完整API响应:\s*\n(\{.*\})', entry, re.DOTALL)
if not json_match:
logging.warning(f"未找到指令 '{original_instruction}' 的JSON响应")
continue
json_str = json_match.group(1).strip()
try:
json_obj = json.loads(json_str)
results[original_instruction].append(json_obj)
logging.info(f"成功提取指令 '{original_instruction}' 的JSON响应")
except json.JSONDecodeError as e:
logging.error(f"解析指令 '{original_instruction}' 的JSON失败: {e}")
continue
return results
def process_and_visualize(log_file_path: str, output_dir: str):
"""
处理日志文件并批量可视化
"""
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 解析日志文件
logging.info(f"开始解析日志文件: {log_file_path}")
instruction_responses = parse_log_file(log_file_path)
logging.info(f"共找到 {len(instruction_responses)} 个不同的原始指令")
# 处理每个指令的所有响应
for instruction, responses in instruction_responses.items():
logging.info(f"\n处理指令: {instruction} (共 {len(responses)} 个响应)")
# 创建指令目录(使用安全的文件名)
safe_instruction_name = sanitize_filename(instruction)
instruction_dir = os.path.join(output_dir, safe_instruction_name)
os.makedirs(instruction_dir, exist_ok=True)
# 处理每个响应
for idx, response in enumerate(responses, 1):
try:
# 提取root节点
root_node = response.get('root')
if not root_node:
logging.warning(f"响应 #{idx} 没有root节点跳过")
continue
# 生成文件名
json_filename = f"response_{idx}.json"
png_filename = f"response_{idx}.png"
json_path = os.path.join(instruction_dir, json_filename)
png_path = os.path.join(instruction_dir, png_filename)
# 保存JSON文件
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(response, f, ensure_ascii=False, indent=2)
logging.info(f" 保存JSON: {json_filename}")
# 生成可视化
_visualize_pytree(root_node, png_path)
logging.info(f" 生成可视化: {png_filename}")
except Exception as e:
logging.error(f"处理响应 #{idx} 时出错: {e}")
continue
logging.info(f"\n✅ 所有处理完成!结果保存在: {output_dir}")
if __name__ == "__main__":
log_file = "/home/iscas/WorkSpace/code/DronePlanning/tools/test_validate/api_test_log.txt"
output_directory = "/home/iscas/WorkSpace/code/DronePlanning/tools/test_validate/validation"
process_and_visualize(log_file, output_directory)