Files
DronePlanning/tools/test_validate/modules/visualizer.py

287 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从API测试日志中提取JSON响应并批量可视化
"""
import json
import os
import re
import logging
import platform
import random
import html
from typing import Dict, List, Tuple
from collections import defaultdict
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def sanitize_filename(text: str) -> str:
"""将文本转换为安全的文件名"""
# 移除或替换不安全的字符
text = re.sub(r'[<>:"/\\|?*]', '_', text)
# 限制长度
if len(text) > 100:
text = text[:100]
return text
def _pick_zh_font():
"""选择合适的中文字体"""
sys = platform.system()
if sys == "Windows":
return "Microsoft YaHei"
elif sys == "Darwin":
return "PingFang SC"
else:
return "Noto Sans CJK SC"
def _add_nodes_and_edges(node: dict, dot, parent_id: str | None = None) -> str:
"""递归辅助函数,用于添加节点和边。"""
try:
from graphviz import Digraph
except ImportError:
logging.critical("错误未安装graphviz库。请运行: pip install graphviz")
return ""
current_id = f"{id(node)}_{random.randint(1000, 9999)}"
# 准备节点标签HTML-like正确换行与转义
name = html.escape(str(node.get('name', '')))
ntype = html.escape(str(node.get('type', '')))
label_parts = [f"<B>{name}</B> <FONT POINT-SIZE='10'><I>({ntype})</I></FONT>"]
# 格式化参数显示
params = node.get('params') or {}
if params:
params_lines = []
for key, value in params.items():
k = html.escape(str(key))
if isinstance(value, float):
value_str = f"{value:.2f}".rstrip('0').rstrip('.')
else:
value_str = str(value)
v = html.escape(value_str)
params_lines.append(f"{k}: {v}")
params_text = "<BR ALIGN='LEFT'/>".join(params_lines)
label_parts.append(f"<FONT POINT-SIZE='9' COLOR='#555555'>{params_text}</FONT>")
node_label = f"<{'<BR/>'.join(label_parts)}>"
# 根据类型设置节点样式和颜色
node_type = (node.get('type') or '').lower()
shape = 'ellipse'
style = 'filled'
fillcolor = '#e6e6e6' # 默认灰色填充
border_color = '#666666' # 默认描边色
if node_type == 'action':
shape = 'box'
style = 'rounded,filled'
fillcolor = "#cde4ff" # 浅蓝
elif node_type == 'condition':
shape = 'diamond'
style = 'filled'
fillcolor = "#fff2cc" # 浅黄
elif node_type == 'sequence':
shape = 'ellipse'
style = 'filled'
fillcolor = '#d5e8d4' # 绿色
elif node_type == 'selector':
shape = 'ellipse'
style = 'filled'
fillcolor = '#ffe6cc' # 橙色
elif node_type == 'parallel':
shape = 'ellipse'
style = 'filled'
fillcolor = '#e1d5e7' # 紫色
# 特别标记安全相关节点
if node.get('name') in ['battery_above', 'gps_status', 'SafetyMonitor']:
border_color = '#ff0000' # 红色边框突出显示安全节点
style = 'filled,bold' # 加粗
dot.node(current_id, label=node_label, shape=shape, style=style, fillcolor=fillcolor, color=border_color)
# 连接父节点
if parent_id:
dot.edge(parent_id, current_id)
# 递归处理子节点
children = node.get("children", [])
if not children:
return current_id
# 记录所有子节点的ID
child_ids = []
# 正确的递归连接:每个子节点都连接到当前节点
for child in children:
child_id = _add_nodes_and_edges(child, dot, current_id)
child_ids.append(child_id)
# 子节点同级排列(横向排布,更直观地表现同层)
if len(child_ids) > 1:
with dot.subgraph(name=f"rank_{current_id}") as s:
s.attr(rank='same')
for cid in child_ids:
s.node(cid)
return current_id
def generate_visualization(node: Dict, file_path: str):
"""
使用Graphviz将Pytree字典可视化并保存到指定路径。
"""
try:
from graphviz import Digraph
except ImportError:
logging.critical("错误未安装graphviz库。请运行: pip install graphviz")
return False
fontname = _pick_zh_font()
dot = Digraph('Pytree', comment='Drone Mission Plan')
dot.attr(rankdir='TB', label='Drone Mission Plan', fontsize='20', fontname=fontname)
dot.attr('node', shape='box', style='rounded,filled', fontname=fontname)
dot.attr('edge', fontname=fontname)
_add_nodes_and_edges(node, dot)
try:
# 确保输出目录存在
base_path, ext = os.path.splitext(file_path)
render_path = base_path if ext.lower() == '.png' else file_path
out_dir = os.path.dirname(render_path)
if out_dir and not os.path.exists(out_dir):
os.makedirs(out_dir, exist_ok=True)
# 保存为 .png 文件,并自动删除源码 .gv 文件
dot.render(render_path, format='png', cleanup=True, view=False)
return True
except Exception as e:
logging.error(f"❌ 生成可视化图形失败: {e}")
return False
# 保留旧的函数以兼容(如果有其他脚本引用)
def _visualize_pytree(node: Dict, file_path: str):
return generate_visualization(node, file_path)
def parse_log_file(log_file_path: str) -> Dict[str, List[Dict]]:
"""
解析日志文件提取原始指令和完整API响应JSON
返回: {原始指令: [JSON响应列表]}
"""
with open(log_file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 按分隔符分割条目
entries = re.split(r'={80,}', content)
results = defaultdict(list)
for entry in entries:
if not entry.strip():
continue
# 提取原始指令
instruction_match = re.search(r'原始指令:\s*(.+)', entry)
if not instruction_match:
continue
original_instruction = instruction_match.group(1).strip()
# 提取完整API响应JSON
json_match = re.search(r'完整API响应:\s*\n(\{.*\})', entry, re.DOTALL)
if not json_match:
logging.warning(f"未找到指令 '{original_instruction}' 的JSON响应")
continue
json_str = json_match.group(1).strip()
try:
json_obj = json.loads(json_str)
results[original_instruction].append(json_obj)
logging.info(f"成功提取指令 '{original_instruction}' 的JSON响应")
except json.JSONDecodeError as e:
logging.error(f"解析指令 '{original_instruction}' 的JSON失败: {e}")
continue
return results
def process_and_visualize(log_file_path: str, output_dir: str):
"""
处理日志文件并批量可视化
"""
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 解析日志文件
logging.info(f"开始解析日志文件: {log_file_path}")
instruction_responses = parse_log_file(log_file_path)
logging.info(f"共找到 {len(instruction_responses)} 个不同的原始指令")
# 处理每个指令的所有响应
for instruction, responses in instruction_responses.items():
logging.info(f"\n处理指令: {instruction} (共 {len(responses)} 个响应)")
# 创建指令目录(使用安全的文件名)
safe_instruction_name = sanitize_filename(instruction)
instruction_dir = os.path.join(output_dir, safe_instruction_name)
os.makedirs(instruction_dir, exist_ok=True)
# 处理每个响应
for idx, response in enumerate(responses, 1):
try:
# 提取root节点
root_node = response.get('root')
if not root_node:
logging.warning(f"响应 #{idx} 没有root节点跳过")
continue
# 生成文件名
json_filename = f"response_{idx}.json"
png_filename = f"response_{idx}.png"
json_path = os.path.join(instruction_dir, json_filename)
png_path = os.path.join(instruction_dir, png_filename)
# 保存JSON文件
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(response, f, ensure_ascii=False, indent=2)
logging.info(f" 保存JSON: {json_filename}")
# 生成可视化
generate_visualization(root_node, png_path)
logging.info(f" 生成可视化: {png_filename}")
except Exception as e:
logging.error(f"处理响应 #{idx} 时出错: {e}")
continue
logging.info(f"\n✅ 所有处理完成!结果保存在: {output_dir}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="批量可视化API测试日志")
parser.add_argument("--log", default=os.path.join(os.path.dirname(os.path.dirname(__file__)), "api_test_log.txt"), help="日志文件路径")
parser.add_argument("--out", default=os.path.join(os.path.dirname(os.path.dirname(__file__)), "validation"), help="输出目录")
args = parser.parse_args()
log_file = args.log
output_directory = args.out
print(f"日志文件: {log_file}")
print(f"输出目录: {output_directory}")
if os.path.exists(log_file):
process_and_visualize(log_file, output_directory)
else:
print(f"错误: 找不到日志文件 {log_file}")