bev-project/ANALYZE_RESULTS.py

513 lines
18 KiB
Python
Raw Permalink Normal View History

2025-11-21 10:50:51 +08:00
#!/usr/bin/env python
"""
分析BEVFusion评估结果的完整脚本
支持3D检测和BEV分割结果分析
"""
import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, jaccard_score
from sklearn.metrics import classification_report
# 设置环境
os.environ['PATH'] = '/opt/conda/bin:' + os.environ.get('PATH', '')
os.environ['LD_LIBRARY_PATH'] = '/opt/conda/lib/python3.8/site-packages/torch/lib:/opt/conda/lib:/usr/local/cuda/lib64:' + os.environ.get('LD_LIBRARY_PATH', '')
os.environ['PYTHONPATH'] = '/workspace/bevfusion:' + os.environ.get('PYTHONPATH', '')
def load_and_inspect_pkl(pkl_path):
"""加载pkl文件并检查基本结构"""
print("="*60)
print("加载和检查pkl文件结构")
print("="*60)
try:
with open(pkl_path, 'rb') as f:
results = pickle.load(f)
print(f"✓ 成功加载pkl文件: {pkl_path}")
print(f"数据类型: {type(results)}")
if isinstance(results, list):
print(f"包含 {len(results)} 个样本的结果")
if len(results) > 0:
sample = results[0]
print("\n第一个样本的结构:")
print(f" 键: {list(sample.keys())}")
# 分析每个字段
for key, value in sample.items():
if hasattr(value, 'shape'):
print(f" {key}: {type(value)}, shape: {value.shape}")
elif hasattr(value, '__len__') and not isinstance(value, str):
print(f" {key}: {type(value)}, length: {len(value)}")
else:
print(f" {key}: {type(value)}, value: {value}")
elif isinstance(results, dict):
print(f"字典结构,包含 {len(results)} 个键: {list(results.keys())}")
for key, value in results.items():
if hasattr(value, 'shape'):
print(f" {key}: {type(value)}, shape: {value.shape}")
elif hasattr(value, '__len__') and not isinstance(value, str):
print(f" {key}: {type(value)}, length: {len(value)}")
else:
print(f" {key}: {type(value)}, value: {value}")
return results
except Exception as e:
print(f"❌ 加载失败: {e}")
return None
def analyze_detection_results(results):
"""分析3D检测结果"""
print("\n" + "="*60)
print("分析3D检测结果")
print("="*60)
if not isinstance(results, list):
print("❌ 结果不是列表格式,无法分析检测结果")
return
total_boxes = 0
all_scores = []
all_labels = []
sample_stats = []
for i, sample in enumerate(results):
if 'boxes_3d' in sample:
boxes = sample['boxes_3d']
scores = sample.get('scores_3d', [])
labels = sample.get('labels_3d', [])
total_boxes += len(boxes)
all_scores.extend(scores)
all_labels.extend(labels)
sample_stats.append({
'sample_id': i,
'num_boxes': len(boxes),
'avg_score': float(np.mean(scores)) if len(scores) > 0 else 0,
'max_score': float(np.max(scores)) if len(scores) > 0 else 0
})
print(f"总检测框数量: {total_boxes}")
print(".2f")
if all_scores:
print(f"分数范围: {min(all_scores):.3f} ~ {max(all_scores):.3f}")
print(f"平均分数: {np.mean(all_scores):.3f}")
if all_labels:
unique_labels, counts = np.unique(all_labels, return_counts=True)
print("\n类别分布:")
for label, count in zip(unique_labels, counts):
print(f" 类别 {label}: {count} 个检测框")
# 可视化检测置信度分布
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(all_scores, bins=50, alpha=0.7, edgecolor='black')
plt.xlabel('Detection Confidence')
plt.ylabel('Count')
plt.title('Detection Confidence Distribution')
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.bar(unique_labels, counts, alpha=0.7, edgecolor='black')
plt.xlabel('Class ID')
plt.ylabel('Count')
plt.title('Class Distribution')
plt.xticks(unique_labels)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('/data/eval_fast/detection_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
print("✓ 检测分析图表已保存至: /data/eval_fast/detection_analysis.png")
# 样本统计
if sample_stats:
print("\n前10个样本的检测统计:")
for stat in sample_stats[:10]:
print(f" 样本 {stat['sample_id']}: {stat['num_boxes']} 框, 平均置信度 {stat['avg_score']:.3f}, 最高置信度 {stat['max_score']:.3f}")
def analyze_segmentation_results(results):
"""分析BEV分割结果"""
print("\n" + "="*60)
print("分析BEV分割结果")
print("="*60)
if not isinstance(results, list):
print("❌ 结果不是列表格式,无法分析分割结果")
return
segmentation_stats = []
for i, sample in enumerate(results):
if 'masks_bev' in sample:
mask = sample['masks_bev'] # 应该是 (C, H, W) 格式
gt_mask = sample.get('gt_masks_bev')
if isinstance(mask, np.ndarray):
# 如果是numpy数组直接分析
pred_mask = mask
elif hasattr(mask, 'cpu'):
# 如果是torch tensor转换为numpy
pred_mask = mask.cpu().numpy()
else:
print(f"⚠️ 样本 {i} 的mask格式不支持: {type(mask)}")
continue
# 计算各类别像素数量
unique_classes, pixel_counts = np.unique(pred_mask, return_counts=True)
stat = {
'sample_id': i,
'mask_shape': pred_mask.shape,
'num_classes': len(unique_classes),
'pixel_counts': dict(zip(unique_classes, pixel_counts)),
'total_pixels': pred_mask.size
}
if gt_mask is not None:
if hasattr(gt_mask, 'cpu'):
gt_mask = gt_mask.cpu().numpy()
# 计算IoU
pred_flat = pred_mask.flatten()
gt_flat = gt_mask.flatten()
# 获取真实标签中的唯一类别
gt_unique_classes = np.unique(gt_flat)
# 只使用存在于真实标签中的类别
valid_classes = np.intersect1d(unique_classes, gt_unique_classes)
if len(valid_classes) > 0:
# 计算混淆矩阵
cm = confusion_matrix(gt_flat, pred_flat, labels=valid_classes)
else:
cm = None
# 计算各类IoU
if cm is not None:
iou_per_class = []
for j in range(len(valid_classes)):
if cm[j,:].sum() + cm[:,j].sum() - cm[j,j] > 0:
iou = cm[j,j] / (cm[j,:].sum() + cm[:,j].sum() - cm[j,j])
iou_per_class.append(iou)
else:
iou_per_class.append(0)
stat['iou_per_class'] = dict(zip(valid_classes, iou_per_class))
stat['miou'] = np.mean(iou_per_class) if iou_per_class else 0
else:
stat['iou_per_class'] = {}
stat['miou'] = 0
segmentation_stats.append(stat)
if segmentation_stats:
print(f"分析了 {len(segmentation_stats)} 个样本的分割结果")
# 汇总统计
all_classes = set()
for stat in segmentation_stats:
all_classes.update(stat['pixel_counts'].keys())
print(f"\n总共发现 {len(all_classes)} 个类别: {sorted(all_classes)}")
# 计算平均IoU
if 'miou' in segmentation_stats[0]:
mious = [stat['miou'] for stat in segmentation_stats if 'miou' in stat]
print(f"平均mIoU: {np.mean(mious):.4f}")
# 各类别平均IoU
class_iou_sum = {}
class_iou_count = {}
for stat in segmentation_stats:
if 'iou_per_class' in stat:
for cls, iou in stat['iou_per_class'].items():
if cls not in class_iou_sum:
class_iou_sum[cls] = 0
class_iou_count[cls] = 0
class_iou_sum[cls] += iou
class_iou_count[cls] += 1
print("\n各类别平均IoU:")
for cls in sorted(class_iou_sum.keys()):
avg_iou = class_iou_sum[cls] / class_iou_count[cls]
print(f" 类别 {cls}: {avg_iou:.4f}")
# 可视化分割结果分布
plt.figure(figsize=(15, 10))
# 像素分布
plt.subplot(2, 2, 1)
sample_pixels = segmentation_stats[0]['pixel_counts']
classes = list(sample_pixels.keys())
pixels = list(sample_pixels.values())
plt.bar(classes, pixels, alpha=0.7, edgecolor='black')
plt.xlabel('Class ID')
plt.ylabel('Pixel Count')
plt.title('Pixel Distribution (Sample 0)')
plt.xticks(classes)
plt.grid(True, alpha=0.3)
# IoU分布
if 'iou_per_class' in segmentation_stats[0]:
plt.subplot(2, 2, 2)
ious = list(segmentation_stats[0]['iou_per_class'].values())
plt.bar(classes, ious, alpha=0.7, edgecolor='black', color='green')
plt.xlabel('Class ID')
plt.ylabel('IoU')
plt.title('IoU per Class (Sample 0)')
plt.xticks(classes)
plt.ylim(0, 1)
plt.grid(True, alpha=0.3)
# mIoU分布
if 'miou' in segmentation_stats[0]:
plt.subplot(2, 2, 3)
mious = [stat['miou'] for stat in segmentation_stats if 'miou' in stat]
plt.hist(mious, bins=20, alpha=0.7, edgecolor='black')
plt.xlabel('mIoU')
plt.ylabel('Count')
plt.title('mIoU Distribution Across Samples')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('/data/eval_fast/segmentation_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
print("✓ 分割分析图表已保存至: /data/eval_fast/segmentation_analysis.png")
def visualize_sample(results, sample_idx=0):
"""可视化单个样本的结果"""
print("\n" + "="*60)
print(f"可视化样本 {sample_idx}")
print("="*60)
if not isinstance(results, list) or sample_idx >= len(results):
print(f"❌ 无效的样本索引 {sample_idx}")
return
sample = results[sample_idx]
fig = plt.figure(figsize=(20, 10))
# 检测结果
if 'boxes_3d' in sample and 'scores_3d' in sample:
plt.subplot(2, 3, 1)
scores = sample['scores_3d']
plt.hist(scores, bins=20, alpha=0.7, edgecolor='black')
plt.xlabel('Confidence Score')
plt.ylabel('Count')
plt.title(f'Detection Scores (Sample {sample_idx})')
plt.grid(True, alpha=0.3)
# BEV分割结果
if 'masks_bev' in sample:
mask = sample['masks_bev']
if hasattr(mask, 'cpu'):
mask = mask.cpu().numpy()
if mask.ndim == 3: # (C, H, W)
# 显示前6个类别
for i in range(min(6, mask.shape[0])):
plt.subplot(2, 3, i+2)
plt.imshow(mask[i], cmap='viridis')
plt.title(f'BEV Class {i} (Sample {sample_idx})')
plt.axis('off')
else:
plt.subplot(2, 3, 2)
plt.imshow(mask, cmap='tab20')
plt.title(f'BEV Segmentation (Sample {sample_idx})')
plt.axis('off')
plt.tight_layout()
plt.savefig(f'/data/eval_fast/sample_{sample_idx}_visualization.png', dpi=300, bbox_inches='tight')
plt.show()
print(f"✓ 样本{sample_idx}可视化已保存至: /data/eval_fast/sample_{sample_idx}_visualization.png")
def generate_performance_report(results):
"""生成性能报告"""
print("\n" + "="*60)
print("生成性能报告")
print("="*60)
report = {}
if not isinstance(results, list):
print("❌ 无法生成报告:结果不是列表格式")
return report
# 检测指标
if any('boxes_3d' in sample for sample in results):
detection_stats = {
'total_samples': len(results),
'total_detections': 0,
'avg_detections_per_sample': 0,
'score_distribution': {'min': float('inf'), 'max': 0, 'mean': 0},
'class_distribution': {}
}
all_scores = []
all_labels = []
for sample in results:
if 'boxes_3d' in sample:
detection_stats['total_detections'] += len(sample['boxes_3d'])
if 'scores_3d' in sample:
all_scores.extend(sample['scores_3d'])
if 'labels_3d' in sample:
all_labels.extend(sample['labels_3d'])
detection_stats['avg_detections_per_sample'] = detection_stats['total_detections'] / detection_stats['total_samples']
if all_scores:
detection_stats['score_distribution'] = {
'min': float(np.min(all_scores)),
'max': float(np.max(all_scores)),
'mean': float(np.mean(all_scores))
}
if all_labels:
unique_labels, counts = np.unique(all_labels, return_counts=True)
detection_stats['class_distribution'] = dict(zip(unique_labels.astype(int), counts))
report['detection'] = detection_stats
# 分割指标
seg_samples = [s for s in results if 'masks_bev' in s and 'gt_masks_bev' in s]
if seg_samples:
segmentation_stats = {
'total_samples': len(seg_samples),
'avg_miou': 0,
'class_wise_iou': {}
}
all_mious = []
class_iou_sum = {}
class_iou_count = {}
for sample in seg_samples:
pred_mask = sample['masks_bev']
gt_mask = sample['gt_masks_bev']
if hasattr(pred_mask, 'cpu'):
pred_mask = pred_mask.cpu().numpy()
if hasattr(gt_mask, 'cpu'):
gt_mask = gt_mask.cpu().numpy()
pred_flat = pred_mask.flatten()
gt_flat = gt_mask.flatten()
# 计算mIoU
try:
miou = jaccard_score(gt_flat, pred_flat, average='macro')
all_mious.append(miou)
except:
continue
if all_mious:
segmentation_stats['avg_miou'] = float(np.mean(all_mious))
report['segmentation'] = segmentation_stats
# 打印报告
print("性能报告:")
for task, metrics in report.items():
print(f"\n{task.upper()}:")
if isinstance(metrics, dict):
for key, value in metrics.items():
if isinstance(value, dict):
print(f" {key}:")
for subkey, subvalue in value.items():
print(f" {subkey}: {subvalue}")
else:
print(f" {key}: {value}")
return report
def compare_with_baseline(current_results, baseline_path):
"""与基准结果比较"""
print("\n" + "="*60)
print("与基准结果比较")
print("="*60)
try:
with open(baseline_path, 'rb') as f:
baseline = pickle.load(f)
current_report = generate_performance_report(current_results)
baseline_report = generate_performance_report(baseline)
print("\n性能比较:")
for task in current_report:
if task in baseline_report:
print(f"\n{task.upper()}:")
current_metrics = current_report[task]
baseline_metrics = baseline_report[task]
if isinstance(current_metrics, dict) and isinstance(baseline_metrics, dict):
for metric in current_metrics:
if metric in baseline_metrics:
current_val = current_metrics[metric]
baseline_val = baseline_metrics[metric]
if isinstance(current_val, (int, float)) and isinstance(baseline_val, (int, float)):
diff = current_val - baseline_val
print(f" {metric}: {current_val:.4f} vs {baseline_val:.4f} ({diff:+.4f})")
elif isinstance(current_val, dict) and isinstance(baseline_val, dict):
print(f" {metric}:")
for submetric in current_val:
if submetric in baseline_val:
c_val = current_val[submetric]
b_val = baseline_val[submetric]
if isinstance(c_val, (int, float)) and isinstance(b_val, (int, float)):
diff = c_val - b_val
print(f" {submetric}: {c_val:.4f} vs {b_val:.4f} ({diff:+.4f})")
except Exception as e:
print(f"❌ 基准比较失败: {e}")
def main():
# 结果文件路径
results_path = '/data/eval_fast/epoch1_fast_20251119_133104/fast_results.pkl'
# 加载和检查结果
results = load_and_inspect_pkl(results_path)
if results is None:
return
# 分析检测结果
analyze_detection_results(results)
# 分析分割结果
analyze_segmentation_results(results)
# 可视化样本
if len(results) > 0:
visualize_sample(results, sample_idx=0)
# 生成性能报告
performance_report = generate_performance_report(results)
# 可选:与基准比较 (如果有基准文件)
# baseline_path = '/path/to/baseline_results.pkl'
# compare_with_baseline(results, baseline_path)
print("\n" + "="*60)
print("分析完成!")
print("生成的文件:")
print(" - /data/eval_fast/detection_analysis.png")
print(" - /data/eval_fast/segmentation_analysis.png")
print(" - /data/eval_fast/sample_0_visualization.png")
print("="*60)
if __name__ == '__main__':
main()