import os from typing import Any, Dict, Tuple import mmcv import numpy as np from mmdet3d.core.points import RadarPoints from nuscenes.utils.data_classes import RadarPointCloud from nuscenes.map_expansion.map_api import NuScenesMap from nuscenes.map_expansion.map_api import locations as LOCATIONS from PIL import Image from mmdet3d.core.points import BasePoints, get_points_type from mmdet.datasets.builder import PIPELINES from mmdet.datasets.pipelines import LoadAnnotations import torch from .loading_utils import load_augmented_point_cloud, reduce_LiDAR_beams @PIPELINES.register_module() class LoadMultiViewImageFromFiles: """Load multi channel images from a list of separate channel files. Expects results['image_paths'] to be a list of filenames. Args: to_float32 (bool): Whether to convert the img to float32. Defaults to False. color_type (str): Color type of the file. Defaults to 'unchanged'. """ def __init__(self, to_float32=False, color_type="unchanged"): self.to_float32 = to_float32 self.color_type = color_type def __call__(self, results): """Call function to load multi-view image from files. Args: results (dict): Result dict containing multi-view image filenames. Returns: dict: The result dict containing the multi-view image data. \ Added keys and values are described below. - filename (str): Multi-view image filenames. - img (np.ndarray): Multi-view image arrays. - img_shape (tuple[int]): Shape of multi-view image arrays. - ori_shape (tuple[int]): Shape of original image arrays. - pad_shape (tuple[int]): Shape of padded image arrays. - scale_factor (float): Scale factor. - img_norm_cfg (dict): Normalization configuration of images. """ filename = results["image_paths"] # img is of shape (h, w, c, num_views) # modified for waymo images = [] h, w = 0, 0 for name in filename: images.append(Image.open(name)) #TODO: consider image padding in waymo results["filename"] = filename # unravel to list, see `DefaultFormatBundle` in formating.py # which will transpose each image separately and then stack into array results["img"] = images # [1600, 900] results["img_shape"] = images[0].size results["ori_shape"] = images[0].size # Set initial values for default meta_keys results["pad_shape"] = images[0].size results["scale_factor"] = 1.0 return results def __repr__(self): """str: Return a string that describes the module.""" repr_str = self.__class__.__name__ repr_str += f"(to_float32={self.to_float32}, " repr_str += f"color_type='{self.color_type}')" return repr_str @PIPELINES.register_module() class LoadPointsFromMultiSweeps: """Load points from multiple sweeps. This is usually used for nuScenes dataset to utilize previous sweeps. Args: sweeps_num (int): Number of sweeps. Defaults to 10. load_dim (int): Dimension number of the loaded points. Defaults to 5. use_dim (list[int]): Which dimension to use. Defaults to [0, 1, 2, 4]. pad_empty_sweeps (bool): Whether to repeat keyframe when sweeps is empty. Defaults to False. remove_close (bool): Whether to remove close points. Defaults to False. test_mode (bool): If test_model=True used for testing, it will not randomly sample sweeps but select the nearest N frames. Defaults to False. """ def __init__( self, sweeps_num=10, load_dim=5, use_dim=[0, 1, 2, 4], pad_empty_sweeps=False, remove_close=False, test_mode=False, load_augmented=None, reduce_beams=None, ): self.load_dim = load_dim self.sweeps_num = sweeps_num if isinstance(use_dim, int): use_dim = list(range(use_dim)) self.use_dim = use_dim self.pad_empty_sweeps = pad_empty_sweeps self.remove_close = remove_close self.test_mode = test_mode self.load_augmented = load_augmented self.reduce_beams = reduce_beams def _load_points(self, lidar_path): """Private function to load point clouds data. Args: lidar_path (str): Filename of point clouds data. Returns: np.ndarray: An array containing point clouds data. """ mmcv.check_file_exist(lidar_path) if self.load_augmented: assert self.load_augmented in ["pointpainting", "mvp"] virtual = self.load_augmented == "mvp" points = load_augmented_point_cloud( lidar_path, virtual=virtual, reduce_beams=self.reduce_beams ) elif lidar_path.endswith(".npy"): points = np.load(lidar_path) else: points = np.fromfile(lidar_path, dtype=np.float32) return points def _remove_close(self, points, radius=1.0): """Removes point too close within a certain radius from origin. Args: points (np.ndarray | :obj:`BasePoints`): Sweep points. radius (float): Radius below which points are removed. Defaults to 1.0. Returns: np.ndarray: Points after removing. """ if isinstance(points, np.ndarray): points_numpy = points elif isinstance(points, BasePoints): points_numpy = points.tensor.numpy() else: raise NotImplementedError x_filt = np.abs(points_numpy[:, 0]) < radius y_filt = np.abs(points_numpy[:, 1]) < radius not_close = np.logical_not(np.logical_and(x_filt, y_filt)) return points[not_close] def __call__(self, results): """Call function to load multi-sweep point clouds from files. Args: results (dict): Result dict containing multi-sweep point cloud \ filenames. Returns: dict: The result dict containing the multi-sweep points data. \ Added key and value are described below. - points (np.ndarray | :obj:`BasePoints`): Multi-sweep point \ cloud arrays. """ points = results["points"] points = points[:, self.use_dim] points.tensor[:, 4] = 0 sweep_points_list = [points] ts = results["timestamp"] / 1e6 if self.pad_empty_sweeps and len(results["sweeps"]) == 0: for i in range(self.sweeps_num): if self.remove_close: sweep_points_list.append(self._remove_close(points)) else: sweep_points_list.append(points) else: if len(results["sweeps"]) <= self.sweeps_num: choices = np.arange(len(results["sweeps"])) elif self.test_mode: choices = np.arange(self.sweeps_num) else: # NOTE: seems possible to load frame -11? if not self.load_augmented: choices = np.random.choice( len(results["sweeps"]), self.sweeps_num, replace=False ) else: # don't allow to sample the earliest frame, match with Tianwei's implementation. choices = np.random.choice( len(results["sweeps"]) - 1, self.sweeps_num, replace=False ) for idx in choices: sweep = results["sweeps"][idx] points_sweep = self._load_points(sweep["data_path"]) points_sweep = np.copy(points_sweep).reshape(-1, self.load_dim) # TODO: make it more general if self.reduce_beams and self.reduce_beams < 32: points_sweep = reduce_LiDAR_beams(points_sweep, self.reduce_beams) if self.remove_close: points_sweep = self._remove_close(points_sweep) points_sweep = points_sweep[:, self.use_dim] sweep_ts = sweep["timestamp"] / 1e6 points_sweep[:, :3] = ( points_sweep[:, :3] @ sweep["sensor2lidar_rotation"].T ) points_sweep[:, :3] += sweep["sensor2lidar_translation"] points_sweep[:, 4] = ts - sweep_ts points_sweep = points.new_point(points_sweep) sweep_points_list.append(points_sweep) points = points.cat(sweep_points_list) results["points"] = points return results def __repr__(self): """str: Return a string that describes the module.""" return f"{self.__class__.__name__}(sweeps_num={self.sweeps_num})" @PIPELINES.register_module() class LoadBEVSegmentation: def __init__( self, dataset_root: str, xbound: Tuple[float, float, float], ybound: Tuple[float, float, float], classes: Tuple[str, ...], ) -> None: super().__init__() patch_h = ybound[1] - ybound[0] patch_w = xbound[1] - xbound[0] canvas_h = int(patch_h / ybound[2]) canvas_w = int(patch_w / xbound[2]) self.patch_size = (patch_h, patch_w) self.canvas_size = (canvas_h, canvas_w) self.classes = classes self.maps = {} for location in LOCATIONS: self.maps[location] = NuScenesMap(dataset_root, location) def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]: lidar2point = data["lidar_aug_matrix"] point2lidar = np.linalg.inv(lidar2point) lidar2ego = data["lidar2ego"] ego2global = data["ego2global"] lidar2global = ego2global @ lidar2ego @ point2lidar map_pose = lidar2global[:2, 3] patch_box = (map_pose[0], map_pose[1], self.patch_size[0], self.patch_size[1]) rotation = lidar2global[:3, :3] v = np.dot(rotation, np.array([1, 0, 0])) yaw = np.arctan2(v[1], v[0]) patch_angle = yaw / np.pi * 180 mappings = {} for name in self.classes: if name == "drivable_area*": mappings[name] = ["road_segment", "lane"] elif name == "divider": mappings[name] = ["road_divider", "lane_divider"] else: mappings[name] = [name] layer_names = [] for name in mappings: layer_names.extend(mappings[name]) layer_names = list(set(layer_names)) # 修复:检查location键是否存在,如果不存在则使用默认值'boston-seaport' location = data.get("location", "boston-seaport") if location not in self.maps: location = "boston-seaport" # 默认使用boston-seaport masks = self.maps[location].get_map_mask( patch_box=patch_box, patch_angle=patch_angle, layer_names=layer_names, canvas_size=self.canvas_size, ) # masks = masks[:, ::-1, :].copy() masks = masks.transpose(0, 2, 1) masks = masks.astype(np.bool) num_classes = len(self.classes) labels = np.zeros((num_classes, *self.canvas_size), dtype=np.long) for k, name in enumerate(self.classes): for layer_name in mappings[name]: index = layer_names.index(layer_name) labels[k, masks[index]] = 1 data["gt_masks_bev"] = labels return data @PIPELINES.register_module() class LoadPointsFromFile: """Load Points From File. Load sunrgbd and scannet points from file. Args: coord_type (str): The type of coordinates of points cloud. Available options includes: - 'LIDAR': Points in LiDAR coordinates. - 'DEPTH': Points in depth coordinates, usually for indoor dataset. - 'CAMERA': Points in camera coordinates. load_dim (int): The dimension of the loaded points. Defaults to 6. use_dim (list[int]): Which dimensions of the points to be used. Defaults to [0, 1, 2]. For KITTI dataset, set use_dim=4 or use_dim=[0, 1, 2, 3] to use the intensity dimension. shift_height (bool): Whether to use shifted height. Defaults to False. use_color (bool): Whether to use color features. Defaults to False. """ def __init__( self, coord_type, load_dim=6, use_dim=[0, 1, 2], shift_height=False, use_color=False, load_augmented=None, reduce_beams=None, ): self.shift_height = shift_height self.use_color = use_color if isinstance(use_dim, int): use_dim = list(range(use_dim)) assert ( max(use_dim) < load_dim ), f"Expect all used dimensions < {load_dim}, got {use_dim}" assert coord_type in ["CAMERA", "LIDAR", "DEPTH"] self.coord_type = coord_type self.load_dim = load_dim self.use_dim = use_dim self.load_augmented = load_augmented self.reduce_beams = reduce_beams def _load_points(self, lidar_path): """Private function to load point clouds data. Args: lidar_path (str): Filename of point clouds data. Returns: np.ndarray: An array containing point clouds data. """ mmcv.check_file_exist(lidar_path) if self.load_augmented: assert self.load_augmented in ["pointpainting", "mvp"] virtual = self.load_augmented == "mvp" points = load_augmented_point_cloud( lidar_path, virtual=virtual, reduce_beams=self.reduce_beams ) elif lidar_path.endswith(".npy"): points = np.load(lidar_path) else: points = np.fromfile(lidar_path, dtype=np.float32) return points def __call__(self, results): """Call function to load points data from file. Args: results (dict): Result dict containing point clouds data. Returns: dict: The result dict containing the point clouds data. \ Added key and value are described below. - points (:obj:`BasePoints`): Point clouds data. """ lidar_path = results["lidar_path"] points = self._load_points(lidar_path) points = points.reshape(-1, self.load_dim) # TODO: make it more general if self.reduce_beams and self.reduce_beams < 32: points = reduce_LiDAR_beams(points, self.reduce_beams) points = points[:, self.use_dim] attribute_dims = None if self.shift_height: floor_height = np.percentile(points[:, 2], 0.99) height = points[:, 2] - floor_height points = np.concatenate( [points[:, :3], np.expand_dims(height, 1), points[:, 3:]], 1 ) attribute_dims = dict(height=3) if self.use_color: assert len(self.use_dim) >= 6 if attribute_dims is None: attribute_dims = dict() attribute_dims.update( dict( color=[ points.shape[1] - 3, points.shape[1] - 2, points.shape[1] - 1, ] ) ) points_class = get_points_type(self.coord_type) points = points_class( points, points_dim=points.shape[-1], attribute_dims=attribute_dims ) results["points"] = points return results @PIPELINES.register_module() class LoadAnnotations3D(LoadAnnotations): """Load Annotations3D. Load instance mask and semantic mask of points and encapsulate the items into related fields. Args: with_bbox_3d (bool, optional): Whether to load 3D boxes. Defaults to True. with_label_3d (bool, optional): Whether to load 3D labels. Defaults to True. with_attr_label (bool, optional): Whether to load attribute label. Defaults to False. with_bbox (bool, optional): Whether to load 2D boxes. Defaults to False. with_label (bool, optional): Whether to load 2D labels. Defaults to False. with_mask (bool, optional): Whether to load 2D instance masks. Defaults to False. with_seg (bool, optional): Whether to load 2D semantic masks. Defaults to False. with_bbox_depth (bool, optional): Whether to load 2.5D boxes. Defaults to False. poly2mask (bool, optional): Whether to convert polygon annotations to bitmasks. Defaults to True. """ def __init__( self, with_bbox_3d=True, with_label_3d=True, with_attr_label=False, with_bbox=False, with_label=False, with_mask=False, with_seg=False, with_bbox_depth=False, poly2mask=True, ): super().__init__( with_bbox, with_label, with_mask, with_seg, poly2mask, ) self.with_bbox_3d = with_bbox_3d self.with_bbox_depth = with_bbox_depth self.with_label_3d = with_label_3d self.with_attr_label = with_attr_label def _load_bboxes_3d(self, results): """Private function to load 3D bounding box annotations. Args: results (dict): Result dict from :obj:`mmdet3d.CustomDataset`. Returns: dict: The dict containing loaded 3D bounding box annotations. """ results["gt_bboxes_3d"] = results["ann_info"]["gt_bboxes_3d"] results["bbox3d_fields"].append("gt_bboxes_3d") return results def _load_bboxes_depth(self, results): """Private function to load 2.5D bounding box annotations. Args: results (dict): Result dict from :obj:`mmdet3d.CustomDataset`. Returns: dict: The dict containing loaded 2.5D bounding box annotations. """ results["centers2d"] = results["ann_info"]["centers2d"] results["depths"] = results["ann_info"]["depths"] return results def _load_labels_3d(self, results): """Private function to load label annotations. Args: results (dict): Result dict from :obj:`mmdet3d.CustomDataset`. Returns: dict: The dict containing loaded label annotations. """ results["gt_labels_3d"] = results["ann_info"]["gt_labels_3d"] return results def _load_attr_labels(self, results): """Private function to load label annotations. Args: results (dict): Result dict from :obj:`mmdet3d.CustomDataset`. Returns: dict: The dict containing loaded label annotations. """ results["attr_labels"] = results["ann_info"]["attr_labels"] return results def __call__(self, results): """Call function to load multiple types annotations. Args: results (dict): Result dict from :obj:`mmdet3d.CustomDataset`. Returns: dict: The dict containing loaded 3D bounding box, label, mask and semantic segmentation annotations. """ results = super().__call__(results) if self.with_bbox_3d: results = self._load_bboxes_3d(results) if results is None: return None if self.with_bbox_depth: results = self._load_bboxes_depth(results) if results is None: return None if self.with_label_3d: results = self._load_labels_3d(results) if self.with_attr_label: results = self._load_attr_labels(results) return results @PIPELINES.register_module() class NormalizePointFeatures: def __call__(self, results): points = results["points"] points.tensor[:, 3] = torch.tanh(points.tensor[:, 3]) results["points"] = points return results @PIPELINES.register_module() class LoadRadarPointsMultiSweeps(object): """Load radar points from multiple sweeps. This is usually used for nuScenes dataset to utilize previous sweeps. Args: sweeps_num (int): Number of sweeps. Defaults to 10. load_dim (int): Dimension number of the loaded points. Defaults to 5. use_dim (list[int]): Which dimension to use. Defaults to [0, 1, 2, 4]. file_client_args (dict): Config dict of file clients, refer to https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py for more details. Defaults to dict(backend='disk'). pad_empty_sweeps (bool): Whether to repeat keyframe when sweeps is empty. Defaults to False. remove_close (bool): Whether to remove close points. Defaults to False. test_mode (bool): If test_model=True used for testing, it will not randomly sample sweeps but select the nearest N frames. Defaults to False. """ def __init__(self, load_dim=18, use_dim=[0, 1, 2, 3, 4], sweeps_num=3, file_client_args=dict(backend='disk'), max_num=300, pc_range=[-51.2, -51.2, -5.0, 51.2, 51.2, 3.0], compensate_velocity=False, normalize_dims=[(3, 0, 50), (4, -100, 100), (5, -100, 100)], filtering='default', normalize=False, test_mode=False): self.load_dim = load_dim self.use_dim = use_dim self.sweeps_num = sweeps_num self.file_client_args = file_client_args.copy() self.file_client = None self.max_num = max_num self.test_mode = test_mode self.pc_range = pc_range self.compensate_velocity = compensate_velocity self.normalize_dims = normalize_dims self.filtering = filtering self.normalize = normalize self.encoding = [ (3, 'one-hot', 8), # dynprop (11, 'one-hot', 5), # ambig_state (14, 'one-hot', 18), # invalid_state (15, 'ordinal', 7), # pdh (0, 'nusc-filter', 1) # binary feature: 1 if nusc would have filtered it out ] def perform_encodings(self, points, encoding): for idx, encoding_type, encoding_dims in self.encoding: assert encoding_type in ['one-hot', 'ordinal', 'nusc-filter'] feat = points[:, idx] if encoding_type == 'one-hot': encoding = np.zeros((points.shape[0], encoding_dims)) encoding[np.arange(feat.shape[0]), np.rint(feat).astype(int)] = 1 if encoding_type == 'ordinal': encoding = np.zeros((points.shape[0], encoding_dims)) for i in range(encoding_dims): encoding[:, i] = (np.rint(feat) > i).astype(int) if encoding_type == 'nusc-filter': encoding = np.zeros((points.shape[0], encoding_dims)) mask1 = (points[:, 14] == 0) mask2 = (points[:, 3] < 7) mask3 = (points[:, 11] == 3) encoding[mask1 & mask2 & mask3, 0] = 1 points = np.concatenate([points, encoding], axis=1) return points def _load_points(self, pts_filename): """Private function to load point clouds data. Args: pts_filename (str): Filename of point clouds data. Returns: np.ndarray: An array containing point clouds data. [N, 18] """ invalid_states, dynprop_states, ambig_states = { 'default': ([0], range(7), [3]), 'none': (range(18), range(8), range(5)), }[self.filtering] radar_obj = RadarPointCloud.from_file( pts_filename, invalid_states, dynprop_states, ambig_states ) #[18, N] points = radar_obj.points return points.transpose().astype(np.float32) def _pad_or_drop(self, points): ''' points: [N, 18] ''' num_points = points.shape[0] if num_points == self.max_num: masks = np.ones((num_points, 1), dtype=points.dtype) return points, masks if num_points > self.max_num: points = np.random.permutation(points)[:self.max_num, :] masks = np.ones((self.max_num, 1), dtype=points.dtype) return points, masks if num_points < self.max_num: zeros = np.zeros((self.max_num - num_points, points.shape[1]), dtype=points.dtype) masks = np.ones((num_points, 1), dtype=points.dtype) points = np.concatenate((points, zeros), axis=0) masks = np.concatenate((masks, zeros.copy()[:, [0]]), axis=0) return points, masks def normalize_feats(self, points, normalize_dims): for dim, min, max in normalize_dims: points[:, dim] -= min points[:, dim] /= (max-min) return points def __call__(self, results): """Call function to load multi-sweep point clouds from files. Args: results (dict): Result dict containing multi-sweep point cloud \ filenames. Returns: dict: The result dict containing the multi-sweep points data. \ Added key and value are described below. - points (np.ndarray | :obj:`BasePoints`): Multi-sweep point \ cloud arrays. """ radars_dict = results['radar'] points_sweep_list = [] for key, sweeps in radars_dict.items(): if len(sweeps) < self.sweeps_num: idxes = list(range(len(sweeps))) else: idxes = list(range(self.sweeps_num)) ts = sweeps[0]['timestamp'] * 1e-6 for idx in idxes: sweep = sweeps[idx] points_sweep = self._load_points(sweep['data_path']) points_sweep = np.copy(points_sweep).reshape(-1, self.load_dim) timestamp = sweep['timestamp'] * 1e-6 time_diff = ts - timestamp time_diff = np.ones((points_sweep.shape[0], 1)) * time_diff # velocity compensated by the ego motion in sensor frame velo_comp = points_sweep[:, 8:10] velo_comp = np.concatenate( (velo_comp, np.zeros((velo_comp.shape[0], 1))), 1) velo_comp = velo_comp @ sweep['sensor2lidar_rotation'].T velo_comp = velo_comp[:, :2] # velocity in sensor frame velo = points_sweep[:, 6:8] velo = np.concatenate( (velo, np.zeros((velo.shape[0], 1))), 1) velo = velo @ sweep['sensor2lidar_rotation'].T velo = velo[:, :2] points_sweep[:, :3] = points_sweep[:, :3] @ sweep[ 'sensor2lidar_rotation'].T points_sweep[:, :3] += sweep['sensor2lidar_translation'] if self.compensate_velocity: points_sweep[:, :2] += velo_comp * time_diff points_sweep_ = np.concatenate( [points_sweep[:, :6], velo, velo_comp, points_sweep[:, 10:], time_diff], axis=1) # current format is x y z dyn_prop id rcs vx vy vx_comp vy_comp is_quality_valid ambig_state x_rms y_rms invalid_state pdh0 vx_rms vy_rms timestamp points_sweep_list.append(points_sweep_) points = np.concatenate(points_sweep_list, axis=0) points = self.perform_encodings(points, self.encoding) points = points[:, self.use_dim] if self.normalize: points = self.normalize_feats(points, self.normalize_dims) points = RadarPoints( points, points_dim=points.shape[-1], attribute_dims=None ) results['radar'] = points return results def __repr__(self): """str: Return a string that describes the module.""" return f'{self.__class__.__name__}(sweeps_num={self.sweeps_num})'