bev-project/mmdet3d/datasets/pipelines/loading.py

import os
from typing import Any, Dict, Tuple
import mmcv
import numpy as np
from mmdet3d.core.points import RadarPoints
from nuscenes.utils.data_classes import RadarPointCloud
from nuscenes.map_expansion.map_api import NuScenesMap
from nuscenes.map_expansion.map_api import locations as LOCATIONS
from PIL import Image
from mmdet3d.core.points import BasePoints, get_points_type
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import LoadAnnotations
import torch
from .loading_utils import load_augmented_point_cloud, reduce_LiDAR_beams
@PIPELINES.register_module()
class LoadMultiViewImageFromFiles:
"""Load multi channel images from a list of separate channel files.
Expects results['image_paths'] to be a list of filenames.
Args:
to_float32 (bool): Whether to convert the img to float32.
Defaults to False.
color_type (str): Color type of the file. Defaults to 'unchanged'.
"""
def __init__(self, to_float32=False, color_type="unchanged"):
self.to_float32 = to_float32
self.color_type = color_type
def __call__(self, results):
"""Call function to load multi-view image from files.
Args:
results (dict): Result dict containing multi-view image filenames.
Returns:
dict: The result dict containing the multi-view image data. \
Added keys and values are described below.
- filename (str): Multi-view image filenames.
- img (np.ndarray): Multi-view image arrays.
- img_shape (tuple[int]): Shape of multi-view image arrays.
- ori_shape (tuple[int]): Shape of original image arrays.
- pad_shape (tuple[int]): Shape of padded image arrays.
- scale_factor (float): Scale factor.
- img_norm_cfg (dict): Normalization configuration of images.
"""
filename = results["image_paths"]
# img is of shape (h, w, c, num_views)
# modified for waymo
images = []
h, w = 0, 0
for name in filename:
images.append(Image.open(name))
        # TODO: consider image padding for Waymo
results["filename"] = filename
# unravel to list, see `DefaultFormatBundle` in formating.py
# which will transpose each image separately and then stack into array
results["img"] = images
# [1600, 900]
results["img_shape"] = images[0].size
results["ori_shape"] = images[0].size
# Set initial values for default meta_keys
results["pad_shape"] = images[0].size
results["scale_factor"] = 1.0
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__
repr_str += f"(to_float32={self.to_float32}, "
repr_str += f"color_type='{self.color_type}')"
return repr_str
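
# Illustrative usage (a sketch, not part of the original configs): a typical
# nuScenes-style pipeline entry for this transform might look like the line
# below; the surrounding pipeline and any extra keys are assumptions.
#
#     dict(type="LoadMultiViewImageFromFiles", to_float32=False),
#
# The transform only requires results["image_paths"] to hold the per-camera
# file names; images are returned as PIL.Image objects and are converted and
# stacked by later formatting transforms.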
@PIPELINES.register_module()
class LoadPointsFromMultiSweeps:
"""Load points from multiple sweeps.
This is usually used for nuScenes dataset to utilize previous sweeps.
Args:
sweeps_num (int): Number of sweeps. Defaults to 10.
load_dim (int): Dimension number of the loaded points. Defaults to 5.
use_dim (list[int]): Which dimension to use. Defaults to [0, 1, 2, 4].
pad_empty_sweeps (bool): Whether to repeat keyframe when
sweeps is empty. Defaults to False.
remove_close (bool): Whether to remove close points.
Defaults to False.
        test_mode (bool): If True (used for testing), sweeps are not randomly
            sampled; the nearest N frames are selected instead.
            Defaults to False.
"""
def __init__(
self,
sweeps_num=10,
load_dim=5,
use_dim=[0, 1, 2, 4],
pad_empty_sweeps=False,
remove_close=False,
test_mode=False,
load_augmented=None,
reduce_beams=None,
):
self.load_dim = load_dim
self.sweeps_num = sweeps_num
if isinstance(use_dim, int):
use_dim = list(range(use_dim))
self.use_dim = use_dim
self.pad_empty_sweeps = pad_empty_sweeps
self.remove_close = remove_close
self.test_mode = test_mode
self.load_augmented = load_augmented
self.reduce_beams = reduce_beams
def _load_points(self, lidar_path):
"""Private function to load point clouds data.
Args:
lidar_path (str): Filename of point clouds data.
Returns:
np.ndarray: An array containing point clouds data.
"""
mmcv.check_file_exist(lidar_path)
if self.load_augmented:
assert self.load_augmented in ["pointpainting", "mvp"]
virtual = self.load_augmented == "mvp"
points = load_augmented_point_cloud(
lidar_path, virtual=virtual, reduce_beams=self.reduce_beams
)
elif lidar_path.endswith(".npy"):
points = np.load(lidar_path)
else:
points = np.fromfile(lidar_path, dtype=np.float32)
return points
def _remove_close(self, points, radius=1.0):
"""Removes point too close within a certain radius from origin.
Args:
points (np.ndarray | :obj:`BasePoints`): Sweep points.
radius (float): Radius below which points are removed.
Defaults to 1.0.
Returns:
np.ndarray: Points after removing.
"""
if isinstance(points, np.ndarray):
points_numpy = points
elif isinstance(points, BasePoints):
points_numpy = points.tensor.numpy()
else:
raise NotImplementedError
x_filt = np.abs(points_numpy[:, 0]) < radius
y_filt = np.abs(points_numpy[:, 1]) < radius
not_close = np.logical_not(np.logical_and(x_filt, y_filt))
return points[not_close]
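    # Illustrative note (not in the original): with radius=1.0 this keeps a
    # point only if it lies outside the axis-aligned square |x| < radius and
    # |y| < radius around the sensor origin, e.g. (0.5, -0.3, 0.2) is dropped
    # while (1.5, 0.0, 0.0) is kept; it is not a true Euclidean-distance check.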
def __call__(self, results):
"""Call function to load multi-sweep point clouds from files.
Args:
results (dict): Result dict containing multi-sweep point cloud \
filenames.
Returns:
dict: The result dict containing the multi-sweep points data. \
Added key and value are described below.
- points (np.ndarray | :obj:`BasePoints`): Multi-sweep point \
cloud arrays.
"""
points = results["points"]
points = points[:, self.use_dim]
points.tensor[:, 4] = 0
sweep_points_list = [points]
ts = results["timestamp"] / 1e6
if self.pad_empty_sweeps and len(results["sweeps"]) == 0:
for i in range(self.sweeps_num):
if self.remove_close:
sweep_points_list.append(self._remove_close(points))
else:
sweep_points_list.append(points)
else:
if len(results["sweeps"]) <= self.sweeps_num:
choices = np.arange(len(results["sweeps"]))
elif self.test_mode:
choices = np.arange(self.sweeps_num)
else:
# NOTE: seems possible to load frame -11?
if not self.load_augmented:
choices = np.random.choice(
len(results["sweeps"]), self.sweeps_num, replace=False
)
else:
                    # do not sample the earliest frame, to match Tianwei's implementation
choices = np.random.choice(
len(results["sweeps"]) - 1, self.sweeps_num, replace=False
)
for idx in choices:
sweep = results["sweeps"][idx]
points_sweep = self._load_points(sweep["data_path"])
points_sweep = np.copy(points_sweep).reshape(-1, self.load_dim)
# TODO: make it more general
if self.reduce_beams and self.reduce_beams < 32:
points_sweep = reduce_LiDAR_beams(points_sweep, self.reduce_beams)
if self.remove_close:
points_sweep = self._remove_close(points_sweep)
points_sweep = points_sweep[:, self.use_dim]
sweep_ts = sweep["timestamp"] / 1e6
points_sweep[:, :3] = (
points_sweep[:, :3] @ sweep["sensor2lidar_rotation"].T
)
points_sweep[:, :3] += sweep["sensor2lidar_translation"]
points_sweep[:, 4] = ts - sweep_ts
points_sweep = points.new_point(points_sweep)
sweep_points_list.append(points_sweep)
points = points.cat(sweep_points_list)
results["points"] = points
return results
def __repr__(self):
"""str: Return a string that describes the module."""
return f"{self.__class__.__name__}(sweeps_num={self.sweeps_num})"
@PIPELINES.register_module()
class LoadBEVSegmentation:
def __init__(
self,
dataset_root: str,
xbound: Tuple[float, float, float],
ybound: Tuple[float, float, float],
classes: Tuple[str, ...],
) -> None:
super().__init__()
patch_h = ybound[1] - ybound[0]
patch_w = xbound[1] - xbound[0]
canvas_h = int(patch_h / ybound[2])
canvas_w = int(patch_w / xbound[2])
self.patch_size = (patch_h, patch_w)
self.canvas_size = (canvas_h, canvas_w)
self.classes = classes
self.maps = {}
for location in LOCATIONS:
self.maps[location] = NuScenesMap(dataset_root, location)
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
lidar2point = data["lidar_aug_matrix"]
point2lidar = np.linalg.inv(lidar2point)
lidar2ego = data["lidar2ego"]
ego2global = data["ego2global"]
lidar2global = ego2global @ lidar2ego @ point2lidar
map_pose = lidar2global[:2, 3]
patch_box = (map_pose[0], map_pose[1], self.patch_size[0], self.patch_size[1])
rotation = lidar2global[:3, :3]
v = np.dot(rotation, np.array([1, 0, 0]))
yaw = np.arctan2(v[1], v[0])
patch_angle = yaw / np.pi * 180
mappings = {}
for name in self.classes:
if name == "drivable_area*":
mappings[name] = ["road_segment", "lane"]
elif name == "divider":
mappings[name] = ["road_divider", "lane_divider"]
else:
mappings[name] = [name]
layer_names = []
for name in mappings:
layer_names.extend(mappings[name])
layer_names = list(set(layer_names))
        # Fix: check that the "location" key exists; fall back to
        # "boston-seaport" if it is missing or not a known map.
        location = data.get("location", "boston-seaport")
        if location not in self.maps:
            location = "boston-seaport"  # default map location
masks = self.maps[location].get_map_mask(
patch_box=patch_box,
patch_angle=patch_angle,
layer_names=layer_names,
canvas_size=self.canvas_size,
)
# masks = masks[:, ::-1, :].copy()
masks = masks.transpose(0, 2, 1)
        masks = masks.astype(bool)
num_classes = len(self.classes)
        labels = np.zeros((num_classes, *self.canvas_size), dtype=np.int64)
for k, name in enumerate(self.classes):
for layer_name in mappings[name]:
index = layer_names.index(layer_name)
labels[k, masks[index]] = 1
data["gt_masks_bev"] = labels
return data
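
# Illustrative usage (a sketch; bounds and class names are assumptions): with
# xbound/ybound of [-50, 50, 0.5] the BEV patch covers 100 m x 100 m and is
# rasterized to a 200 x 200 canvas, so gt_masks_bev has shape
# (len(classes), 200, 200).
#
#     dict(
#         type="LoadBEVSegmentation",
#         dataset_root="data/nuscenes/",
#         xbound=[-50.0, 50.0, 0.5],
#         ybound=[-50.0, 50.0, 0.5],
#         classes=["drivable_area", "ped_crossing", "walkway", "stop_line",
#                  "carpark_area", "divider"],
#     ),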
@PIPELINES.register_module()
class LoadPointsFromFile:
"""Load Points From File.
Load sunrgbd and scannet points from file.
Args:
coord_type (str): The type of coordinates of points cloud.
Available options includes:
- 'LIDAR': Points in LiDAR coordinates.
- 'DEPTH': Points in depth coordinates, usually for indoor dataset.
- 'CAMERA': Points in camera coordinates.
load_dim (int): The dimension of the loaded points.
Defaults to 6.
use_dim (list[int]): Which dimensions of the points to be used.
Defaults to [0, 1, 2]. For KITTI dataset, set use_dim=4
or use_dim=[0, 1, 2, 3] to use the intensity dimension.
shift_height (bool): Whether to use shifted height. Defaults to False.
use_color (bool): Whether to use color features. Defaults to False.
"""
def __init__(
self,
coord_type,
load_dim=6,
use_dim=[0, 1, 2],
shift_height=False,
use_color=False,
load_augmented=None,
reduce_beams=None,
):
self.shift_height = shift_height
self.use_color = use_color
if isinstance(use_dim, int):
use_dim = list(range(use_dim))
assert (
max(use_dim) < load_dim
), f"Expect all used dimensions < {load_dim}, got {use_dim}"
assert coord_type in ["CAMERA", "LIDAR", "DEPTH"]
self.coord_type = coord_type
self.load_dim = load_dim
self.use_dim = use_dim
self.load_augmented = load_augmented
self.reduce_beams = reduce_beams
def _load_points(self, lidar_path):
"""Private function to load point clouds data.
Args:
lidar_path (str): Filename of point clouds data.
Returns:
np.ndarray: An array containing point clouds data.
"""
mmcv.check_file_exist(lidar_path)
if self.load_augmented:
assert self.load_augmented in ["pointpainting", "mvp"]
virtual = self.load_augmented == "mvp"
points = load_augmented_point_cloud(
lidar_path, virtual=virtual, reduce_beams=self.reduce_beams
)
elif lidar_path.endswith(".npy"):
points = np.load(lidar_path)
else:
points = np.fromfile(lidar_path, dtype=np.float32)
return points
def __call__(self, results):
"""Call function to load points data from file.
Args:
results (dict): Result dict containing point clouds data.
Returns:
dict: The result dict containing the point clouds data. \
Added key and value are described below.
- points (:obj:`BasePoints`): Point clouds data.
"""
lidar_path = results["lidar_path"]
points = self._load_points(lidar_path)
points = points.reshape(-1, self.load_dim)
# TODO: make it more general
if self.reduce_beams and self.reduce_beams < 32:
points = reduce_LiDAR_beams(points, self.reduce_beams)
points = points[:, self.use_dim]
attribute_dims = None
if self.shift_height:
floor_height = np.percentile(points[:, 2], 0.99)
height = points[:, 2] - floor_height
points = np.concatenate(
[points[:, :3], np.expand_dims(height, 1), points[:, 3:]], 1
)
attribute_dims = dict(height=3)
if self.use_color:
assert len(self.use_dim) >= 6
if attribute_dims is None:
attribute_dims = dict()
attribute_dims.update(
dict(
color=[
points.shape[1] - 3,
points.shape[1] - 2,
points.shape[1] - 1,
]
)
)
points_class = get_points_type(self.coord_type)
points = points_class(
points, points_dim=points.shape[-1], attribute_dims=attribute_dims
)
results["points"] = points
return results
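
# Illustrative usage (a sketch; values are assumptions): for nuScenes-style
# .bin files with (x, y, z, intensity, ring index) per point, load_dim=5 is
# used and an integer use_dim expands to list(range(use_dim)), e.g.
#
#     dict(
#         type="LoadPointsFromFile",
#         coord_type="LIDAR",
#         load_dim=5,
#         use_dim=5,
#     ),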
@PIPELINES.register_module()
class LoadAnnotations3D(LoadAnnotations):
"""Load Annotations3D.
Load instance mask and semantic mask of points and
encapsulate the items into related fields.
Args:
with_bbox_3d (bool, optional): Whether to load 3D boxes.
Defaults to True.
with_label_3d (bool, optional): Whether to load 3D labels.
Defaults to True.
with_attr_label (bool, optional): Whether to load attribute label.
Defaults to False.
with_bbox (bool, optional): Whether to load 2D boxes.
Defaults to False.
with_label (bool, optional): Whether to load 2D labels.
Defaults to False.
with_mask (bool, optional): Whether to load 2D instance masks.
Defaults to False.
with_seg (bool, optional): Whether to load 2D semantic masks.
Defaults to False.
with_bbox_depth (bool, optional): Whether to load 2.5D boxes.
Defaults to False.
poly2mask (bool, optional): Whether to convert polygon annotations
to bitmasks. Defaults to True.
"""
def __init__(
self,
with_bbox_3d=True,
with_label_3d=True,
with_attr_label=False,
with_bbox=False,
with_label=False,
with_mask=False,
with_seg=False,
with_bbox_depth=False,
poly2mask=True,
):
super().__init__(
with_bbox,
with_label,
with_mask,
with_seg,
poly2mask,
)
self.with_bbox_3d = with_bbox_3d
self.with_bbox_depth = with_bbox_depth
self.with_label_3d = with_label_3d
self.with_attr_label = with_attr_label
def _load_bboxes_3d(self, results):
"""Private function to load 3D bounding box annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded 3D bounding box annotations.
"""
results["gt_bboxes_3d"] = results["ann_info"]["gt_bboxes_3d"]
results["bbox3d_fields"].append("gt_bboxes_3d")
return results
def _load_bboxes_depth(self, results):
"""Private function to load 2.5D bounding box annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded 2.5D bounding box annotations.
"""
results["centers2d"] = results["ann_info"]["centers2d"]
results["depths"] = results["ann_info"]["depths"]
return results
def _load_labels_3d(self, results):
"""Private function to load label annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded label annotations.
"""
results["gt_labels_3d"] = results["ann_info"]["gt_labels_3d"]
return results
def _load_attr_labels(self, results):
"""Private function to load label annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded label annotations.
"""
results["attr_labels"] = results["ann_info"]["attr_labels"]
return results
def __call__(self, results):
"""Call function to load multiple types annotations.
Args:
results (dict): Result dict from :obj:`mmdet3d.CustomDataset`.
Returns:
dict: The dict containing loaded 3D bounding box, label, mask and
semantic segmentation annotations.
"""
results = super().__call__(results)
if self.with_bbox_3d:
results = self._load_bboxes_3d(results)
if results is None:
return None
if self.with_bbox_depth:
results = self._load_bboxes_depth(results)
if results is None:
return None
if self.with_label_3d:
results = self._load_labels_3d(results)
if self.with_attr_label:
results = self._load_attr_labels(results)
return results
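
# Illustrative usage (a sketch; flags are assumptions): for LiDAR-only 3D
# detection the 2D-related flags usually stay at their defaults, e.g.
#
#     dict(
#         type="LoadAnnotations3D",
#         with_bbox_3d=True,
#         with_label_3d=True,
#         with_attr_label=False,
#     ),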
@PIPELINES.register_module()
class NormalizePointFeatures:
def __call__(self, results):
points = results["points"]
points.tensor[:, 3] = torch.tanh(points.tensor[:, 3])
results["points"] = points
return results
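
# Illustrative note (not in the original): this squashes the intensity channel
# (column 3) into (-1, 1) with tanh, e.g. an intensity of 0.0 stays 0.0 while
# 3.0 maps to roughly 0.995; coordinates and other channels are untouched.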
@PIPELINES.register_module()
class LoadRadarPointsMultiSweeps(object):
"""Load radar points from multiple sweeps.
This is usually used for nuScenes dataset to utilize previous sweeps.
Args:
        sweeps_num (int): Number of sweeps. Defaults to 3.
        load_dim (int): Dimension number of the loaded points. Defaults to 18.
        use_dim (list[int]): Which dimensions to use. Defaults to [0, 1, 2, 3, 4].
file_client_args (dict): Config dict of file clients, refer to
https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
for more details. Defaults to dict(backend='disk').
pad_empty_sweeps (bool): Whether to repeat keyframe when
sweeps is empty. Defaults to False.
remove_close (bool): Whether to remove close points.
Defaults to False.
        test_mode (bool): If True (used for testing), sweeps are not randomly
            sampled; the nearest N frames are selected instead.
            Defaults to False.
"""
def __init__(self,
load_dim=18,
use_dim=[0, 1, 2, 3, 4],
sweeps_num=3,
file_client_args=dict(backend='disk'),
max_num=300,
pc_range=[-51.2, -51.2, -5.0, 51.2, 51.2, 3.0],
compensate_velocity=False,
normalize_dims=[(3, 0, 50), (4, -100, 100), (5, -100, 100)],
filtering='default',
normalize=False,
test_mode=False):
self.load_dim = load_dim
self.use_dim = use_dim
self.sweeps_num = sweeps_num
self.file_client_args = file_client_args.copy()
self.file_client = None
self.max_num = max_num
self.test_mode = test_mode
self.pc_range = pc_range
self.compensate_velocity = compensate_velocity
self.normalize_dims = normalize_dims
self.filtering = filtering
self.normalize = normalize
self.encoding = [
(3, 'one-hot', 8), # dynprop
(11, 'one-hot', 5), # ambig_state
(14, 'one-hot', 18), # invalid_state
(15, 'ordinal', 7), # pdh
(0, 'nusc-filter', 1) # binary feature: 1 if nusc would have filtered it out
]
    def perform_encodings(self, points, encoding):
        for idx, encoding_type, encoding_dims in encoding:
            assert encoding_type in ['one-hot', 'ordinal', 'nusc-filter']
            feat = points[:, idx]
            feat_encoding = np.zeros((points.shape[0], encoding_dims))
            if encoding_type == 'one-hot':
                feat_encoding[np.arange(feat.shape[0]), np.rint(feat).astype(int)] = 1
            elif encoding_type == 'ordinal':
                for i in range(encoding_dims):
                    feat_encoding[:, i] = (np.rint(feat) > i).astype(int)
            elif encoding_type == 'nusc-filter':
                # flag points that satisfy the default nuScenes filter conditions
                mask1 = (points[:, 14] == 0)
                mask2 = (points[:, 3] < 7)
                mask3 = (points[:, 11] == 3)
                feat_encoding[mask1 & mask2 & mask3, 0] = 1
            points = np.concatenate([points, feat_encoding], axis=1)
        return points
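    # Illustrative note (the concrete numbers assume the default self.encoding
    # and the 19-column per-point layout documented in __call__): the appended
    # one-hot/ordinal/filter features add 8 + 5 + 18 + 7 + 1 = 39 columns, so
    # an (N, 19) sweep array becomes (N, 58); e.g. a dynprop value of 2 turns
    # into a one-hot vector with a single 1 in slot 2.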
def _load_points(self, pts_filename):
"""Private function to load point clouds data.
Args:
pts_filename (str): Filename of point clouds data.
Returns:
np.ndarray: An array containing point clouds data.
[N, 18]
"""
invalid_states, dynprop_states, ambig_states = {
'default': ([0], range(7), [3]),
'none': (range(18), range(8), range(5)),
}[self.filtering]
radar_obj = RadarPointCloud.from_file(
pts_filename,
invalid_states, dynprop_states, ambig_states
)
#[18, N]
points = radar_obj.points
return points.transpose().astype(np.float32)
def _pad_or_drop(self, points):
'''
points: [N, 18]
'''
num_points = points.shape[0]
if num_points == self.max_num:
masks = np.ones((num_points, 1),
dtype=points.dtype)
return points, masks
if num_points > self.max_num:
points = np.random.permutation(points)[:self.max_num, :]
masks = np.ones((self.max_num, 1),
dtype=points.dtype)
return points, masks
if num_points < self.max_num:
zeros = np.zeros((self.max_num - num_points, points.shape[1]),
dtype=points.dtype)
masks = np.ones((num_points, 1),
dtype=points.dtype)
points = np.concatenate((points, zeros), axis=0)
masks = np.concatenate((masks, zeros.copy()[:, [0]]), axis=0)
return points, masks
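    # Illustrative note (not in the original): with max_num=300, an input of
    # shape (120, 18) is zero-padded to (300, 18) and masks is (300, 1) with
    # ones for the 120 real points and zeros for the padding; an input with
    # more than 300 points is randomly permuted and truncated instead.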
    def normalize_feats(self, points, normalize_dims):
        # linearly rescale each listed feature dim from [lower, upper] to [0, 1]
        for dim, lower, upper in normalize_dims:
            points[:, dim] -= lower
            points[:, dim] /= (upper - lower)
        return points
def __call__(self, results):
"""Call function to load multi-sweep point clouds from files.
Args:
results (dict): Result dict containing multi-sweep point cloud \
filenames.
Returns:
dict: The result dict containing the multi-sweep points data. \
Added key and value are described below.
- points (np.ndarray | :obj:`BasePoints`): Multi-sweep point \
cloud arrays.
"""
radars_dict = results['radar']
points_sweep_list = []
for key, sweeps in radars_dict.items():
if len(sweeps) < self.sweeps_num:
idxes = list(range(len(sweeps)))
else:
idxes = list(range(self.sweeps_num))
ts = sweeps[0]['timestamp'] * 1e-6
for idx in idxes:
sweep = sweeps[idx]
points_sweep = self._load_points(sweep['data_path'])
points_sweep = np.copy(points_sweep).reshape(-1, self.load_dim)
timestamp = sweep['timestamp'] * 1e-6
time_diff = ts - timestamp
time_diff = np.ones((points_sweep.shape[0], 1)) * time_diff
# velocity compensated by the ego motion in sensor frame
velo_comp = points_sweep[:, 8:10]
velo_comp = np.concatenate(
(velo_comp, np.zeros((velo_comp.shape[0], 1))), 1)
velo_comp = velo_comp @ sweep['sensor2lidar_rotation'].T
velo_comp = velo_comp[:, :2]
# velocity in sensor frame
velo = points_sweep[:, 6:8]
velo = np.concatenate(
(velo, np.zeros((velo.shape[0], 1))), 1)
velo = velo @ sweep['sensor2lidar_rotation'].T
velo = velo[:, :2]
points_sweep[:, :3] = points_sweep[:, :3] @ sweep[
'sensor2lidar_rotation'].T
points_sweep[:, :3] += sweep['sensor2lidar_translation']
if self.compensate_velocity:
points_sweep[:, :2] += velo_comp * time_diff
points_sweep_ = np.concatenate(
[points_sweep[:, :6], velo,
velo_comp, points_sweep[:, 10:],
time_diff], axis=1)
# current format is x y z dyn_prop id rcs vx vy vx_comp vy_comp is_quality_valid ambig_state x_rms y_rms invalid_state pdh0 vx_rms vy_rms timestamp
points_sweep_list.append(points_sweep_)
points = np.concatenate(points_sweep_list, axis=0)
points = self.perform_encodings(points, self.encoding)
points = points[:, self.use_dim]
if self.normalize:
points = self.normalize_feats(points, self.normalize_dims)
points = RadarPoints(
points, points_dim=points.shape[-1], attribute_dims=None
)
results['radar'] = points
return results
def __repr__(self):
"""str: Return a string that describes the module."""
return f'{self.__class__.__name__}(sweeps_num={self.sweeps_num})'
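
# Illustrative usage (a sketch; dims and sweep count are assumptions): a
# radar-fusion pipeline entry might look like
#
#     dict(
#         type="LoadRadarPointsMultiSweeps",
#         load_dim=18,
#         sweeps_num=6,
#         use_dim=[0, 1, 2, 8, 9, 18],
#         compensate_velocity=True,
#     ),
#
# where use_dim indexes the re-ordered per-point layout built in __call__
# (x, y, z, ..., vx_comp, vy_comp, ..., timestamp) after the extra encodings
# are appended.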