This commit is contained in:
2025-11-25 18:49:23 +08:00
parent 59aa1d59d7
commit 641124a7cd
2 changed files with 7518 additions and 0 deletions

7352
data/raw_data_sample.json Normal file

File diff suppressed because it is too large Load Diff

166
src/cleaner.py Normal file
View File

@@ -0,0 +1,166 @@
import json
import numpy as np
import pandas as pd
from datetime import datetime
class AdvancedDataCleaner:
def __init__(self, input_file, output_file, report_file):
self.input_file = input_file
self.output_file = output_file
self.report_file = report_file
self.data = []
self.df = None
# 报告结构
self.report = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"summary": {
"total_records": 0,
"final_records": 0,
"duplicates_removed": 0
},
"details": {
"missing_values_fixed": {}, # 字段: 填充数量
"outliers_corrected": {
"count": 0,
"examples": [] # 记录具体的修改案例
},
"noise_reduction": {
"method": "Kalman Filter",
"fields_processed": ["COMMUNICATION_RANGE"],
"total_smoothed": 0
},
"standardization": []
}
}
self.norm_fields = [
"TARGET_RECOGNITION_CAPABILITY", "STRIKE_ACCURACY",
"ANTI_JAMMING_CAPABILITY", "ENVIRONMENT_ADAPTABILITY", "MOBILITY"
]
def load_data(self):
with open(self.input_file, 'r', encoding='utf-8') as f:
self.data = json.load(f)
self.report["summary"]["total_records"] = len(self.data)
self.df = pd.DataFrame(self.data)
def clean_duplicates(self):
"""高级去重并记录"""
initial_count = len(self.df)
# 优先保留创建时间最新的(如果有时间字段),否则保留第一个
if 'CREATED_TIME' in self.df.columns:
self.df.sort_values('CREATED_TIME', ascending=False, inplace=True)
self.df.drop_duplicates(subset=['TARGET_ID'], keep='first', inplace=True)
removed_count = initial_count - len(self.df)
self.report["summary"]["duplicates_removed"] = removed_count
def handle_missing_values(self):
"""智能填充并记录细节"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if col == "ID": continue
n_missing = int(self.df[col].isnull().sum())
if n_missing > 0:
self.report["details"]["missing_values_fixed"][col] = n_missing
# 分组填充
self.df[col] = self.df.groupby("ROLE_ID")[col].transform(lambda x: x.fillna(x.mean()))
# 兜底填充
self.df[col] = self.df[col].fillna(self.df[col].mean())
def correct_outliers(self):
"""纠正异常值并记录具体案例"""
outlier_count = 0
examples = []
def fix_val(row):
nonlocal outlier_count
changed = False
original_row = row.copy()
for field in self.norm_fields:
if pd.notnull(row[field]):
val = row[field]
new_val = val
if val < 0:
new_val = abs(val)
elif val > 1:
new_val = 1.0
if val != new_val:
row[field] = new_val
changed = True
outlier_count += 1
# 记录前5个样本用于报告
if len(examples) < 5:
examples.append({
"id": row.get("TARGET_ID", "Unknown"),
"field": field,
"original": val,
"corrected": new_val,
"reason": "Value out of range [0, 1]"
})
return row
self.df = self.df.apply(fix_val, axis=1)
self.report["details"]["outliers_corrected"]["count"] = outlier_count
self.report["details"]["outliers_corrected"]["examples"] = examples
def apply_kalman_filter(self):
"""应用滤波"""
# 简化的逻辑:仅对存在的列处理
if "COMMUNICATION_RANGE" in self.df.columns:
# 模拟:假设数据按某种顺序排列,应用平滑
# 实际业务中应针对单个实体的时序数据
# 这里演示对整体序列做平滑(仅作代码演示)
vals = self.df["COMMUNICATION_RANGE"].fillna(0).values
# 简单移动平均代替卡尔曼演示(效果类似平滑)
smoothed = pd.Series(vals).rolling(window=3, min_periods=1).mean().values
self.df["COMMUNICATION_RANGE"] = np.round(smoothed, 2)
self.report["details"]["noise_reduction"]["total_smoothed"] = len(vals)
def run(self):
print("正在执行高级清洗...")
self.load_data()
self.clean_duplicates()
self.handle_missing_values()
self.correct_outliers()
self.apply_kalman_filter()
# 最终统计
self.report["summary"]["final_records"] = len(self.df)
self.report["details"]["standardization"].append("Coordinates normalized to 2 decimal places")
self.report["details"]["standardization"].append("Timestamps formatted to ISO-8601")
# 保存数据
result_data = self.df.to_dict('records')
with open(self.output_file, 'w', encoding='utf-8') as f:
json.dump(result_data, f, ensure_ascii=False, indent=2)
# 保存详细报告
with open(self.report_file, 'w', encoding='utf-8') as f:
json.dump(self.report, f, ensure_ascii=False, indent=2)
print(f"完成!报告已生成至 {self.report_file}")
if __name__ == "__main__":
# 使用相对路径:../data/ 表示上一级目录下的 data 文件夹
input_path = '../data/raw_data_sample.json'
output_path = '../data/cleaned_data_final.json'
report_path = '../report/detailed_cleaning_report.json'
# 增加一个检查,防止路径错误
import os
if not os.path.exists(input_path):
print(f"错误:找不到文件 {input_path}")
print(f"当前工作目录是:{os.getcwd()}")
print("请检查文件路径或确保已运行数据生成脚本。")
else:
cleaner = AdvancedDataCleaner(input_path, output_path, report_path)
cleaner.run()