Files
DronePlanning/docs/地面站后端开发文档.md
2025-08-17 22:41:54 +08:00

141 lines
6.5 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 地面站后端开发文档
**版本**: 1.0
**负责人**: 后端开发团队
---
## 1. 概述与目标
本文档旨在为 **模块二:后端服务** 提供一份完整、清晰的开发与实现指南。后端服务是整个无人机自然语言控制系统的核心枢纽,其主要职责包括:
1. **接收前端指令**: 通过Web API接收来自QGC插件的自然语言指令。
2. **智能任务规划**: 集成并调用RAG检索增强生成系统和大型语言模型LLM将用户指令转化为结构化的无人机任务`py_tree.json`)。
3. **任务下发与监控**: 通过ROS2 Action将用户确认后的任务下发给无人机执行并实时接收无人机的执行状态。
4. **状态实时反馈**: 通过WebSocket将无人机的执行状态实时、透明地转发给前端QGC插件实现闭环监控。
本项目将整合现有零散的脚本和资源构建一个统一、健壮且可扩展的FastAPI应用程序。
## 2. 技术栈
- **Web框架**: FastAPI (用于构建高性能的HTTP和WebSocket接口)
- **ROS2通信**: `rclpy` (Python客户端库用于与无人机ROS2节点交互)
- **异步处理**: `asyncio` (Python标准库FastAPI和`rclpy`都基于此)
- **数据模型**: Pydantic (用于API请求/响应的数据验证)
- **RAG向量数据库**: ChromaDB (基于现有`vector_store`目录推断)
- **依赖管理**: `pip``requirements.txt`
## 3. 最终项目结构
为了实现模块化和高可维护性,项目将采用以下目录结构:
```
.
├── backend_service/
│ ├── src/
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI应用主入口
│ │ ├── websocket_manager.py # WebSocket连接管理器
│ │ ├── py_tree_generator.py # RAG与LLM集成生成py_tree
│ │ ├── ros2_client.py # ROS2 Action客户端
│ │ ├── models.py # Pydantic数据模型
│ │ └── prompts/ # 存放LLM的prompt模板
│ │ ├── system_prompt.txt
│ │ └── ...
│ ├── knowledge_base/ # RAG知识库源文件
│ │ └── first.ndjson
│ ├── vector_store/ # ChromaDB向量数据库
│ │ └── ...
│ └── requirements.txt # Python依赖项
├── drone_interfaces/ # ROS2接口定义 (保持不变)
│ └── ...
└── (其他项目文件...)
```
## 4. 核心模块详解
### 4.1. FastAPI应用 (`main.py`)
这是整个服务的入口点负责初始化所有组件并定义API端点。
- **职责**:
- 创建FastAPI应用实例。
- 初始化`PyTreeGenerator`, `MissionActionClient`, 和 `ConnectionManager`的单例对象。
- 定义`POST /generate_plan`, `POST /execute_mission`, 和 `WS /ws/status`三个核心接口。
- 在后台启动ROS2节点并确保其与FastAPI的事件循环正确集成。
### 4.2. WebSocket管理器 (`websocket_manager.py`)
该模块专门处理与QGC插件的WebSocket连接。
- **类**: `ConnectionManager`
- **职责**:
- `connect(websocket: WebSocket)`: 接受并存储新的客户端连接。
- `disconnect(websocket: WebSocket)`: 移除断开的连接。
- `broadcast(message: str)`: 向所有已连接的客户端广播消息。此方法必须是**线程安全**的因为它将被ROS2回调运行在不同线程调用。
### 4.3. PyTree生成器 (`py_tree_generator.py`)
封装了所有与任务生成相关的复杂逻辑。
- **类**: `PyTreeGenerator`
- **职责**:
- 初始化RAG系统加载`vector_store`中的知识库。
- 提供一个核心方法 `generate(user_prompt: str) -> dict`
- **`generate`方法内部流程**:
1. 接收用户输入。
2. 使用RAG的检索器从`vector_store`中查找最相关的知识片段(例如,可用的无人机动作、地理位置信息等)。
3.`prompts/`目录加载系统Prompt模板。
4. 将用户输入、检索到的知识组合成一个完整的Prompt。
5. 调用外部或本地的LLM服务获取生成的`py_tree.json`字符串。
6. 验证返回的JSON是否符合`00_System_Overview_and_Interfaces.md`中定义的契约,确保其结构正确。
7. 返回验证通过的Python字典。
### 4.4. ROS2 Action客户端 (`ros2_client.py`)
负责与无人机(模块三)进行通信。
- **类**: `MissionActionClient` (继承自 `rclpy.node.Node`)
- **职责**:
- 初始化一个ROS2节点。
- 创建一个`ExecuteMission` Action的客户端。
- 提供一个核心方法 `send_goal(py_tree: dict)`
- **`send_goal`方法内部流程**:
1.`py_tree`字典序列化为JSON字符串。
2. 构建`ExecuteMission.Goal`消息。
3. 异步发送Action目标并注册`feedback_callback`
- **`feedback_callback`方法**:
1. 此回调由`rclpy`的执行器在单独的线程中触发。
2. 当收到无人机的状态反馈时,将`current_node_id``status`打包成一个字典。
3. 获取FastAPI的事件循环并使用`loop.call_soon_threadsafe()`安全地调用`websocket_manager.broadcast()`方法,将状态更新推送到前端。
### 4.5. 数据模型 (`models.py`)
使用Pydantic定义清晰、严格的API数据结构。
- **类**:
- `GeneratePlanRequest(BaseModel)`: 包含`user_prompt: str`
- `ExecuteMissionRequest(BaseModel)`: 包含`py_tree: dict`
- `StatusUpdate(BaseModel)`: 包含`node_id: str``status: int`
## 5. 数据流
### 5.1. 任务规划流程
`QGC Plugin` -> `POST /generate_plan` -> `main.py` -> `py_tree_generator.generate()` -> `RAG/LLM` -> `py_tree.json` -> `main.py` -> `QGC Plugin`
### 5.2. 任务执行与反馈流程
`QGC Plugin` -> `POST /execute_mission` -> `main.py` -> `ros2_client.send_goal()` -> `ROS2 Network` -> `Drone`
`Drone` -> `ROS2 Feedback` -> `ros2_client.feedback_callback()` -> `websocket_manager.broadcast()` -> `WS /ws/status` -> `QGC Plugin`
## 6. 后续步骤
1. **环境搭建**: 根据`requirements.txt`创建虚拟环境并安装所有依赖。
2. **代码实现**: 按照上述设计,逐一编写`websocket_manager.py`, `py_tree_generator.py`, `ros2_client.py`, `models.py`, 和 `main.py`
3. **单元测试与集成测试**: (可选,但建议) 为关键模块(如`py_tree_generator`)编写单元测试,并进行完整的集成测试。
4. **部署**: 使用`uvicorn`等ASGI服务器运行FastAPI应用。
---