#!/usr/bin/env python
"""Analyze BEVFusion evaluation results.

Loads a pickled results file and reports on the 3D detection output
(``boxes_3d`` / ``scores_3d`` / ``labels_3d``) and the BEV segmentation
output (``masks_bev`` / ``gt_masks_bev``) it contains.
"""

import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, jaccard_score
from sklearn.metrics import classification_report

# Environment setup.
# NOTE(review): these assignments run after the interpreter has started, so
# they are inherited by child processes; LD_LIBRARY_PATH and PYTHONPATH are
# normally consulted at process start, so they presumably do not change
# library/module lookup for this process -- confirm this is the intent.
os.environ['PATH'] = '/opt/conda/bin:' + os.environ.get('PATH', '')
os.environ['LD_LIBRARY_PATH'] = '/opt/conda/lib/python3.8/site-packages/torch/lib:/opt/conda/lib:/usr/local/cuda/lib64:' + os.environ.get('LD_LIBRARY_PATH', '')
os.environ['PYTHONPATH'] = '/workspace/bevfusion:' + os.environ.get('PYTHONPATH', '')


def _to_numpy(value):
    """Return ``value`` as a NumPy array.

    Objects exposing ``.cpu()`` (e.g. torch tensors) are moved to the CPU
    and converted with ``.numpy()``; anything else goes through
    ``np.asarray``.
    """
    if hasattr(value, 'cpu'):
        value = value.cpu()
        if hasattr(value, 'detach'):
            value = value.detach()
        return value.numpy()
    return np.asarray(value)


def _describe_fields(mapping):
    """Print the type plus shape, length or value of every item in ``mapping``."""
    for key, value in mapping.items():
        if hasattr(value, 'shape'):
            print(f" {key}: {type(value)}, shape: {value.shape}")
        elif hasattr(value, '__len__') and not isinstance(value, str):
            print(f" {key}: {type(value)}, length: {len(value)}")
        else:
            print(f" {key}: {type(value)}, value: {value}")


def load_and_inspect_pkl(pkl_path):
    """Load a pickle file and print a summary of its structure.

    Args:
        pkl_path: Path of the pickle file to read.

    Returns:
        The unpickled object, or ``None`` when the file could not be loaded.
        A failure while *printing* the structure no longer discards the
        loaded object.
    """
    print("=" * 60)
    print("加载和检查pkl文件结构")
    print("=" * 60)

    # SECURITY: pickle.load can execute arbitrary code embedded in the file;
    # only point this script at result files from a trusted source.
    try:
        with open(pkl_path, 'rb') as f:
            results = pickle.load(f)
    except Exception as e:
        print(f"❌ 加载失败: {e}")
        return None

    print(f"✓ 成功加载pkl文件: {pkl_path}")
    print(f"数据类型: {type(results)}")

    # Inspection is best-effort: an unexpected layout is reported but the
    # loaded results are still returned to the caller.
    try:
        if isinstance(results, list):
            print(f"包含 {len(results)} 个样本的结果")
            if len(results) > 0:
                sample = results[0]
                print("\n第一个样本的结构:")
                if isinstance(sample, dict):
                    print(f" 键: {list(sample.keys())}")
                    _describe_fields(sample)
                else:
                    print(f" 类型: {type(sample)}")
        elif isinstance(results, dict):
            print(f"字典结构,包含 {len(results)} 个键: {list(results.keys())}")
            _describe_fields(results)
    except Exception as e:
        print(f"⚠️ 结构检查失败 (结果已加载): {e}")

    return results


def analyze_detection_results(results):
    """Print and plot statistics of the 3D detection results.

    Samples without a ``boxes_3d`` entry are ignored. When any score or
    label was collected, a two-panel figure (confidence histogram and class
    histogram) is written to ``/data/eval_fast/detection_analysis.png``.

    Args:
        results: List of per-sample result mappings.
    """
    print("\n" + "=" * 60)
    print("分析3D检测结果")
    print("=" * 60)

    if not isinstance(results, list):
        print("❌ 结果不是列表格式,无法分析检测结果")
        return

    total_boxes = 0
    all_scores = []
    all_labels = []
    sample_stats = []

    for i, sample in enumerate(results):
        if 'boxes_3d' not in sample:
            continue
        boxes = sample['boxes_3d']
        # Normalise to flat NumPy arrays so tensors and lists behave alike.
        scores = _to_numpy(sample.get('scores_3d', [])).ravel()
        labels = _to_numpy(sample.get('labels_3d', [])).ravel()

        total_boxes += len(boxes)
        all_scores.extend(scores.tolist())
        all_labels.extend(labels.tolist())

        has_scores = scores.size > 0
        sample_stats.append({
            'sample_id': i,
            'num_boxes': len(boxes),
            'avg_score': float(np.mean(scores)) if has_scores else 0,
            'max_score': float(np.max(scores)) if has_scores else 0,
        })

    print(f"总检测框数量: {total_boxes}")
    # The original statement printed the literal ".2f"; report the average
    # number of boxes per sample, guarding against an empty result list.
    avg_boxes = total_boxes / len(results) if results else 0.0
    print(f"平均每个样本检测框数量: {avg_boxes:.2f}")

    if all_scores:
        print(f"分数范围: {min(all_scores):.3f} ~ {max(all_scores):.3f}")
        print(f"平均分数: {np.mean(all_scores):.3f}")

    unique_labels = counts = None
    if all_labels:
        unique_labels, counts = np.unique(all_labels, return_counts=True)
        print("\n类别分布:")
        for label, count in zip(unique_labels, counts):
            print(f" 类别 {label}: {count} 个检测框")

    # Plot the confidence and class distributions; each panel is drawn only
    # when its data exists, so missing labels no longer raise NameError.
    if all_scores or all_labels:
        fig = plt.figure(figsize=(12, 5))

        if all_scores:
            plt.subplot(1, 2, 1)
            plt.hist(all_scores, bins=50, alpha=0.7, edgecolor='black')
            plt.xlabel('Detection Confidence')
            plt.ylabel('Count')
            plt.title('Detection Confidence Distribution')
            plt.grid(True, alpha=0.3)

        if unique_labels is not None:
            plt.subplot(1, 2, 2)
            plt.bar(unique_labels, counts, alpha=0.7, edgecolor='black')
            plt.xlabel('Class ID')
            plt.ylabel('Count')
            plt.title('Class Distribution')
            plt.xticks(unique_labels)
            plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig('/data/eval_fast/detection_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
        print("✓ 检测分析图表已保存至: /data/eval_fast/detection_analysis.png")

    # Per-sample statistics.
    if sample_stats:
        print("\n前10个样本的检测统计:")
        for stat in sample_stats[:10]:
            print(f" 样本 {stat['sample_id']}: {stat['num_boxes']} 框, 平均置信度 {stat['avg_score']:.3f}, 最高置信度 {stat['max_score']:.3f}")


def _iou_per_class(pred_flat, gt_flat, classes):
    """Return ``{class: IoU}`` for ``classes`` over two flat label arrays.

    IoU is intersection / union of the pixels equal to the class in the
    prediction and in the ground truth; an empty union yields 0.0.
    """
    ious = {}
    for cls in classes:
        pred_hit = pred_flat == cls
        gt_hit = gt_flat == cls
        union = int(np.logical_or(pred_hit, gt_hit).sum())
        inter = int(np.logical_and(pred_hit, gt_hit).sum())
        ious[cls] = inter / union if union > 0 else 0.0
    return ious


def analyze_segmentation_results(results):
    """Print and plot statistics of the BEV segmentation results.

    For every sample with a ``masks_bev`` entry the distinct mask values
    and their pixel counts are collected; when ``gt_masks_bev`` is present,
    per-class IoU and mIoU are computed over the values that occur in both
    the prediction and the ground truth. A summary figure is written to
    ``/data/eval_fast/segmentation_analysis.png``.

    NOTE(review): the distinct values of the mask are treated as class IDs
    (via ``np.unique``); this presumes discrete label maps rather than
    per-class probability maps -- confirm against the producer of the file.

    Args:
        results: List of per-sample result mappings.
    """
    print("\n" + "=" * 60)
    print("分析BEV分割结果")
    print("=" * 60)

    if not isinstance(results, list):
        print("❌ 结果不是列表格式,无法分析分割结果")
        return

    segmentation_stats = []

    for i, sample in enumerate(results):
        if 'masks_bev' not in sample:
            continue
        mask = sample['masks_bev']
        gt_mask = sample.get('gt_masks_bev')

        if isinstance(mask, np.ndarray):
            pred_mask = mask
        elif hasattr(mask, 'cpu'):
            # Tensor-like object: move to CPU and convert to NumPy.
            pred_mask = mask.cpu().numpy()
        else:
            print(f"⚠️ 样本 {i} 的mask格式不支持: {type(mask)}")
            continue

        # Pixel count of every distinct value in the predicted mask.
        unique_classes, pixel_counts = np.unique(pred_mask, return_counts=True)

        stat = {
            'sample_id': i,
            'mask_shape': pred_mask.shape,
            'num_classes': len(unique_classes),
            'pixel_counts': dict(zip(unique_classes.tolist(), pixel_counts.tolist())),
            'total_pixels': pred_mask.size,
        }

        if gt_mask is not None:
            gt_mask = _to_numpy(gt_mask)
            if gt_mask.shape != pred_mask.shape:
                # Element-wise comparison is meaningless for differing
                # shapes; keep the pixel statistics and skip the IoU.
                print(f"⚠️ 样本 {i} 的预测与真值mask形状不一致: {pred_mask.shape} vs {gt_mask.shape}")
            else:
                pred_flat = pred_mask.flatten()
                gt_flat = gt_mask.flatten()

                # Only score classes present in both prediction and truth.
                valid_classes = np.intersect1d(unique_classes, np.unique(gt_flat)).tolist()

                # The union is taken over all pixels, so confusions with
                # classes outside valid_classes still lower the IoU.
                iou_by_class = _iou_per_class(pred_flat, gt_flat, valid_classes)
                stat['iou_per_class'] = iou_by_class
                stat['miou'] = float(np.mean(list(iou_by_class.values()))) if iou_by_class else 0

        segmentation_stats.append(stat)

    if not segmentation_stats:
        return

    print(f"分析了 {len(segmentation_stats)} 个样本的分割结果")

    # Aggregate statistics.
    all_classes = set()
    for stat in segmentation_stats:
        all_classes.update(stat['pixel_counts'].keys())

    print(f"\n总共发现 {len(all_classes)} 个类别: {sorted(all_classes)}")

    # Mean IoU over every sample that has one (not just the first sample).
    mious = [stat['miou'] for stat in segmentation_stats if 'miou' in stat]
    if mious:
        print(f"平均mIoU: {np.mean(mious):.4f}")

        # Per-class IoU averaged over the samples where the class was scored.
        class_iou_sum = {}
        class_iou_count = {}
        for stat in segmentation_stats:
            for cls, iou in stat.get('iou_per_class', {}).items():
                class_iou_sum[cls] = class_iou_sum.get(cls, 0) + iou
                class_iou_count[cls] = class_iou_count.get(cls, 0) + 1

        print("\n各类别平均IoU:")
        for cls in sorted(class_iou_sum):
            avg_iou = class_iou_sum[cls] / class_iou_count[cls]
            print(f" 类别 {cls}: {avg_iou:.4f}")

    # Visualise the distributions.
    fig = plt.figure(figsize=(15, 10))
    first_stat = segmentation_stats[0]

    # Pixel distribution of the first analysed sample.
    plt.subplot(2, 2, 1)
    classes = list(first_stat['pixel_counts'].keys())
    pixels = list(first_stat['pixel_counts'].values())
    plt.bar(classes, pixels, alpha=0.7, edgecolor='black')
    plt.xlabel('Class ID')
    plt.ylabel('Pixel Count')
    plt.title('Pixel Distribution (Sample 0)')
    plt.xticks(classes)
    plt.grid(True, alpha=0.3)

    # IoU of the first analysed sample. The x positions come from the IoU
    # dict itself: its classes can be a subset of the predicted classes, and
    # mismatched lengths would make plt.bar raise.
    if 'iou_per_class' in first_stat:
        plt.subplot(2, 2, 2)
        iou_classes = list(first_stat['iou_per_class'].keys())
        ious = list(first_stat['iou_per_class'].values())
        plt.bar(iou_classes, ious, alpha=0.7, edgecolor='black', color='green')
        plt.xlabel('Class ID')
        plt.ylabel('IoU')
        plt.title('IoU per Class (Sample 0)')
        plt.xticks(iou_classes)
        plt.ylim(0, 1)
        plt.grid(True, alpha=0.3)

    # Distribution of mIoU across samples.
    if mious:
        plt.subplot(2, 2, 3)
        plt.hist(mious, bins=20, alpha=0.7, edgecolor='black')
        plt.xlabel('mIoU')
        plt.ylabel('Count')
        plt.title('mIoU Distribution Across Samples')
        plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('/data/eval_fast/segmentation_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()
    plt.close(fig)
    print("✓ 分割分析图表已保存至: /data/eval_fast/segmentation_analysis.png")


def visualize_sample(results, sample_idx=0):
    """Plot the detection scores and BEV masks of a single sample.

    The figure is a 2x3 grid: slot 1 holds the detection-score histogram and
    slots 2-6 hold BEV masks, so at most five channels of a 3-D mask are
    drawn. The figure is written to
    ``/data/eval_fast/sample_<sample_idx>_visualization.png``.

    Args:
        results: List of per-sample result mappings.
        sample_idx: Index of the sample to plot; negative indices count
            from the end, as with normal list indexing.
    """
    print("\n" + "=" * 60)
    print(f"可视化样本 {sample_idx}")
    print("=" * 60)

    if not isinstance(results, list) or not -len(results) <= sample_idx < len(results):
        print(f"❌ 无效的样本索引 {sample_idx}")
        return

    sample = results[sample_idx]

    fig = plt.figure(figsize=(20, 10))

    # Detection results.
    if 'boxes_3d' in sample and 'scores_3d' in sample:
        plt.subplot(2, 3, 1)
        scores = _to_numpy(sample['scores_3d']).ravel()
        plt.hist(scores, bins=20, alpha=0.7, edgecolor='black')
        plt.xlabel('Confidence Score')
        plt.ylabel('Count')
        plt.title(f'Detection Scores (Sample {sample_idx})')
        plt.grid(True, alpha=0.3)

    # BEV segmentation results.
    if 'masks_bev' in sample:
        mask = _to_numpy(sample['masks_bev'])

        if mask.ndim == 3:
            # Slot 1 is reserved for the score histogram, leaving slots 2-6;
            # asking for a seventh slot in a 2x3 grid raises ValueError.
            for i in range(min(5, mask.shape[0])):
                plt.subplot(2, 3, i + 2)
                plt.imshow(mask[i], cmap='viridis')
                plt.title(f'BEV Class {i} (Sample {sample_idx})')
                plt.axis('off')
        elif mask.ndim == 2:
            plt.subplot(2, 3, 2)
            plt.imshow(mask, cmap='tab20')
            plt.title(f'BEV Segmentation (Sample {sample_idx})')
            plt.axis('off')
        else:
            print(f"⚠️ 样本 {sample_idx} 的mask维度不支持: {mask.shape}")

    plt.tight_layout()
    plt.savefig(f'/data/eval_fast/sample_{sample_idx}_visualization.png', dpi=300, bbox_inches='tight')
    plt.show()
    plt.close(fig)
    print(f"✓ 样本{sample_idx}可视化已保存至: /data/eval_fast/sample_{sample_idx}_visualization.png")


def generate_performance_report(results):
    """Build and print a summary report for detection and segmentation.

    Args:
        results: List of per-sample result mappings.

    Returns:
        A dict with an optional ``'detection'`` entry (sample and detection
        counts, score distribution, class distribution) and an optional
        ``'segmentation'`` entry (sample count, average mIoU). Empty when
        ``results`` is not a list. All numeric values are plain Python
        ``int``/``float`` so that ``compare_with_baseline`` can diff them.
    """
    print("\n" + "=" * 60)
    print("生成性能报告")
    print("=" * 60)

    report = {}

    if not isinstance(results, list):
        print("❌ 无法生成报告:结果不是列表格式")
        return report

    # Detection metrics.
    if any('boxes_3d' in sample for sample in results):
        detection_stats = {
            'total_samples': len(results),
            'total_detections': 0,
            'avg_detections_per_sample': 0,
            'score_distribution': {'min': float('inf'), 'max': 0, 'mean': 0},
            'class_distribution': {},
        }

        all_scores = []
        all_labels = []

        for sample in results:
            if 'boxes_3d' in sample:
                detection_stats['total_detections'] += len(sample['boxes_3d'])
            if 'scores_3d' in sample:
                all_scores.extend(_to_numpy(sample['scores_3d']).ravel().tolist())
            if 'labels_3d' in sample:
                all_labels.extend(_to_numpy(sample['labels_3d']).ravel().tolist())

        # len(results) > 0 is guaranteed here by the any() check above.
        detection_stats['avg_detections_per_sample'] = detection_stats['total_detections'] / detection_stats['total_samples']

        if all_scores:
            detection_stats['score_distribution'] = {
                'min': float(np.min(all_scores)),
                'max': float(np.max(all_scores)),
                'mean': float(np.mean(all_scores)),
            }

        if all_labels:
            unique_labels, counts = np.unique(all_labels, return_counts=True)
            # Plain ints: NumPy integers are not instances of int, so they
            # would be skipped by the numeric check in compare_with_baseline.
            detection_stats['class_distribution'] = {
                int(label): int(count) for label, count in zip(unique_labels, counts)
            }

        report['detection'] = detection_stats

    # Segmentation metrics.
    seg_samples = [s for s in results if 'masks_bev' in s and 'gt_masks_bev' in s]
    if seg_samples:
        segmentation_stats = {
            'total_samples': len(seg_samples),
            'avg_miou': 0,
            'class_wise_iou': {},
        }

        all_mious = []

        for sample in seg_samples:
            pred_flat = _to_numpy(sample['masks_bev']).flatten()
            gt_flat = _to_numpy(sample['gt_masks_bev']).flatten()

            # mIoU is best-effort per sample, but a failure is reported
            # instead of being swallowed by a bare except.
            try:
                miou = jaccard_score(gt_flat, pred_flat, average='macro')
            except Exception as e:
                print(f"⚠️ 样本mIoU计算失败,已跳过: {e}")
                continue
            all_mious.append(miou)

        if all_mious:
            segmentation_stats['avg_miou'] = float(np.mean(all_mious))

        report['segmentation'] = segmentation_stats

    # Print the report.
    print("性能报告:")
    for task, metrics in report.items():
        print(f"\n{task.upper()}:")
        if isinstance(metrics, dict):
            for key, value in metrics.items():
                if isinstance(value, dict):
                    print(f" {key}:")
                    for subkey, subvalue in value.items():
                        print(f" {subkey}: {subvalue}")
                else:
                    print(f" {key}: {value}")

    return report


def _print_numeric_diff(name, current_val, baseline_val):
    """Print ``current vs baseline (diff)`` when both values are numeric."""
    if isinstance(current_val, (int, float)) and isinstance(baseline_val, (int, float)):
        diff = current_val - baseline_val
        print(f" {name}: {current_val:.4f} vs {baseline_val:.4f} ({diff:+.4f})")


def compare_with_baseline(current_results, baseline_path):
    """Compare the current results against a pickled baseline.

    Reports are generated for both result sets with
    ``generate_performance_report`` and every numeric metric present in
    both is printed side by side with its difference. Any failure (missing
    file, unreadable pickle, ...) is reported and the function returns.

    Args:
        current_results: List of per-sample result mappings.
        baseline_path: Path of the pickle file holding the baseline results.
    """
    print("\n" + "=" * 60)
    print("与基准结果比较")
    print("=" * 60)

    try:
        # SECURITY: only unpickle baseline files from a trusted source.
        with open(baseline_path, 'rb') as f:
            baseline = pickle.load(f)

        current_report = generate_performance_report(current_results)
        baseline_report = generate_performance_report(baseline)

        print("\n性能比较:")

        for task, current_metrics in current_report.items():
            if task not in baseline_report:
                continue
            print(f"\n{task.upper()}:")
            baseline_metrics = baseline_report[task]

            if not (isinstance(current_metrics, dict) and isinstance(baseline_metrics, dict)):
                continue

            for metric, current_val in current_metrics.items():
                if metric not in baseline_metrics:
                    continue
                baseline_val = baseline_metrics[metric]

                if isinstance(current_val, dict) and isinstance(baseline_val, dict):
                    print(f" {metric}:")
                    for submetric, c_val in current_val.items():
                        if submetric in baseline_val:
                            _print_numeric_diff(submetric, c_val, baseline_val[submetric])
                else:
                    _print_numeric_diff(metric, current_val, baseline_val)

    except Exception as e:
        print(f"❌ 基准比较失败: {e}")


def main():
    """Run the full analysis on the configured results file."""
    # Path of the results file.
    results_path = '/data/eval_fast/epoch1_fast_20251119_133104/fast_results.pkl'

    # Load and inspect the results.
    results = load_and_inspect_pkl(results_path)
    if results is None:
        return

    # Analyse the detection results.
    analyze_detection_results(results)

    # Analyse the segmentation results.
    analyze_segmentation_results(results)

    # Visualise one sample.
    if len(results) > 0:
        visualize_sample(results, sample_idx=0)

    # Generate the performance report.
    generate_performance_report(results)

    # Optional: compare with a baseline (if a baseline file exists).
    # baseline_path = '/path/to/baseline_results.pkl'
    # compare_with_baseline(results, baseline_path)

    print("\n" + "=" * 60)
    print("分析完成!")
    print("生成的文件:")
    print(" - /data/eval_fast/detection_analysis.png")
    print(" - /data/eval_fast/segmentation_analysis.png")
    print(" - /data/eval_fast/sample_0_visualization.png")
    print("=" * 60)


if __name__ == '__main__':
    main()