Merge pull request #1 from metadriverse/initial-commit

Initial commit
This commit is contained in:
Quanyi Li
2023-05-08 17:38:58 +01:00
committed by GitHub
61 changed files with 3189 additions and 0 deletions

5
.github/.codecov.yml vendored Normal file
View File

@@ -0,0 +1,5 @@
comment: false
coverage:
status:
patch: off
project: off

9
.github/pull_request_template.md vendored Normal file
View File

@@ -0,0 +1,9 @@
## What changes do you make in this PR?
* Please describe why you create this PR
## Checklist
* [ ] I have merged the latest main branch into current branch.
* [ ] I have run `bash scripts/format.sh` before merging.
* Please use "squash and merge" mode.

50
.github/workflows/main.yml vendored Normal file
View File

@@ -0,0 +1,50 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: test
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
code_style:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Check code style
run: |
pip install "yapf==0.30.0"
bash format.sh --test
test_functionality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Blackbox tests
run: |
pip install cython
pip install numpy
pip install -e .
pip install pytest
pip install pytest-cov
pip install ray
git clone https://github.com/metadriverse/metadrive.git
cd metadrive
pip install -e .
cd ../
cd scenarionet/
pytest --cov=./ --cov-config=.coveragerc --cov-report=xml -sv tests

21
.gitignore vendored Normal file
View File

@@ -0,0 +1,21 @@
/scenarionet.egg-info/
*.pyc
*.egg-info
/.idea/
*.glf
*.rar
/docs/build/
/docs/.idea/
/build/
/dist/
/documentation/build/
dataset/*
**/combine/
**.json
**.log
**/tmp/**
**/failed_scenarios/
**/passed_scenarios/
**/waymo_origin
/dataset/

7
.style.yapf Normal file
View File

@@ -0,0 +1,7 @@
[style]
based_on_style=pep8
dedent_closing_brackets=True
split_before_first_argument=True
allow_split_before_dict_value=False
join_multiple_lines=False
column_limit=120

20
format.sh Executable file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Usage: at the root dir >> bash scripts/format.sh
# Check yapf version.
ver=$(yapf --version)
if ! echo $ver | grep -q 0.30.0; then
echo "Wrong YAPF version installed: 0.30.0 is required, not $ver. Please install via `pip install yapf==0.30.0`"
exit 1
fi
yapf --in-place --recursive -p --verbose --style .style.yapf scenarionet/
if [[ "$1" == '--test' ]]; then # Only for CI usage, user should not use --test flag.
if ! git diff --quiet &>/dev/null; then
echo '*** You have not formatted your code! Please run [bash format.sh] at root directory before commit! Thanks! ***'
exit 1
else
echo "Code style test passed!"
fi
fi

5
scenarionet/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
import os
SCENARIONET_PACKAGE_PATH = os.path.dirname(os.path.abspath(__file__))
SCENARIONET_REPO_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SCENARIONET_DATASET_PATH = os.path.join(SCENARIONET_REPO_PATH, "dataset")

View File

View File

@@ -0,0 +1,60 @@
from functools import partial
from metadrive.scenario.scenario_description import ScenarioDescription as SD
class ScenarioFilter:
GREATER = "greater"
SMALLER = "smaller"
@staticmethod
def sdc_moving_dist(metadata, target_dist, condition=GREATER):
"""
This function filters the scenario based on SDC information.
"""
assert condition in [ScenarioFilter.GREATER, ScenarioFilter.SMALLER], "Wrong condition type"
sdc_info = metadata[SD.SUMMARY.OBJECT_SUMMARY][metadata[SD.SDC_ID]]
moving_dist = sdc_info[SD.SUMMARY.MOVING_DIST]
if moving_dist > target_dist and condition == ScenarioFilter.GREATER:
return True
if moving_dist < target_dist and condition == ScenarioFilter.SMALLER:
return True
return False
@staticmethod
def object_number(metadata, number_threshold, object_type=None, condition=SMALLER):
"""
Return True if the scenario satisfying the object number condition
:param metadata: metadata in each scenario
:param number_threshold: number of objects threshold
:param object_type: MetaDriveType.VEHICLE or other object type. If none, calculate number for all object types
:param condition: SMALLER or GREATER
:return: boolean
"""
assert condition in [ScenarioFilter.GREATER, ScenarioFilter.SMALLER], "Wrong condition type"
if object_type is not None:
num = metadata[SD.SUMMARY.NUMBER_SUMMARY][SD.SUMMARY.NUM_OBJECTS_EACH_TYPE].get(object_type, 0)
else:
num = metadata[SD.SUMMARY.NUMBER_SUMMARY][SD.SUMMARY.NUM_OBJECTS]
if num > number_threshold and condition == ScenarioFilter.GREATER:
return True
if num < number_threshold and condition == ScenarioFilter.SMALLER:
return True
return False
@staticmethod
def has_traffic_light(metadata):
return metadata[SD.SUMMARY.NUMBER_SUMMARY][SD.SUMMARY.NUM_TRAFFIC_LIGHTS] > 0
@staticmethod
def make(func, **kwargs):
"""
A wrapper for partial() for filling some parameters
:param func: func in this class
:param kwargs: kwargs for filter
:return: func taking only metadat as input
"""
assert "metadata" not in kwargs, "You should only fill conditions, metadata will be fill automatically"
if "condition" in kwargs:
assert kwargs["condition"] in [ScenarioFilter.GREATER, ScenarioFilter.SMALLER], "Wrong condition type"
return partial(func, **kwargs)

View File

@@ -0,0 +1,103 @@
import copy
import logging
import os
import os.path as osp
import pickle
import shutil
from typing import Callable, List
from metadrive.scenario.scenario_description import ScenarioDescription
from scenarionet.common_utils import save_summary_anda_mapping
logger = logging.getLogger(__name__)
def try_generating_summary(file_folder):
# Create a fake one
files = os.listdir(file_folder)
summary = {}
for file in files:
if ScenarioDescription.is_scenario_file(file):
with open(osp.join(file_folder, file), "rb+") as f:
scenario = pickle.load(f)
summary[file] = copy.deepcopy(scenario[ScenarioDescription.METADATA])
return summary
def combine_multiple_dataset(
output_path, *dataset_paths, force_overwrite=False, try_generate_missing_file=True, filters: List[Callable] = None
):
"""
Combine multiple datasets. Each dataset should have a dataset_summary.pkl
:param output_path: The path to store the output dataset
:param force_overwrite: If True, overwrite the output_path even if it exists
:param try_generate_missing_file: If dataset_summary.pkl and mapping.pkl are missing, whether to try generating them
:param dataset_paths: Path of each dataset
:param filters: a set of filters to choose which scenario to be selected and added into this combined dataset
:return: summary, mapping
"""
filters = filters or []
output_abs_path = osp.abspath(output_path)
if os.path.exists(output_abs_path):
if not force_overwrite:
raise FileExistsError("Output path already exists!")
else:
shutil.rmtree(output_abs_path)
os.makedirs(output_abs_path, exist_ok=False)
summaries = {}
mappings = {}
# collect
for dataset_path in dataset_paths:
abs_dir_path = osp.abspath(dataset_path)
# summary
assert osp.exists(abs_dir_path), "Wrong dataset path. Can not find dataset at: {}".format(abs_dir_path)
if not osp.exists(osp.join(abs_dir_path, ScenarioDescription.DATASET.SUMMARY_FILE)):
if try_generate_missing_file:
# TODO add test for 1. number dataset 2. missing summary dataset 3. missing mapping dataset
summary = try_generating_summary(abs_dir_path)
else:
raise FileNotFoundError("Can not find summary file for dataset: {}".format(abs_dir_path))
else:
with open(osp.join(abs_dir_path, ScenarioDescription.DATASET.SUMMARY_FILE), "rb+") as f:
summary = pickle.load(f)
intersect = set(summaries.keys()).intersection(set(summary.keys()))
if len(intersect) > 0:
existing = []
for v in list(intersect):
existing.append(mappings[v])
logging.warning("Repeat scenarios: {} in : {}. Existing: {}".format(intersect, abs_dir_path, existing))
summaries.update(summary)
# mapping
if not osp.exists(osp.join(abs_dir_path, ScenarioDescription.DATASET.MAPPING_FILE)):
if try_generate_missing_file:
mapping = {k: "" for k in summary}
else:
raise FileNotFoundError("Can not find mapping file for dataset: {}".format(abs_dir_path))
else:
with open(osp.join(abs_dir_path, ScenarioDescription.DATASET.MAPPING_FILE), "rb+") as f:
mapping = pickle.load(f)
new_mapping = {}
for file, rel_path in mapping.items():
# mapping to real file path
new_mapping[file] = os.path.relpath(osp.join(abs_dir_path, rel_path), output_abs_path)
mappings.update(new_mapping)
# apply filter stage
file_to_pop = []
for file_name, metadata, in summaries.items():
if not all([fil(metadata) for fil in filters]):
file_to_pop.append(file_name)
for file in file_to_pop:
summaries.pop(file)
mappings.pop(file)
summary_file = osp.join(output_abs_path, ScenarioDescription.DATASET.SUMMARY_FILE)
mapping_file = osp.join(output_abs_path, ScenarioDescription.DATASET.MAPPING_FILE)
save_summary_anda_mapping(summary_file, mapping_file, summaries, mappings)
return summaries, mappings

View File

@@ -0,0 +1,92 @@
import os.path
import pickle
import numpy as np
from metadrive.scenario import utils as sd_utils
def recursive_equal(data1, data2, need_assert=False):
from metadrive.utils.config import Config
if isinstance(data1, Config):
data1 = data1.get_dict()
if isinstance(data2, Config):
data2 = data2.get_dict()
if isinstance(data1, np.ndarray):
tmp = np.asarray(data2)
return np.all(data1 == tmp)
if isinstance(data2, np.ndarray):
tmp = np.asarray(data1)
return np.all(tmp == data2)
if isinstance(data1, dict):
is_ins = isinstance(data2, dict)
key_right = set(data1.keys()) == set(data2.keys())
if need_assert:
assert is_ins and key_right, (data1.keys(), data2.keys())
if not (is_ins and key_right):
return False
ret = []
for k in data1:
ret.append(recursive_equal(data1[k], data2[k], need_assert=need_assert))
return all(ret)
elif isinstance(data1, (list, tuple)):
len_right = len(data1) == len(data2)
is_ins = isinstance(data2, (list, tuple))
if need_assert:
assert len_right and is_ins, (len(data1), len(data2), data1, data2)
if not (is_ins and len_right):
return False
ret = []
for i in range(len(data1)):
ret.append(recursive_equal(data1[i], data2[i], need_assert=need_assert))
return all(ret)
elif isinstance(data1, np.ndarray):
ret = np.isclose(data1, data2).all()
if need_assert:
assert ret, (type(data1), type(data2), data1, data2)
return ret
else:
ret = data1 == data2
if need_assert:
assert ret, (type(data1), type(data2), data1, data2)
return ret
def dict_recursive_remove_array_and_set(d):
if isinstance(d, np.ndarray):
return d.tolist()
if isinstance(d, set):
return tuple(d)
if isinstance(d, dict):
for k in d.keys():
d[k] = dict_recursive_remove_array_and_set(d[k])
return d
def save_summary_anda_mapping(summary_file_path, mapping_file_path, summary, mapping):
with open(summary_file_path, "wb") as file:
pickle.dump(dict_recursive_remove_array_and_set(summary), file)
with open(mapping_file_path, "wb") as file:
pickle.dump(mapping, file)
print(
"\n ================ Dataset Summary and Mapping are saved at: {} "
"================ \n".format(summary_file_path)
)
def read_dataset_summary(dataset_path):
return sd_utils.read_dataset_summary(dataset_path)
def read_scenario(dataset_path, mapping, scenario_file_name):
"""
read a scenario
:param dataset_path: the location where dataset_summary.pkl is
:param mapping: a dict recording the relative position from dataset_path to real scenario file
:param scenario_file_name: scenario filename
:return: ScenarioDescription
"""
return sd_utils.read_scenario_data(os.path.join(dataset_path, mapping[scenario_file_name], scenario_file_name))

View File

View File

View File

@@ -0,0 +1,42 @@
import logging
from metadrive.type import MetaDriveType
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.maps.maps_datatypes import TrafficLightStatusType
NuPlanEgoType = TrackedObjectType.EGO
except ImportError as e:
logger.warning("Can not import nuplan-devkit: {}".format(e))
def get_traffic_obj_type(nuplan_type):
if nuplan_type == TrackedObjectType.VEHICLE:
return MetaDriveType.VEHICLE
elif nuplan_type == TrackedObjectType.TRAFFIC_CONE:
return MetaDriveType.TRAFFIC_CONE
elif nuplan_type == TrackedObjectType.PEDESTRIAN:
return MetaDriveType.PEDESTRIAN
elif nuplan_type == TrackedObjectType.BICYCLE:
return MetaDriveType.CYCLIST
elif nuplan_type == TrackedObjectType.BARRIER:
return MetaDriveType.TRAFFIC_BARRIER
elif nuplan_type == TrackedObjectType.EGO:
raise ValueError("Ego should not be in detected resukts")
else:
return None
def set_light_status(status):
if status == TrafficLightStatusType.GREEN:
return MetaDriveType.LIGHT_GREEN
elif status == TrafficLightStatusType.RED:
return MetaDriveType.LIGHT_RED
elif status == TrafficLightStatusType.YELLOW:
return MetaDriveType.LIGHT_YELLOW
elif status == TrafficLightStatusType.UNKNOWN:
return MetaDriveType.LIGHT_UNKNOWN

View File

@@ -0,0 +1,492 @@
import logging
import os
import tempfile
from dataclasses import dataclass
from os.path import join
from typing import Union
import numpy as np
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
from shapely.geometry.linestring import LineString
from shapely.geometry.multilinestring import MultiLineString
from scenarionet.converter.nuplan.type import get_traffic_obj_type, NuPlanEgoType, set_light_status
from scenarionet.converter.utils import nuplan_to_metadrive_vector, compute_angular_velocity
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
import geopandas as gpd
from shapely.ops import unary_union
try:
from nuplan.common.actor_state.agent import Agent
from nuplan.common.actor_state.static_object import StaticObject
from nuplan.common.actor_state.state_representation import Point2D
from nuplan.common.maps.maps_datatypes import SemanticMapLayer, StopLineType
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
import hydra
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
from nuplan.planning.script.builders.scenario_building_builder import build_scenario_builder
from nuplan.planning.script.builders.scenario_filter_builder import build_scenario_filter
from nuplan.planning.script.utils import set_up_common_builder
import nuplan
NUPLAN_PACKAGE_PATH = os.path.dirname(nuplan.__file__)
except ImportError as e:
logger.warning("Can not import nuplan-devkit: {}".format(e))
EGO = "ego"
def get_nuplan_scenarios(data_root, map_root, logs: Union[list, None] = None, builder="nuplan_mini"):
"""
Getting scenarios. You could use your parameters to get a bunch of scenarios
:param data_root: path contains .db files, like /nuplan-v1.1/splits/mini
:param map_root: path to map
:param logs: a list of logs, like ['2021.07.16.20.45.29_veh-35_01095_01486']. If none, load all files in data_root
:param builder: builder file, we use the default nuplan builder file
:return:
"""
nuplan_package_path = NUPLAN_PACKAGE_PATH
logs = logs or [file for file in os.listdir(data_root)]
log_string = ""
for log in logs:
if log[-3:] == ".db":
log = log[:-3]
log_string += log
log_string += ","
log_string = log_string[:-1]
dataset_parameters = [
# builder setting
"scenario_builder={}".format(builder),
"scenario_builder.scenario_mapping.subsample_ratio_override=0.5", # 10 hz
"scenario_builder.data_root={}".format(data_root),
"scenario_builder.map_root={}".format(map_root),
# filter
"scenario_filter=all_scenarios", # simulate only one log
"scenario_filter.remove_invalid_goals=true",
"scenario_filter.shuffle=true",
"scenario_filter.log_names=[{}]".format(log_string),
# "scenario_filter.scenario_types={}".format(all_scenario_types),
# "scenario_filter.scenario_tokens=[]",
# "scenario_filter.map_names=[]",
# "scenario_filter.num_scenarios_per_type=1",
# "scenario_filter.limit_total_scenarios=1000",
# "scenario_filter.expand_scenarios=true",
# "scenario_filter.limit_scenarios_per_type=10", # use 10 scenarios per scenario type
"scenario_filter.timestamp_threshold_s=20", # minial scenario duration (s)
]
base_config_path = os.path.join(nuplan_package_path, "planning", "script")
simulation_hydra_paths = construct_simulation_hydra_paths(base_config_path)
# Initialize configuration management system
hydra.core.global_hydra.GlobalHydra.instance().clear() # reinitialize hydra if already initialized
hydra.initialize_config_dir(config_dir=simulation_hydra_paths.config_path)
save_dir = tempfile.mkdtemp()
ego_controller = 'perfect_tracking_controller' # [log_play_back_controller, perfect_tracking_controller]
observation = 'box_observation' # [box_observation, idm_agents_observation, lidar_pc_observation]
# Compose the configuration
overrides = [
f'group={save_dir}',
'worker=sequential',
f'ego_controller={ego_controller}',
f'observation={observation}',
f'hydra.searchpath=[{simulation_hydra_paths.common_dir}, {simulation_hydra_paths.experiment_dir}]',
'output_dir=${group}/${experiment}',
'metric_dir=${group}/${experiment}',
*dataset_parameters,
]
overrides.extend(
[
f'job_name=planner_tutorial', 'experiment=${experiment_name}/${job_name}',
f'experiment_name=planner_tutorial'
]
)
# get config
cfg = hydra.compose(config_name=simulation_hydra_paths.config_name, overrides=overrides)
profiler_name = 'building_simulation'
common_builder = set_up_common_builder(cfg=cfg, profiler_name=profiler_name)
# Build scenario builder
scenario_builder = build_scenario_builder(cfg=cfg)
scenario_filter = build_scenario_filter(cfg.scenario_filter)
# get scenarios from database
return scenario_builder.get_scenarios(scenario_filter, common_builder.worker)
def construct_simulation_hydra_paths(base_config_path: str):
"""
Specifies relative paths to simulation configs to pass to hydra to declutter tutorial.
:param base_config_path: Base config path.
:return: Hydra config path.
"""
common_dir = "file://" + join(base_config_path, 'config', 'common')
config_name = 'default_simulation'
config_path = join(base_config_path, 'config', 'simulation')
experiment_dir = "file://" + join(base_config_path, 'experiments')
return HydraConfigPaths(common_dir, config_name, config_path, experiment_dir)
@dataclass
class HydraConfigPaths:
"""
Stores relative hydra paths to declutter tutorial.
"""
common_dir: str
config_name: str
config_path: str
experiment_dir: str
def extract_centerline(map_obj, nuplan_center):
path = map_obj.baseline_path.discrete_path
points = np.array([nuplan_to_metadrive_vector([pose.x, pose.y], nuplan_center) for pose in path])
return points
def get_points_from_boundary(boundary, center):
path = boundary.discrete_path
points = [(pose.x, pose.y) for pose in path]
points = nuplan_to_metadrive_vector(points, center)
return points
def get_line_type(nuplan_type):
return MetaDriveType.LINE_BROKEN_SINGLE_WHITE
# Always return broken line type
if nuplan_type == 2:
return MetaDriveType.LINE_SOLID_SINGLE_WHITE
elif nuplan_type == 0:
return MetaDriveType.LINE_BROKEN_SINGLE_WHITE
elif nuplan_type == 3:
return MetaDriveType.LINE_UNKNOWN
else:
raise ValueError("Unknown line tyep: {}".format(nuplan_type))
def extract_map_features(map_api, center, radius=250):
ret = {}
np.seterr(all='ignore')
# Center is Important !
layer_names = [
SemanticMapLayer.LANE_CONNECTOR,
SemanticMapLayer.LANE,
SemanticMapLayer.CROSSWALK,
SemanticMapLayer.INTERSECTION,
SemanticMapLayer.STOP_LINE,
SemanticMapLayer.WALKWAYS,
SemanticMapLayer.CARPARK_AREA,
SemanticMapLayer.ROADBLOCK,
SemanticMapLayer.ROADBLOCK_CONNECTOR,
# unsupported yet
# SemanticMapLayer.STOP_SIGN,
# SemanticMapLayer.DRIVABLE_AREA,
]
center_for_query = Point2D(*center)
nearest_vector_map = map_api.get_proximal_map_objects(center_for_query, radius, layer_names)
boundaries = map_api._get_vector_map_layer(SemanticMapLayer.BOUNDARIES)
# Filter out stop polygons in turn stop
if SemanticMapLayer.STOP_LINE in nearest_vector_map:
stop_polygons = nearest_vector_map[SemanticMapLayer.STOP_LINE]
nearest_vector_map[SemanticMapLayer.STOP_LINE] = [
stop_polygon for stop_polygon in stop_polygons if stop_polygon.stop_line_type != StopLineType.TURN_STOP
]
block_polygons = []
for layer in [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR]:
for block in nearest_vector_map[layer]:
for lane_meta_data in block.interior_edges:
if not hasattr(lane_meta_data, "baseline_path"):
continue
if isinstance(lane_meta_data.polygon.boundary, MultiLineString):
# logger.warning("Stop using boundaries! Use exterior instead!")
boundary = gpd.GeoSeries(lane_meta_data.polygon.boundary).explode(index_parts=True)
sizes = []
for idx, polygon in enumerate(boundary[0]):
sizes.append(len(polygon.xy[1]))
points = boundary[0][np.argmax(sizes)].xy
elif isinstance(lane_meta_data.polygon.boundary, LineString):
points = lane_meta_data.polygon.boundary.xy
polygon = [[points[0][i], points[1][i]] for i in range(len(points[0]))]
polygon = nuplan_to_metadrive_vector(polygon, nuplan_center=[center[0], center[1]])
ret[lane_meta_data.id] = {
SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
SD.POLYLINE: extract_centerline(lane_meta_data, center),
SD.POLYGON: polygon
}
if layer == SemanticMapLayer.ROADBLOCK_CONNECTOR:
continue
left = lane_meta_data.left_boundary
if left.id not in ret:
# only broken line in nuPlan data
# line_type = get_line_type(int(boundaries.loc[[str(left.id)]]["boundary_type_fid"]))
line_type = MetaDriveType.LINE_BROKEN_SINGLE_WHITE
if line_type != MetaDriveType.LINE_UNKNOWN:
ret[left.id] = {SD.TYPE: line_type, SD.POLYLINE: get_points_from_boundary(left, center)}
if layer == SemanticMapLayer.ROADBLOCK:
block_polygons.append(block.polygon)
interpolygons = [block.polygon for block in nearest_vector_map[SemanticMapLayer.INTERSECTION]]
# logger.warning("Stop using boundaries! Use exterior instead!")
boundaries = gpd.GeoSeries(unary_union(interpolygons + block_polygons)).boundary.explode(index_parts=True)
# boundaries.plot()
# plt.show()
for idx, boundary in enumerate(boundaries[0]):
block_points = np.array(list(i for i in zip(boundary.coords.xy[0], boundary.coords.xy[1])))
block_points = nuplan_to_metadrive_vector(block_points, center)
id = "boundary_{}".format(idx)
ret[id] = {SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_WHITE, SD.POLYLINE: block_points}
np.seterr(all='warn')
return ret
def set_light_position(scenario, lane_id, center, target_position=8):
lane = scenario.map_api.get_map_object(str(lane_id), SemanticMapLayer.LANE_CONNECTOR)
assert lane is not None, "Can not find lane: {}".format(lane_id)
path = lane.baseline_path.discrete_path
acc_length = 0
point = [path[0].x, path[0].y]
for k, point in enumerate(path[1:], start=1):
previous_p = path[k - 1]
acc_length += np.linalg.norm([point.x - previous_p.x, point.y - previous_p.y])
if acc_length > target_position:
break
return [point.x - center[0], point.y - center[1]]
def extract_traffic_light(scenario, center):
length = scenario.get_number_of_iterations()
frames = [
{str(t.lane_connector_id): t.status
for t in scenario.get_traffic_light_status_at_iteration(i)} for i in range(length)
]
all_lights = set()
for frame in frames:
all_lights.update(frame.keys())
lights = {
k: {
"type": MetaDriveType.TRAFFIC_LIGHT,
"state": {
SD.TRAFFIC_LIGHT_STATUS: [MetaDriveType.LIGHT_UNKNOWN] * length
},
SD.TRAFFIC_LIGHT_POSITION: None,
SD.TRAFFIC_LIGHT_LANE: str(k),
"metadata": dict(track_length=length, type=None, object_id=str(k), lane_id=str(k), dataset="nuplan")
}
for k in list(all_lights)
}
for k, frame in enumerate(frames):
for lane_id, status in frame.items():
lane_id = str(lane_id)
lights[lane_id]["state"][SD.TRAFFIC_LIGHT_STATUS][k] = set_light_status(status)
if lights[lane_id][SD.TRAFFIC_LIGHT_POSITION] is None:
assert isinstance(lane_id, str), "Lane ID should be str"
lights[lane_id][SD.TRAFFIC_LIGHT_POSITION] = set_light_position(scenario, lane_id, center)
lights[lane_id][SD.METADATA][SD.TYPE] = MetaDriveType.TRAFFIC_LIGHT
return lights
def parse_object_state(obj_state, nuplan_center):
ret = {}
ret["position"] = nuplan_to_metadrive_vector([obj_state.center.x, obj_state.center.y], nuplan_center)
ret["heading"] = obj_state.center.heading
ret["velocity"] = nuplan_to_metadrive_vector([obj_state.velocity.x, obj_state.velocity.y])
ret["valid"] = 1
ret["length"] = obj_state.box.length
ret["width"] = obj_state.box.width
ret["height"] = obj_state.box.height
return ret
def parse_ego_vehicle_state(state, nuplan_center):
center = nuplan_center
ret = {}
ret["position"] = nuplan_to_metadrive_vector([state.waypoint.x, state.waypoint.y], center)
ret["heading"] = state.waypoint.heading
ret["velocity"] = nuplan_to_metadrive_vector([state.agent.velocity.x, state.agent.velocity.y])
ret["angular_velocity"] = state.dynamic_car_state.angular_velocity
ret["valid"] = 1
ret["length"] = state.agent.box.length
ret["width"] = state.agent.box.width
ret["height"] = state.agent.box.height
return ret
def parse_ego_vehicle_state_trajectory(scenario, nuplan_center):
data = [
parse_ego_vehicle_state(scenario.get_ego_state_at_iteration(i), nuplan_center)
for i in range(scenario.get_number_of_iterations())
]
for i in range(len(data) - 1):
data[i]["angular_velocity"] = compute_angular_velocity(
initial_heading=data[i]["heading"], final_heading=data[i + 1]["heading"], dt=scenario.database_interval
)
return data
def extract_traffic(scenario: NuPlanScenario, center):
episode_len = scenario.get_number_of_iterations()
detection_ret = []
all_objs = set()
all_objs.add(EGO)
for frame_data in [scenario.get_tracked_objects_at_iteration(i).tracked_objects for i in range(episode_len)]:
new_frame_data = {}
for obj in frame_data:
new_frame_data[obj.track_token] = obj
all_objs.add(obj.track_token)
detection_ret.append(new_frame_data)
tracks = {
k: dict(
type=MetaDriveType.UNSET,
state=dict(
position=np.zeros(shape=(episode_len, 3)),
heading=np.zeros(shape=(episode_len, )),
velocity=np.zeros(shape=(episode_len, 2)),
valid=np.zeros(shape=(episode_len, )),
length=np.zeros(shape=(episode_len, 1)),
width=np.zeros(shape=(episode_len, 1)),
height=np.zeros(shape=(episode_len, 1))
),
metadata=dict(track_length=episode_len, nuplan_type=None, type=None, object_id=k, nuplan_id=k)
)
for k in list(all_objs)
}
tracks_to_remove = set()
for frame_idx, frame in enumerate(detection_ret):
for nuplan_id, obj_state, in frame.items():
assert isinstance(obj_state, Agent) or isinstance(obj_state, StaticObject)
obj_type = get_traffic_obj_type(obj_state.tracked_object_type)
if obj_type is None:
tracks_to_remove.add(nuplan_id)
continue
tracks[nuplan_id][SD.TYPE] = obj_type
if tracks[nuplan_id][SD.METADATA]["nuplan_type"] is None:
tracks[nuplan_id][SD.METADATA]["nuplan_type"] = int(obj_state.tracked_object_type)
tracks[nuplan_id][SD.METADATA]["type"] = obj_type
state = parse_object_state(obj_state, center)
tracks[nuplan_id]["state"]["position"][frame_idx] = [state["position"][0], state["position"][1], 0.0]
tracks[nuplan_id]["state"]["heading"][frame_idx] = state["heading"]
tracks[nuplan_id]["state"]["velocity"][frame_idx] = state["velocity"]
tracks[nuplan_id]["state"]["valid"][frame_idx] = 1
tracks[nuplan_id]["state"]["length"][frame_idx] = state["length"]
tracks[nuplan_id]["state"]["width"][frame_idx] = state["width"]
tracks[nuplan_id]["state"]["height"][frame_idx] = state["height"]
for track in list(tracks_to_remove):
tracks.pop(track)
# ego
sdc_traj = parse_ego_vehicle_state_trajectory(scenario, center)
ego_track = tracks[EGO]
for frame_idx, obj_state in enumerate(sdc_traj):
obj_type = MetaDriveType.VEHICLE
ego_track[SD.TYPE] = obj_type
if ego_track[SD.METADATA]["nuplan_type"] is None:
ego_track[SD.METADATA]["nuplan_type"] = int(NuPlanEgoType)
ego_track[SD.METADATA]["type"] = obj_type
state = obj_state
ego_track["state"]["position"][frame_idx] = [state["position"][0], state["position"][1], 0.0]
ego_track["state"]["valid"][frame_idx] = 1
ego_track["state"]["heading"][frame_idx] = state["heading"]
# this velocity is in ego car frame, abort
# ego_track["state"]["velocity"][frame_idx] = state["velocity"]
ego_track["state"]["length"][frame_idx] = state["length"]
ego_track["state"]["width"][frame_idx] = state["width"]
ego_track["state"]["height"][frame_idx] = state["height"]
# get velocity here
vel = ego_track["state"]["position"][1:] - ego_track["state"]["position"][:-1]
ego_track["state"]["velocity"][:-1] = vel[..., :2] / 0.1
ego_track["state"]["velocity"][-1] = ego_track["state"]["velocity"][-2]
# check
assert EGO in tracks
for track_id in tracks:
assert tracks[track_id][SD.TYPE] != MetaDriveType.UNSET
return tracks
def convert_nuplan_scenario(scenario: NuPlanScenario, version):
"""
Data will be interpolated to 0.1s time interval, while the time interval of original key frames are 0.5s.
"""
scenario_log_interval = scenario.database_interval
assert abs(scenario_log_interval - 0.1) < 1e-3, "Log interval should be 0.1 or Interpolating is required! " \
"By setting NuPlan subsample ratio can address this"
result = SD()
result[SD.ID] = scenario.scenario_name
result[SD.VERSION] = "nuplan_" + version
result[SD.LENGTH] = scenario.get_number_of_iterations()
# metadata
result[SD.METADATA] = {}
result[SD.METADATA]["dataset"] = "nuplan"
result[SD.METADATA]["map"] = scenario.map_api.map_name
result[SD.METADATA][SD.METADRIVE_PROCESSED] = False
result[SD.METADATA]["map_version"] = scenario.map_version
result[SD.METADATA]["log_name"] = scenario.log_name
result[SD.METADATA]["scenario_extraction_info"] = scenario._scenario_extraction_info.__dict__
result[SD.METADATA]["ego_vehicle_parameters"] = scenario.ego_vehicle_parameters.__dict__
result[SD.METADATA]["coordinate"] = "right-handed"
result[SD.METADATA]["scenario_token"] = scenario.scenario_name
result[SD.METADATA]["scenario_id"] = scenario.scenario_name
result[SD.METADATA][SD.ID] = scenario.scenario_name
result[SD.METADATA]["scenario_type"] = scenario.scenario_type
result[SD.METADATA]["sample_rate"] = scenario_log_interval
result[SD.METADATA][SD.TIMESTEP] = np.asarray([i * scenario_log_interval for i in range(result[SD.LENGTH])])
# centered all positions to ego car
state = scenario.get_ego_state_at_iteration(0)
scenario_center = [state.waypoint.x, state.waypoint.y]
result[SD.TRACKS] = extract_traffic(scenario, scenario_center)
result[SD.METADATA][SD.SDC_ID] = EGO
# traffic light
result[SD.DYNAMIC_MAP_STATES] = extract_traffic_light(scenario, scenario_center)
# map
result[SD.MAP_FEATURES] = extract_map_features(scenario.map_api, scenario_center)
return result
# only for example using
example_scenario_types = "[behind_pedestrian_on_pickup_dropoff, \
near_multiple_vehicles, \
high_magnitude_jerk, \
crossed_by_vehicle, \
following_lane_with_lead, \
changing_lane_to_left, \
accelerating_at_traffic_light_without_lead, \
stopping_at_stop_sign_with_lead, \
traversing_narrow_lane, \
waiting_for_pedestrian_to_cross, \
starting_left_turn, \
starting_high_speed_turn, \
starting_unprotected_cross_turn, \
starting_protected_noncross_turn, \
on_pickup_dropoff]"

View File

@@ -0,0 +1,78 @@
ALL_TYPE = {
"noise": 'noise',
"human.pedestrian.adult": 'adult',
"human.pedestrian.child": 'child',
"human.pedestrian.wheelchair": 'wheelchair',
"human.pedestrian.stroller": 'stroller',
"human.pedestrian.personal_mobility": 'p.mobility',
"human.pedestrian.police_officer": 'police',
"human.pedestrian.construction_worker": 'worker',
"animal": 'animal',
"vehicle.car": 'car',
"vehicle.motorcycle": 'motorcycle',
"vehicle.bicycle": 'bicycle',
"vehicle.bus.bendy": 'bus.bendy',
"vehicle.bus.rigid": 'bus.rigid',
"vehicle.truck": 'truck',
"vehicle.construction": 'constr. veh',
"vehicle.emergency.ambulance": 'ambulance',
"vehicle.emergency.police": 'police car',
"vehicle.trailer": 'trailer',
"movable_object.barrier": 'barrier',
"movable_object.trafficcone": 'trafficcone',
"movable_object.pushable_pullable": 'push/pullable',
"movable_object.debris": 'debris',
"static_object.bicycle_rack": 'bicycle racks',
"flat.driveable_surface": 'driveable',
"flat.sidewalk": 'sidewalk',
"flat.terrain": 'terrain',
"flat.other": 'flat.other',
"static.manmade": 'manmade',
"static.vegetation": 'vegetation',
"static.other": 'static.other',
"vehicle.ego": "ego"
}
NOISE_TYPE = {
"noise": 'noise',
"animal": 'animal',
"static_object.bicycle_rack": 'bicycle racks',
"movable_object.pushable_pullable": 'push/pullable',
"movable_object.debris": 'debris',
"static.manmade": 'manmade',
"static.vegetation": 'vegetation',
"static.other": 'static.other',
}
HUMAN_TYPE = {
"human.pedestrian.adult": 'adult',
"human.pedestrian.child": 'child',
"human.pedestrian.wheelchair": 'wheelchair',
"human.pedestrian.stroller": 'stroller',
"human.pedestrian.personal_mobility": 'p.mobility',
"human.pedestrian.police_officer": 'police',
"human.pedestrian.construction_worker": 'worker',
}
BICYCLE_TYPE = {
"vehicle.bicycle": 'bicycle',
"vehicle.motorcycle": 'motorcycle',
}
VEHICLE_TYPE = {
"vehicle.car": 'car',
"vehicle.bus.bendy": 'bus.bendy',
"vehicle.bus.rigid": 'bus.rigid',
"vehicle.truck": 'truck',
"vehicle.construction": 'constr. veh',
"vehicle.emergency.ambulance": 'ambulance',
"vehicle.emergency.police": 'police car',
"vehicle.trailer": 'trailer',
"vehicle.ego": "ego",
}
OBSTACLE_TYPE = {
"movable_object.barrier": 'barrier',
"movable_object.trafficcone": 'trafficcone',
}
TERRAIN_TYPE = {
"flat.driveable_surface": 'driveable',
"flat.sidewalk": 'sidewalk',
"flat.terrain": 'terrain',
"flat.other": 'flat.other'
}

View File

@@ -0,0 +1,394 @@
import copy
import logging
import geopandas as gpd
import numpy as np
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
from shapely.ops import unary_union
from scenarionet.converter.nuscenes.type import ALL_TYPE, HUMAN_TYPE, BICYCLE_TYPE, VEHICLE_TYPE
logger = logging.getLogger(__name__)
try:
from nuscenes import NuScenes
from nuscenes.can_bus.can_bus_api import NuScenesCanBus
from nuscenes.eval.common.utils import quaternion_yaw
from nuscenes.map_expansion.arcline_path_utils import discretize_lane
from nuscenes.map_expansion.map_api import NuScenesMap
from pyquaternion import Quaternion
except ImportError as e:
logger.warning("Can not import nuscenes-devkit: {}".format(e))
EGO = "ego"
def get_metadrive_type(obj_type):
meta_type = obj_type
md_type = None
if ALL_TYPE[obj_type] == "barrier":
md_type = MetaDriveType.TRAFFIC_BARRIER
elif ALL_TYPE[obj_type] == "trafficcone":
md_type = MetaDriveType.TRAFFIC_CONE
elif obj_type in VEHICLE_TYPE:
md_type = MetaDriveType.VEHICLE
elif obj_type in HUMAN_TYPE:
md_type = MetaDriveType.PEDESTRIAN
elif obj_type in BICYCLE_TYPE:
md_type = MetaDriveType.CYCLIST
# assert meta_type != MetaDriveType.UNSET and meta_type != "noise"
return md_type, meta_type
def parse_frame(frame, nuscenes: NuScenes):
ret = {}
for obj_id in frame["anns"]:
obj = nuscenes.get("sample_annotation", obj_id)
# velocity = nuscenes.box_velocity(obj_id)[:2]
# if np.nan in velocity:
velocity = np.array([0.0, 0.0])
ret[obj["instance_token"]] = {
"position": obj["translation"],
"obj_id": obj["instance_token"],
"heading": quaternion_yaw(Quaternion(*obj["rotation"])),
"rotation": obj["rotation"],
"velocity": velocity,
"size": obj["size"],
"visible": obj["visibility_token"],
"attribute": [nuscenes.get("attribute", i)["name"] for i in obj["attribute_tokens"]],
"type": obj["category_name"]
}
ego_token = nuscenes.get("sample_data", frame["data"]["LIDAR_TOP"])["ego_pose_token"]
ego_state = nuscenes.get("ego_pose", ego_token)
ret[EGO] = {
"position": ego_state["translation"],
"obj_id": EGO,
"heading": quaternion_yaw(Quaternion(*ego_state["rotation"])),
"rotation": ego_state["rotation"],
"type": "vehicle.car",
"velocity": np.array([0.0, 0.0]),
# size https://en.wikipedia.org/wiki/Renault_Zoe
"size": [4.08, 1.73, 1.56],
}
return ret
def interpolate_heading(heading_data, old_valid, new_valid, num_to_interpolate=5):
new_heading_theta = np.zeros_like(new_valid)
for k, valid in enumerate(old_valid[:-1]):
if abs(valid) > 1e-1 and abs(old_valid[k + 1]) > 1e-1:
diff = (heading_data[k + 1] - heading_data[k] + np.pi) % (2 * np.pi) - np.pi
# step = diff
interpolate_heading = np.linspace(heading_data[k], heading_data[k] + diff, 6)
new_heading_theta[k * num_to_interpolate:(k + 1) * num_to_interpolate] = interpolate_heading[:-1]
elif abs(valid) > 1e-1 and abs(old_valid[k + 1]) < 1e-1:
new_heading_theta[k * num_to_interpolate:(k + 1) * num_to_interpolate] = heading_data[k]
new_heading_theta[-1] = heading_data[-1]
return new_heading_theta * new_valid
def _interpolate_one_dim(data, old_valid, new_valid, num_to_interpolate=5):
new_data = np.zeros_like(new_valid)
for k, valid in enumerate(old_valid[:-1]):
if abs(valid) > 1e-1 and abs(old_valid[k + 1]) > 1e-1:
diff = data[k + 1] - data[k]
# step = diff
interpolate_data = np.linspace(data[k], data[k] + diff, num_to_interpolate + 1)
new_data[k * num_to_interpolate:(k + 1) * num_to_interpolate] = interpolate_data[:-1]
elif abs(valid) > 1e-1 and abs(old_valid[k + 1]) < 1e-1:
new_data[k * num_to_interpolate:(k + 1) * num_to_interpolate] = data[k]
new_data[-1] = data[-1]
return new_data * new_valid
def interpolate(origin_y, valid, new_valid):
if len(origin_y.shape) == 1:
ret = _interpolate_one_dim(origin_y, valid, new_valid)
elif len(origin_y.shape) == 2:
ret = []
for dim in range(origin_y.shape[-1]):
new_y = _interpolate_one_dim(origin_y[..., dim], valid, new_valid)
new_y = np.expand_dims(new_y, axis=-1)
ret.append(new_y)
ret = np.concatenate(ret, axis=-1)
else:
raise ValueError("Y has shape {}, Can not interpolate".format(origin_y.shape))
return ret
def get_tracks_from_frames(nuscenes: NuScenes, scene_info, frames, num_to_interpolate=5):
episode_len = len(frames)
# Fill tracks
all_objs = set()
for frame in frames:
all_objs.update(frame.keys())
tracks = {
k: dict(
type=MetaDriveType.UNSET,
state=dict(
position=np.zeros(shape=(episode_len, 3)),
heading=np.zeros(shape=(episode_len, )),
velocity=np.zeros(shape=(episode_len, 2)),
valid=np.zeros(shape=(episode_len, )),
length=np.zeros(shape=(episode_len, 1)),
width=np.zeros(shape=(episode_len, 1)),
height=np.zeros(shape=(episode_len, 1))
),
metadata=dict(track_length=episode_len, type=MetaDriveType.UNSET, object_id=k, original_id=k)
)
for k in list(all_objs)
}
tracks_to_remove = set()
for frame_idx in range(episode_len):
# Record all agents' states (position, velocity, ...)
for id, state in frames[frame_idx].items():
# Fill type
md_type, meta_type = get_metadrive_type(state["type"])
tracks[id]["type"] = md_type
tracks[id][SD.METADATA]["type"] = meta_type
if md_type is None or md_type == MetaDriveType.UNSET:
tracks_to_remove.add(id)
continue
tracks[id]["type"] = md_type
tracks[id][SD.METADATA]["type"] = meta_type
# Introducing the state item
tracks[id]["state"]["position"][frame_idx] = state["position"]
tracks[id]["state"]["heading"][frame_idx] = state["heading"]
tracks[id]["state"]["velocity"][frame_idx] = tracks[id]["state"]["velocity"][frame_idx]
tracks[id]["state"]["valid"][frame_idx] = 1
tracks[id]["state"]["length"][frame_idx] = state["size"][1]
tracks[id]["state"]["width"][frame_idx] = state["size"][0]
tracks[id]["state"]["height"][frame_idx] = state["size"][2]
tracks[id]["metadata"]["original_id"] = id
tracks[id]["metadata"]["object_id"] = id
for track in tracks_to_remove:
track_data = tracks.pop(track)
obj_type = track_data[SD.METADATA]["type"]
print("\nWARNING: Can not map type: {} to any MetaDrive Type".format(obj_type))
new_episode_len = (episode_len - 1) * num_to_interpolate + 1
# interpolate
interpolate_tracks = {}
for id, track, in tracks.items():
interpolate_tracks[id] = copy.deepcopy(track)
interpolate_tracks[id]["metadata"]["track_length"] = new_episode_len
# valid first
new_valid = np.zeros(shape=(new_episode_len, ))
if track["state"]["valid"][0]:
new_valid[0] = 1
for k, valid in enumerate(track["state"]["valid"][1:], start=1):
if valid:
if abs(new_valid[(k - 1) * num_to_interpolate] - 1) < 1e-2:
start_idx = (k - 1) * num_to_interpolate + 1
else:
start_idx = k * num_to_interpolate
new_valid[start_idx:k * num_to_interpolate + 1] = 1
interpolate_tracks[id]["state"]["valid"] = new_valid
# position
interpolate_tracks[id]["state"]["position"] = interpolate(
track["state"]["position"], track["state"]["valid"], new_valid
)
if id == "ego":
# We can get it from canbus
canbus = NuScenesCanBus(dataroot=nuscenes.dataroot)
imu_pos = np.asarray([state["pos"] for state in canbus.get_messages(scene_info["name"], "pose")[::5]])
interpolate_tracks[id]["state"]["position"][:len(imu_pos)] = imu_pos
# velocity
interpolate_tracks[id]["state"]["velocity"] = interpolate(
track["state"]["velocity"], track["state"]["valid"], new_valid
)
vel = interpolate_tracks[id]["state"]["position"][1:] - interpolate_tracks[id]["state"]["position"][:-1]
interpolate_tracks[id]["state"]["velocity"][:-1] = vel[..., :2] / 0.1
for k, valid in enumerate(new_valid[1:], start=1):
if valid == 0 or not valid or abs(valid) < 1e-2:
interpolate_tracks[id]["state"]["velocity"][k] = np.array([0., 0.])
interpolate_tracks[id]["state"]["velocity"][k - 1] = np.array([0., 0.])
# speed outlier check
max_vel = np.max(np.linalg.norm(interpolate_tracks[id]["state"]["velocity"], axis=-1))
assert max_vel < 50, "Abnormal velocity!"
if max_vel > 30:
print("\nWARNING: Too large peed for {}: {}".format(id, max_vel))
# heading
# then update position
new_heading = interpolate_heading(track["state"]["heading"], track["state"]["valid"], new_valid)
interpolate_tracks[id]["state"]["heading"] = new_heading
if id == "ego":
# We can get it from canbus
canbus = NuScenesCanBus(dataroot=nuscenes.dataroot)
imu_heading = np.asarray(
[
quaternion_yaw(Quaternion(state["orientation"]))
for state in canbus.get_messages(scene_info["name"], "pose")[::5]
]
)
interpolate_tracks[id]["state"]["heading"][:len(imu_heading)] = imu_heading
for k, v in track["state"].items():
if k in ["valid", "heading", "position", "velocity"]:
continue
else:
interpolate_tracks[id]["state"][k] = interpolate(v, track["state"]["valid"], new_valid)
# if id == "ego":
# ego is valid all time, so we can calculate the velocity in this way
return interpolate_tracks
def get_map_features(scene_info, nuscenes: NuScenes, map_center, radius=250, points_distance=1):
"""
Extract map features from nuscenes data. The objects in specified region will be returned. Sampling rate determines
the distance between 2 points when extracting lane center line.
"""
ret = {}
map_name = nuscenes.get("log", scene_info["log_token"])["location"]
map_api = NuScenesMap(dataroot=nuscenes.dataroot, map_name=map_name)
layer_names = [
# "line",
# "polygon",
# "node",
'drivable_area',
'road_segment',
'road_block',
'lane',
# 'ped_crossing',
# 'walkway',
# 'stop_line',
# 'carpark_area',
'lane_connector',
'road_divider',
'lane_divider',
'traffic_light'
]
map_objs = map_api.get_records_in_radius(map_center[0], map_center[1], radius, layer_names)
# build map boundary
polygons = []
# for id in map_objs["drivable_area"]:
# seg_info = map_api.get("drivable_area", id)
# assert seg_info["token"] == id
# for polygon_token in seg_info["polygon_tokens"]:
# polygon = map_api.extract_polygon(polygon_token)
# polygons.append(polygon)
for id in map_objs["road_segment"]:
seg_info = map_api.get("road_segment", id)
assert seg_info["token"] == id
polygon = map_api.extract_polygon(seg_info["polygon_token"])
polygons.append(polygon)
for id in map_objs["road_block"]:
seg_info = map_api.get("road_block", id)
assert seg_info["token"] == id
polygon = map_api.extract_polygon(seg_info["polygon_token"])
polygons.append(polygon)
polygons = [geom if geom.is_valid else geom.buffer(0) for geom in polygons]
# logger.warning("Stop using boundaries! Use exterior instead!")
boundaries = gpd.GeoSeries(unary_union(polygons)).boundary.explode(index_parts=True)
for idx, boundary in enumerate(boundaries[0]):
block_points = np.array(list(i for i in zip(boundary.coords.xy[0], boundary.coords.xy[1])))
id = "boundary_{}".format(idx)
ret[id] = {SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_WHITE, SD.POLYLINE: block_points}
for id in map_objs["lane_divider"]:
line_info = map_api.get("lane_divider", id)
assert line_info["token"] == id
line = map_api.extract_line(line_info["line_token"]).coords.xy
line = [[line[0][i], line[1][i]] for i in range(len(line[0]))]
ret[id] = {SD.TYPE: MetaDriveType.LINE_BROKEN_SINGLE_WHITE, SD.POLYLINE: line}
for id in map_objs["road_divider"]:
line_info = map_api.get("road_divider", id)
assert line_info["token"] == id
line = map_api.extract_line(line_info["line_token"]).coords.xy
line = [[line[0][i], line[1][i]] for i in range(len(line[0]))]
ret[id] = {SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_YELLOW, SD.POLYLINE: line}
for id in map_objs["lane"]:
lane_info = map_api.get("lane", id)
assert lane_info["token"] == id
boundary = map_api.extract_polygon(lane_info["polygon_token"]).boundary.xy
boundary_polygon = [[boundary[0][i], boundary[1][i]] for i in range(len(boundary[0]))]
# boundary_polygon += [[boundary[0][i], boundary[1][i]] for i in range(len(boundary[0]))]
ret[id] = {
SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
SD.POLYLINE: discretize_lane(map_api.arcline_path_3[id], resolution_meters=points_distance),
SD.POLYGON: boundary_polygon,
}
for id in map_objs["lane_connector"]:
lane_info = map_api.get("lane_connector", id)
assert lane_info["token"] == id
# boundary = map_api.extract_polygon(lane_info["polygon_token"]).boundary.xy
# boundary_polygon = [[boundary[0][i], boundary[1][i], 0.1] for i in range(len(boundary[0]))]
# boundary_polygon += [[boundary[0][i], boundary[1][i], 0.] for i in range(len(boundary[0]))]
ret[id] = {
SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
SD.POLYLINE: discretize_lane(map_api.arcline_path_3[id], resolution_meters=points_distance),
# SD.POLYGON: boundary_polygon,
"speed_limit_kmh": 100
}
return ret
def convert_nuscenes_scenario(scene, version, nuscenes: NuScenes):
"""
Data will be interpolated to 0.1s time interval, while the time interval of original key frames are 0.5s.
"""
scene_token = scene["token"]
scenario_log_interval = 0.1
scene_info = nuscenes.get("scene", scene_token)
frames = []
current_frame = nuscenes.get("sample", scene_info["first_sample_token"])
while current_frame["token"] != scene_info["last_sample_token"]:
frames.append(parse_frame(current_frame, nuscenes))
current_frame = nuscenes.get("sample", current_frame["next"])
frames.append(parse_frame(current_frame, nuscenes))
assert current_frame["next"] == ""
assert len(frames) == scene_info["nbr_samples"], "Number of sample mismatches! "
result = SD()
result[SD.ID] = scene_info["name"]
result[SD.VERSION] = "nuscenes" + version
result[SD.LENGTH] = (len(frames) - 1) * 5 + 1
result[SD.METADATA] = {}
result[SD.METADATA]["dataset"] = "nuscenes"
result[SD.METADATA][SD.METADRIVE_PROCESSED] = False
result[SD.METADATA]["map"] = nuscenes.get("log", scene_info["log_token"])["location"]
result[SD.METADATA]["date"] = nuscenes.get("log", scene_info["log_token"])["date_captured"]
result[SD.METADATA]["coordinate"] = "right-handed"
result[SD.METADATA]["scenario_token"] = scene_token
result[SD.METADATA][SD.ID] = scene_info["name"]
result[SD.METADATA]["scenario_id"] = scene_info["name"]
result[SD.METADATA]["sample_rate"] = scenario_log_interval
result[SD.METADATA][SD.TIMESTEP] = np.arange(0., (len(frames) - 1) * 0.5 + 0.1, 0.1)
# interpolating to 0.1s interval
result[SD.TRACKS] = get_tracks_from_frames(nuscenes, scene_info, frames, num_to_interpolate=5)
result[SD.METADATA][SD.SDC_ID] = "ego"
# No traffic light in nuscenes at this stage
result[SD.DYNAMIC_MAP_STATES] = {}
# map
map_center = result[SD.TRACKS]["ego"]["state"]["position"][0]
result[SD.MAP_FEATURES] = get_map_features(scene_info, nuscenes, map_center, 250)
return result
def get_nuscenes_scenarios(dataroot, version):
nusc = NuScenes(version=version, dataroot=dataroot)
scenarios = nusc.scene
return scenarios, nusc

View File

View File

@@ -0,0 +1,32 @@
from metadrive.envs.metadrive_env import MetaDriveEnv
import logging
from metadrive.scenario.scenario_description import ScenarioDescription as SD
def convert_pg_scenario(scenario_index, version, env):
"""
Simulate to collect PG Scenarios
:param scenario_index: the index to export
:param version: place holder
:param env: metadrive env instance
"""
logging.disable(logging.INFO)
policy = lambda x: [0, 1] # placeholder
scenarios, done_info = env.export_scenarios(policy, scenario_index=[scenario_index], to_dict=False)
scenario = scenarios[scenario_index]
assert scenario[SD.VERSION] == version, "Data version mismatch"
return scenario
def get_pg_scenarios(num_scenarios, policy, start_seed=0):
env = MetaDriveEnv(
dict(
start_seed=start_seed,
num_scenarios=num_scenarios,
traffic_density=0.2,
agent_policy=policy,
crash_vehicle_done=False,
map=2
)
)
return [i for i in range(num_scenarios)], env

View File

@@ -0,0 +1,229 @@
import ast
import copy
import inspect
import logging
import math
import multiprocessing
import os
import pickle
import shutil
from functools import partial
import numpy as np
import tqdm
from metadrive.scenario import ScenarioDescription as SD
from scenarionet.builder.utils import combine_multiple_dataset
from scenarionet.common_utils import save_summary_anda_mapping
logger = logging.getLogger(__file__)
def nuplan_to_metadrive_vector(vector, nuplan_center=(0, 0)):
"All vec in nuplan should be centered in (0,0) to avoid numerical explosion"
vector = np.array(vector)
vector -= np.asarray(nuplan_center)
return vector
def compute_angular_velocity(initial_heading, final_heading, dt):
"""
Calculate the angular velocity between two headings given in radians.
Parameters:
initial_heading (float): The initial heading in radians.
final_heading (float): The final heading in radians.
dt (float): The time interval between the two headings in seconds.
Returns:
float: The angular velocity in radians per second.
"""
# Calculate the difference in headings
delta_heading = final_heading - initial_heading
# Adjust the delta_heading to be in the range (-π, π]
delta_heading = (delta_heading + math.pi) % (2 * math.pi) - math.pi
# Compute the angular velocity
angular_vel = delta_heading / dt
return angular_vel
def mph_to_kmh(speed_in_mph: float):
speed_in_kmh = speed_in_mph * 1.609344
return speed_in_kmh
def contains_explicit_return(f):
return any(isinstance(node, ast.Return) for node in ast.walk(ast.parse(inspect.getsource(f))))
def write_to_directory(
convert_func,
scenarios,
output_path,
dataset_version,
dataset_name,
force_overwrite=False,
num_workers=8,
**kwargs
):
# make sure dir not exist
save_path = copy.deepcopy(output_path)
if os.path.exists(output_path):
if not force_overwrite:
raise ValueError(
"Directory {} already exists! Abort. "
"\n Try setting force_overwrite=True or adding --overwrite".format(output_path)
)
basename = os.path.basename(output_path)
dir = os.path.dirname(output_path)
for i in range(num_workers):
output_path = os.path.join(dir, "{}_{}".format(basename, str(i)))
if os.path.exists(output_path):
if not force_overwrite:
raise ValueError(
"Directory {} already exists! Abort. "
"\n Try setting force_overwrite=True or adding --overwrite".format(output_path)
)
# get arguments for workers
num_files = len(scenarios)
if num_files < num_workers:
# single process
logger.info("Use one worker, as num_scenario < num_workers:")
num_workers = 1
argument_list = []
output_pathes = []
num_files_each_worker = int(num_files // num_workers)
for i in range(num_workers):
if i == num_workers - 1:
end_idx = num_files
else:
end_idx = (i + 1) * num_files_each_worker
output_path = os.path.join(dir, "{}_{}".format(basename, str(i)))
output_pathes.append(output_path)
argument_list.append([scenarios[i * num_files_each_worker:end_idx], kwargs, i, output_path])
# prefill arguments
func = partial(
writing_to_directory_wrapper,
convert_func=convert_func,
dataset_version=dataset_version,
dataset_name=dataset_name,
force_overwrite=force_overwrite
)
# Run, workers and process result from worker
with multiprocessing.Pool(num_workers) as p:
all_result = list(p.imap(func, argument_list))
combine_multiple_dataset(
save_path, *output_pathes, force_overwrite=force_overwrite, try_generate_missing_file=False
)
return all_result
def writing_to_directory_wrapper(args, convert_func, dataset_version, dataset_name, force_overwrite=False):
return write_to_directory_single_worker(
convert_func=convert_func,
scenarios=args[0],
output_path=args[3],
dataset_version=dataset_version,
dataset_name=dataset_name,
force_overwrite=force_overwrite,
worker_index=args[2],
**args[1]
)
def write_to_directory_single_worker(
convert_func,
scenarios,
output_path,
dataset_version,
dataset_name,
worker_index=0,
force_overwrite=False,
**kwargs
):
"""
Convert a batch of scenarios.
"""
if not contains_explicit_return(convert_func):
raise RuntimeError("The convert function should return a metadata dict")
if "version" in kwargs:
kwargs.pop("version")
logger.info("the specified version in kwargs is replaced by argument: 'dataset_version'")
save_path = copy.deepcopy(output_path)
output_path = output_path + "_tmp"
# meta recorder and data summary
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=False)
# make real save dir
delay_remove = None
if os.path.exists(save_path):
if force_overwrite:
delay_remove = save_path
else:
raise ValueError(
"Directory already exists! Abort."
"\n Try setting force_overwrite=True or using --overwrite"
)
summary_file = SD.DATASET.SUMMARY_FILE
mapping_file = SD.DATASET.MAPPING_FILE
summary_file_path = os.path.join(output_path, summary_file)
mapping_file_path = os.path.join(output_path, mapping_file)
summary = {}
mapping = {}
for scenario in tqdm.tqdm(scenarios, desc="Worker Index: {}".format(worker_index)):
# convert scenario
sd_scenario = convert_func(scenario, dataset_version, **kwargs)
scenario_id = sd_scenario[SD.ID]
export_file_name = SD.get_export_file_name(dataset_name, dataset_version, scenario_id)
# add agents summary
summary_dict = {}
ego_car_id = sd_scenario[SD.METADATA][SD.SDC_ID]
summary_dict[ego_car_id] = SD.get_object_summary(
state_dict=sd_scenario.get_sdc_track()["state"], id=ego_car_id, type=sd_scenario.get_sdc_track()["type"]
)
for track_id, track in sd_scenario[SD.TRACKS].items():
summary_dict[track_id] = SD.get_object_summary(state_dict=track["state"], id=track_id, type=track["type"])
sd_scenario[SD.METADATA][SD.SUMMARY.OBJECT_SUMMARY] = summary_dict
# count some objects occurrence
sd_scenario[SD.METADATA][SD.SUMMARY.NUMBER_SUMMARY] = SD.get_number_summary(sd_scenario)
# update summary/mapping dicy
summary[export_file_name] = copy.deepcopy(sd_scenario[SD.METADATA])
mapping[export_file_name] = "" # in the same dir
# sanity check
sd_scenario = sd_scenario.to_dict()
SD.sanity_check(sd_scenario, check_self_type=True)
# dump
p = os.path.join(output_path, export_file_name)
with open(p, "wb") as f:
pickle.dump(sd_scenario, f)
# store summary file
save_summary_anda_mapping(summary_file_path, mapping_file_path, summary, mapping)
# rename and save
if delay_remove is not None:
assert delay_remove == save_path
shutil.rmtree(delay_remove)
os.rename(output_path, save_path)
return summary, mapping

View File

View File

@@ -0,0 +1,85 @@
TrafficSignal = {
0: 'LANE_STATE_UNKNOWN',
1: 'LANE_STATE_ARROW_STOP',
2: 'LANE_STATE_ARROW_CAUTION',
3: 'LANE_STATE_ARROW_GO',
4: 'LANE_STATE_STOP',
5: 'LANE_STATE_CAUTION',
6: 'LANE_STATE_GO',
7: 'LANE_STATE_FLASHING_STOP',
8: 'LANE_STATE_FLASHING_CAUTION'
}
class WaymoLaneType:
UNKNOWN = 0
LANE_FREEWAY = 1
LANE_SURFACE_STREET = 2
LANE_BIKE_LANE = 3
ENUM_TO_STR = {
UNKNOWN: 'UNKNOWN',
LANE_FREEWAY: 'LANE_FREEWAY',
LANE_SURFACE_STREET: 'LANE_SURFACE_STREET',
LANE_BIKE_LANE: 'LANE_BIKE_LANE'
}
@classmethod
def from_waymo(cls, item):
return cls.ENUM_TO_STR[item]
class WaymoRoadLineType:
UNKNOWN = 0
BROKEN_SINGLE_WHITE = 1
SOLID_SINGLE_WHITE = 2
SOLID_DOUBLE_WHITE = 3
BROKEN_SINGLE_YELLOW = 4
BROKEN_DOUBLE_YELLOW = 5
SOLID_SINGLE_YELLOW = 6
SOLID_DOUBLE_YELLOW = 7
PASSING_DOUBLE_YELLOW = 8
ENUM_TO_STR = {
UNKNOWN: 'UNKNOWN',
BROKEN_SINGLE_WHITE: 'ROAD_LINE_BROKEN_SINGLE_WHITE',
SOLID_SINGLE_WHITE: 'ROAD_LINE_SOLID_SINGLE_WHITE',
SOLID_DOUBLE_WHITE: 'ROAD_LINE_SOLID_DOUBLE_WHITE',
BROKEN_SINGLE_YELLOW: 'ROAD_LINE_BROKEN_SINGLE_YELLOW',
BROKEN_DOUBLE_YELLOW: 'ROAD_LINE_BROKEN_DOUBLE_YELLOW',
SOLID_SINGLE_YELLOW: 'ROAD_LINE_SOLID_SINGLE_YELLOW',
SOLID_DOUBLE_YELLOW: 'ROAD_LINE_SOLID_DOUBLE_YELLOW',
PASSING_DOUBLE_YELLOW: 'ROAD_LINE_PASSING_DOUBLE_YELLOW'
}
@classmethod
def from_waymo(cls, item):
return cls.ENUM_TO_STR[item]
class WaymoRoadEdgeType:
UNKNOWN = 0
# Physical road boundary that doesn't have traffic on the other side (e.g., a curb or the k-rail on the right side of a freeway).
BOUNDARY = 1
# Physical road boundary that separates the car from other traffic (e.g. a k-rail or an island).
MEDIAN = 2
ENUM_TO_STR = {UNKNOWN: 'UNKNOWN', BOUNDARY: 'ROAD_EDGE_BOUNDARY', MEDIAN: 'ROAD_EDGE_MEDIAN'}
@classmethod
def from_waymo(cls, item):
return cls.ENUM_TO_STR[item]
class WaymoAgentType:
UNSET = 0
VEHICLE = 1
PEDESTRIAN = 2
CYCLIST = 3
OTHER = 4
ENUM_TO_STR = {UNSET: 'UNSET', VEHICLE: 'VEHICLE', PEDESTRIAN: 'PEDESTRIAN', CYCLIST: 'CYCLIST', OTHER: 'OTHER'}
@classmethod
def from_waymo(cls, item):
return cls.ENUM_TO_STR[item]

View File

@@ -0,0 +1,471 @@
import logging
import multiprocessing
import os
import pickle
from scenarionet.converter.utils import mph_to_kmh
from scenarionet.converter.waymo.type import WaymoLaneType, WaymoAgentType, WaymoRoadLineType, WaymoRoadEdgeType
logger = logging.getLogger(__name__)
import numpy as np
try:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # FATAL
logging.getLogger('tensorflow').setLevel(logging.FATAL)
import tensorflow as tf
except ImportError as e:
logger.info(e)
try:
from waymo_open_dataset.protos import scenario_pb2
except ImportError as e:
logger.warning(e, "\n Please install waymo_open_dataset package: pip install waymo-open-dataset-tf-2-11-0==1.5.0")
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
SPLIT_KEY = "|"
def extract_poly(message):
x = [i.x for i in message]
y = [i.y for i in message]
z = [i.z for i in message]
coord = np.stack((x, y, z), axis=1).astype("float32")
return coord
def extract_boundaries(fb):
b = []
# b = np.zeros([len(fb), 4], dtype="int64")
for k in range(len(fb)):
c = dict()
c["lane_start_index"] = fb[k].lane_start_index
c["lane_end_index"] = fb[k].lane_end_index
c["boundary_type"] = WaymoRoadLineType.from_waymo(fb[k].boundary_type)
c["boundary_feature_id"] = fb[k].boundary_feature_id
for key in c:
c[key] = str(c[key])
b.append(c)
return b
def extract_neighbors(fb):
nbs = []
for k in range(len(fb)):
nb = dict()
nb["feature_id"] = fb[k].feature_id
nb["self_start_index"] = fb[k].self_start_index
nb["self_end_index"] = fb[k].self_end_index
nb["neighbor_start_index"] = fb[k].neighbor_start_index
nb["neighbor_end_index"] = fb[k].neighbor_end_index
for key in nb:
nb[key] = str(nb[key])
nb["boundaries"] = extract_boundaries(fb[k].boundaries)
nbs.append(nb)
return nbs
def extract_center(f):
center = dict()
f = f.lane
center["speed_limit_mph"] = f.speed_limit_mph
center["speed_limit_kmh"] = mph_to_kmh(f.speed_limit_mph)
center["type"] = WaymoLaneType.from_waymo(f.type)
center["polyline"] = extract_poly(f.polyline)
center["interpolating"] = f.interpolating
center["entry_lanes"] = [x for x in f.entry_lanes]
center["exit_lanes"] = [x for x in f.exit_lanes]
center["left_boundaries"] = extract_boundaries(f.left_boundaries)
center["right_boundaries"] = extract_boundaries(f.right_boundaries)
center["left_neighbor"] = extract_neighbors(f.left_neighbors)
center["right_neighbor"] = extract_neighbors(f.right_neighbors)
return center
def extract_line(f):
line = dict()
f = f.road_line
line["type"] = WaymoRoadLineType.from_waymo(f.type)
line["polyline"] = extract_poly(f.polyline)
return line
def extract_edge(f):
edge = dict()
f_ = f.road_edge
edge["type"] = WaymoRoadEdgeType.from_waymo(f_.type)
edge["polyline"] = extract_poly(f_.polyline)
return edge
def extract_stop(f):
stop = dict()
f = f.stop_sign
stop["type"] = MetaDriveType.STOP_SIGN
stop["lane"] = [x for x in f.lane]
stop["position"] = np.array([f.position.x, f.position.y, f.position.z], dtype="float32")
return stop
def extract_crosswalk(f):
cross_walk = dict()
f = f.crosswalk
cross_walk["type"] = MetaDriveType.CROSSWALK
cross_walk["polygon"] = extract_poly(f.polygon)
return cross_walk
def extract_bump(f):
speed_bump_data = dict()
f = f.speed_bump
speed_bump_data["type"] = MetaDriveType.SPEED_BUMP
speed_bump_data["polygon"] = extract_poly(f.polygon)
return speed_bump_data
def extract_driveway(f):
driveway_data = dict()
f = f.driveway
driveway_data["type"] = MetaDriveType.DRIVEWAY
driveway_data["polygon"] = extract_poly(f.polygon)
return driveway_data
def extract_tracks(tracks, sdc_idx, track_length):
ret = dict()
def _object_state_template(object_id):
return dict(
type=None,
state=dict(
# Never add extra dim if the value is scalar.
position=np.zeros([track_length, 3], dtype=np.float32),
length=np.zeros([track_length], dtype=np.float32),
width=np.zeros([track_length], dtype=np.float32),
height=np.zeros([track_length], dtype=np.float32),
heading=np.zeros([track_length], dtype=np.float32),
velocity=np.zeros([track_length, 2], dtype=np.float32),
valid=np.zeros([track_length], dtype=bool),
),
metadata=dict(track_length=track_length, type=None, object_id=object_id, dataset="waymo")
)
for obj in tracks:
object_id = str(obj.id)
obj_state = _object_state_template(object_id)
waymo_string = WaymoAgentType.from_waymo(obj.object_type) # Load waymo type string
metadrive_type = MetaDriveType.from_waymo(waymo_string) # Transform it to Waymo type string
obj_state["type"] = metadrive_type
for step_count, state in enumerate(obj.states):
if step_count >= track_length:
break
obj_state["state"]["position"][step_count][0] = state.center_x
obj_state["state"]["position"][step_count][1] = state.center_y
obj_state["state"]["position"][step_count][2] = state.center_z
# l = [state.length for state in obj.states]
# w = [state.width for state in obj.states]
# h = [state.height for state in obj.states]
# obj_state["state"]["size"] = np.stack([l, w, h], 1).astype("float32")
obj_state["state"]["length"][step_count] = state.length
obj_state["state"]["width"][step_count] = state.width
obj_state["state"]["height"][step_count] = state.height
# heading = [state.heading for state in obj.states]
obj_state["state"]["heading"][step_count] = state.heading
obj_state["state"]["velocity"][step_count][0] = state.velocity_x
obj_state["state"]["velocity"][step_count][1] = state.velocity_y
obj_state["state"]["valid"][step_count] = state.valid
obj_state["metadata"]["type"] = metadrive_type
ret[object_id] = obj_state
return ret, str(tracks[sdc_idx].id)
def extract_map_features(map_features):
ret = {}
for lane_state in map_features:
lane_id = str(lane_state.id)
if lane_state.HasField("lane"):
ret[lane_id] = extract_center(lane_state)
if lane_state.HasField("road_line"):
ret[lane_id] = extract_line(lane_state)
if lane_state.HasField("road_edge"):
ret[lane_id] = extract_edge(lane_state)
if lane_state.HasField("stop_sign"):
ret[lane_id] = extract_stop(lane_state)
if lane_state.HasField("crosswalk"):
ret[lane_id] = extract_crosswalk(lane_state)
if lane_state.HasField("speed_bump"):
ret[lane_id] = extract_bump(lane_state)
# Supported only in Waymo dataset 1.2.0
if lane_state.HasField("driveway"):
ret[lane_id] = extract_driveway(lane_state)
return ret
def extract_dynamic_map_states(dynamic_map_states, track_length):
processed_dynamics_map_states = {}
def _traffic_light_state_template(object_id):
return dict(
type=MetaDriveType.TRAFFIC_LIGHT,
state=dict(object_state=[None] * track_length),
lane=None,
stop_point=np.zeros([
3,
], dtype=np.float32),
metadata=dict(
track_length=track_length, type=MetaDriveType.TRAFFIC_LIGHT, object_id=object_id, dataset="waymo"
)
)
for step_count, step_states in enumerate(dynamic_map_states):
# Each step_states is the state of all objects in one time step
lane_states = step_states.lane_states
if step_count >= track_length:
break
for object_state in lane_states:
lane = object_state.lane
object_id = str(lane) # Always use string to specify object id
# We will use lane index to serve as the traffic light index.
if object_id not in processed_dynamics_map_states:
processed_dynamics_map_states[object_id] = _traffic_light_state_template(object_id=object_id)
if processed_dynamics_map_states[object_id]["lane"] is not None:
assert lane == processed_dynamics_map_states[object_id]["lane"]
else:
processed_dynamics_map_states[object_id]["lane"] = lane
object_state_string = object_state.State.Name(object_state.state)
processed_dynamics_map_states[object_id]["state"]["object_state"][step_count] = object_state_string
processed_dynamics_map_states[object_id]["stop_point"][0] = object_state.stop_point.x
processed_dynamics_map_states[object_id]["stop_point"][1] = object_state.stop_point.y
processed_dynamics_map_states[object_id]["stop_point"][2] = object_state.stop_point.z
for obj in processed_dynamics_map_states.values():
assert len(obj["state"]["object_state"]) == obj["metadata"]["track_length"]
return processed_dynamics_map_states
class CustomUnpickler(pickle.Unpickler):
def __init__(self, load_old_scenario, *args, **kwargs):
raise DeprecationWarning("Now we don't pickle any customized data type, so this class is deprecated now")
super(CustomUnpickler, self).__init__(*args, **kwargs)
self.load_old_scenario = load_old_scenario
def find_class(self, module, name):
if self.load_old_scenario:
raise ValueError("Old scenario is completely deprecated. Can't load it any more.")
if name == "AgentType":
return AgentTypeClass
elif name == "RoadLineType":
return RoadLineTypeClass
elif name == "RoadEdgeType":
return RoadEdgeTypeClass
return super().find_class(module, name)
else:
return super().find_class(module, name)
# return the nearest point"s index of the line
def nearest_point(point, line):
dist = np.square(line - point)
dist = np.sqrt(dist[:, 0] + dist[:, 1])
return np.argmin(dist)
def extract_width(map, polyline, boundary):
l_width = np.zeros(polyline.shape[0], dtype="float32")
for b in boundary:
boundary_int = {k: int(v) if k != "boundary_type" else v for k, v in b.items()} # All values are int
b_feat_id = str(boundary_int["boundary_feature_id"])
lb = map[b_feat_id]
b_polyline = lb["polyline"][:, :2]
start_p = polyline[boundary_int["lane_start_index"]]
start_index = nearest_point(start_p, b_polyline)
seg_len = boundary_int["lane_end_index"] - boundary_int["lane_start_index"]
end_index = min(start_index + seg_len, lb["polyline"].shape[0] - 1)
length = min(end_index - start_index, seg_len) + 1
self_range = range(boundary_int["lane_start_index"], boundary_int["lane_start_index"] + length)
bound_range = range(start_index, start_index + length)
centerLane = polyline[self_range]
bound = b_polyline[bound_range]
dist = np.square(centerLane - bound)
dist = np.sqrt(dist[:, 0] + dist[:, 1])
l_width[self_range] = dist
return l_width
def compute_width(map):
for map_feat_id, lane in map.items():
if not "LANE" in lane["type"]:
continue
width = np.zeros((lane["polyline"].shape[0], 2), dtype="float32")
width[:, 0] = extract_width(map, lane["polyline"][:, :2], lane["left_boundaries"])
width[:, 1] = extract_width(map, lane["polyline"][:, :2], lane["right_boundaries"])
width[width[:, 0] == 0, 0] = width[width[:, 0] == 0, 1]
width[width[:, 1] == 0, 1] = width[width[:, 1] == 0, 0]
lane["width"] = width
return
def convert_waymo_scenario(scenario, version):
scenario = scenario
md_scenario = SD()
id_end = scenario.scenario_id.find(SPLIT_KEY)
md_scenario[SD.ID] = scenario.scenario_id[:id_end]
md_scenario[SD.VERSION] = version
# Please note that SDC track index is not identical to sdc_id.
# sdc_id is a unique indicator to a track, while sdc_track_index is only the index of the sdc track
# in the tracks datastructure.
track_length = len(list(scenario.timestamps_seconds))
tracks, sdc_id = extract_tracks(scenario.tracks, scenario.sdc_track_index, track_length)
md_scenario[SD.LENGTH] = track_length
md_scenario[SD.TRACKS] = tracks
dynamic_states = extract_dynamic_map_states(scenario.dynamic_map_states, track_length)
md_scenario[SD.DYNAMIC_MAP_STATES] = dynamic_states
map_features = extract_map_features(scenario.map_features)
md_scenario[SD.MAP_FEATURES] = map_features
compute_width(md_scenario[SD.MAP_FEATURES])
md_scenario[SD.METADATA] = {}
md_scenario[SD.METADATA][SD.ID] = md_scenario[SD.ID]
md_scenario[SD.METADATA][SD.COORDINATE] = MetaDriveType.COORDINATE_WAYMO
md_scenario[SD.METADATA][SD.TIMESTEP] = np.asarray(list(scenario.timestamps_seconds), dtype=np.float32)
md_scenario[SD.METADATA][SD.METADRIVE_PROCESSED] = False
md_scenario[SD.METADATA][SD.SDC_ID] = str(sdc_id)
md_scenario[SD.METADATA]["dataset"] = "waymo"
md_scenario[SD.METADATA]["scenario_id"] = scenario.scenario_id[:id_end]
# TODO LQY Can we infer it?
md_scenario[SD.METADATA]["source_file"] = scenario.scenario_id[id_end + 1:]
md_scenario[SD.METADATA]["track_length"] = track_length
# === Waymo specific data. Storing them here ===
md_scenario[SD.METADATA]["current_time_index"] = scenario.current_time_index
md_scenario[SD.METADATA]["sdc_track_index"] = scenario.sdc_track_index
# obj id
md_scenario[SD.METADATA]["objects_of_interest"] = [str(obj) for obj in scenario.objects_of_interest]
track_index = [obj.track_index for obj in scenario.tracks_to_predict]
track_id = [str(scenario.tracks[ind].id) for ind in track_index]
track_difficulty = [obj.difficulty for obj in scenario.tracks_to_predict]
track_obj_type = [tracks[id]["type"] for id in track_id]
md_scenario[SD.METADATA]["tracks_to_predict"] = {
id: {
"track_index": track_index[count],
"track_id": id,
"difficulty": track_difficulty[count],
"object_type": track_obj_type[count]
}
for count, id in enumerate(track_id)
}
return md_scenario
def get_waymo_scenarios(waymo_data_directory, num_workers=8):
# parse raw data from input path to output path,
# there is 1000 raw data in google cloud, each of them produce about 500 pkl file
file_list = os.listdir(waymo_data_directory)
num_files = len(file_list)
if num_files < num_workers:
# single process
logger.info("Use one worker, as num_scenario < num_workers:")
num_workers = 1
argument_list = []
num_files_each_worker = int(num_files // num_workers)
for i in range(num_workers):
if i == num_workers - 1:
end_idx = num_files
else:
end_idx = (i + 1) * num_files_each_worker
argument_list.append([waymo_data_directory, file_list[i * num_files_each_worker:end_idx]])
# Run, workers and process result from worker
with multiprocessing.Pool(num_workers) as p:
all_result = list(p.imap(read_from_files, argument_list))
ret = []
# get result
for r in all_result:
if len(r) == 0:
logger.warning("0 scenarios found")
ret += r
logger.info("\n Find {} waymo scenarios from {} files".format(len(ret), num_files))
return ret
def read_from_files(arg):
waymo_data_directory, file_list = arg[0], arg[1]
scenarios = []
for file_count, file in enumerate(file_list):
file_path = os.path.join(waymo_data_directory, file)
if ("tfrecord" not in file_path) or (not os.path.isfile(file_path)):
continue
for data in tf.data.TFRecordDataset(file_path, compression_type="").as_numpy_iterator():
scenario = scenario_pb2.Scenario()
scenario.ParseFromString(data)
# a trick for loging file name
scenario.scenario_id = scenario.scenario_id + SPLIT_KEY + file
scenarios.append(scenario)
return scenarios

View File

View File

@@ -0,0 +1,22 @@
import argparse
from scenarionet.builder.filters import ScenarioFilter
from scenarionet.builder.utils import combine_multiple_dataset
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--to", required=True, help="Dataset path, a directory")
parser.add_argument('--from_datasets', required=True, nargs='+', default=[])
parser.add_argument("--overwrite", action="store_true", help="If the dataset_path exists, overwrite it")
parser.add_argument("--sdc_moving_dist_min", default=0, help="Selecting case with sdc_moving_dist > this value")
args = parser.parse_args()
filters = [ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=20, condition="greater")]
if len(args.from_datasets) != 0:
combine_multiple_dataset(
args.to,
*args.from_datasets,
force_overwrite=args.overwrite,
try_generate_missing_file=True,
filters=filters
)

View File

@@ -0,0 +1,42 @@
import argparse
import os
from scenarionet import SCENARIONET_DATASET_PATH
from scenarionet.converter.nuplan.utils import get_nuplan_scenarios, convert_nuplan_scenario
from scenarionet.converter.utils import write_to_directory
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_name", "-n", default="nuplan", help="Dataset name, will be used to generate scenario files"
)
parser.add_argument(
"--dataset_path",
"-d",
default=os.path.join(SCENARIONET_DATASET_PATH, "nuplan"),
help="The path of the dataset"
)
parser.add_argument("--version", "-v", default='v1.1', help="version")
parser.add_argument("--overwrite", action="store_true", help="If the dataset_path exists, overwrite it")
parser.add_argument("--num_workers", type=int, default=8, help="number of workers to use")
args = parser.parse_args()
force_overwrite = args.overwrite
dataset_name = args.dataset_name
output_path = args.dataset_path
version = args.version
data_root = os.path.join(os.getenv("NUPLAN_DATA_ROOT"), "nuplan-v1.1/splits/mini")
map_root = os.getenv("NUPLAN_MAPS_ROOT")
scenarios = get_nuplan_scenarios(data_root, map_root, logs=["2021.07.16.20.45.29_veh-35_01095_01486"])
# scenarios = get_nuplan_scenarios(data_root, map_root)
write_to_directory(
convert_func=convert_nuplan_scenario,
scenarios=scenarios,
output_path=output_path,
dataset_version=version,
dataset_name=dataset_name,
force_overwrite=force_overwrite,
num_workers=args.num_workers
)

View File

@@ -0,0 +1,41 @@
import argparse
import os.path
from scenarionet import SCENARIONET_DATASET_PATH
from scenarionet.converter.nuscenes.utils import convert_nuscenes_scenario, get_nuscenes_scenarios
from scenarionet.converter.utils import write_to_directory
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_name", "-n", default="nuscenes", help="Dataset name, will be used to generate scenario files"
)
parser.add_argument(
"--dataset_path",
"-d",
default=os.path.join(SCENARIONET_DATASET_PATH, "nuscenes"),
help="The path of the dataset"
)
parser.add_argument("--version", "-v", default='v1.0-mini', help="version")
parser.add_argument("--overwrite", action="store_true", help="If the dataset_path exists, overwrite it")
parser.add_argument("--num_workers", type=int, default=8, help="number of workers to use")
args = parser.parse_args()
force_overwrite = args.overwrite
dataset_name = args.dataset_name
output_path = args.dataset_path
version = args.version
dataroot = '/home/shady/data/nuscenes'
scenarios, nusc = get_nuscenes_scenarios(dataroot, version)
write_to_directory(
convert_func=convert_nuscenes_scenario,
scenarios=scenarios,
output_path=output_path,
dataset_version=version,
dataset_name=dataset_name,
force_overwrite=force_overwrite,
nuscenes=nusc,
num_workers=args.num_workers
)

View File

@@ -0,0 +1,40 @@
import argparse
import os.path
import metadrive
from metadrive.policy.idm_policy import IDMPolicy
from scenarionet import SCENARIONET_DATASET_PATH
from scenarionet.converter.pg.utils import get_pg_scenarios, convert_pg_scenario
from scenarionet.converter.utils import write_to_directory
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_name", "-n", default="pg", help="Dataset name, will be used to generate scenario files"
)
parser.add_argument(
"--dataset_path", "-d", default=os.path.join(SCENARIONET_DATASET_PATH, "pg"), help="The path of the dataset"
)
parser.add_argument("--version", "-v", default=metadrive.constants.DATA_VERSION, help="version")
parser.add_argument("--overwrite", action="store_true", help="If the dataset_path exists, overwrite it")
parser.add_argument("--num_workers", type=int, default=8, help="number of workers to use")
args = parser.parse_args()
force_overwrite = args.overwrite
dataset_name = args.dataset_name
output_path = args.dataset_path
version = args.version
scenario_indices, env = get_pg_scenarios(30, IDMPolicy)
write_to_directory(
convert_func=convert_pg_scenario,
scenarios=scenario_indices,
output_path=output_path,
dataset_version=version,
dataset_name=dataset_name,
force_overwrite=force_overwrite,
env=env,
num_workers=args.num_workers
)

View File

@@ -0,0 +1,40 @@
import argparse
import logging
import os
from scenarionet import SCENARIONET_DATASET_PATH
from scenarionet.converter.utils import write_to_directory
from scenarionet.converter.waymo.utils import convert_waymo_scenario, get_waymo_scenarios
logger = logging.getLogger(__name__)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_name", "-n", default="waymo", help="Dataset name, will be used to generate scenario files"
)
parser.add_argument(
"--dataset_path", "-d", default=os.path.join(SCENARIONET_DATASET_PATH, "waymo"), help="The path of the dataset"
)
parser.add_argument("--version", "-v", default='v1.2', help="version")
parser.add_argument("--overwrite", action="store_true", help="If the dataset_path exists, overwrite it")
parser.add_argument("--num_workers", type=int, default=8, help="number of workers to use")
args = parser.parse_args()
force_overwrite = args.overwrite
dataset_name = args.dataset_name
output_path = args.dataset_path
version = args.version
waymo_data_directory = os.path.join(SCENARIONET_DATASET_PATH, "../waymo_origin")
scenarios = get_waymo_scenarios(waymo_data_directory)
write_to_directory(
convert_func=convert_waymo_scenario,
scenarios=scenarios,
output_path=output_path,
dataset_version=version,
dataset_name=dataset_name,
force_overwrite=force_overwrite,
num_workers=args.num_workers
)

View File

@@ -0,0 +1,12 @@
import argparse
from scenarionet.verifier.error import ErrorFile
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--file", "-f", required=True, help="The path of the error file")
parser.add_argument("--dataset_path", "-d", required=True, help="The path of the generated dataset")
parser.add_argument("--overwrite", action="store_true", help="If the dataset_path exists, overwrite it")
parser.add_argument("--broken", action="store_true", help="Generate dataset containing only broken files")
args = parser.parse_args()
ErrorFile.generate_dataset(args.file, args.dataset_path, args.overwrite, args.broken)

View File

@@ -0,0 +1,52 @@
import argparse
import os
from metadrive.envs.scenario_env import ScenarioEnv
from metadrive.policy.replay_policy import ReplayEgoCarPolicy
from metadrive.scenario.utils import get_number_of_scenarios
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--dataset_path", "-d", required=True, help="The path of the dataset")
parser.add_argument("--render", action="store_true", help="Enable 3D rendering")
parser.add_argument("--scenario_index", default=None, type=int, help="Specifying a scenario to run")
args = parser.parse_args()
dataset_path = os.path.abspath(args.dataset_path)
num_scenario = get_number_of_scenarios(dataset_path)
if args.scenario_index is not None:
assert args.scenario_index < num_scenario, \
"The specified scenario index exceeds the scenario range: {}!".format(num_scenario)
env = ScenarioEnv(
{
"use_render": args.render,
"agent_policy": ReplayEgoCarPolicy,
"manual_control": False,
"show_interface": True,
"show_logo": False,
"show_fps": False,
"num_scenarios": num_scenario,
"horizon": 1000,
"vehicle_config": dict(
show_navi_mark=False,
no_wheel_friction=True,
lidar=dict(num_lasers=120, distance=50, num_others=4),
lane_line_detector=dict(num_lasers=12, distance=50),
side_detector=dict(num_lasers=160, distance=50)
),
"data_directory": dataset_path,
}
)
for seed in range(num_scenario if args.scenario_index is not None else 1000000):
env.reset(force_seed=seed if args.scenario_index is not None else args.scenario_index)
for t in range(10000):
o, r, d, info = env.step([0, 0])
if env.config["use_render"]:
env.render(text={
"seed": env.engine.global_seed + env.config["start_scenario_index"],
})
if d and info["arrive_dest"]:
print("scenario:{}, success".format(env.engine.global_random_seed))
break

View File

@@ -0,0 +1,13 @@
import argparse
from scenarionet.verifier.utils import verify_loading_into_metadrive, set_random_drop
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--dataset_path", required=True, help="Dataset path, a directory")
parser.add_argument("--result_save_dir", required=True, help="Dataset path, a directory")
parser.add_argument("--num_workers", type=int, default=8, help="number of workers to use")
parser.add_argument("--random_drop", action="store_true", help="Randomly make some scenarios fail. for test only!")
args = parser.parse_args()
set_random_drop(args.random_drop)
verify_loading_into_metadrive(args.dataset_path, args.result_save_dir, num_workers=args.num_workers)

View File

@@ -0,0 +1,24 @@
import os
from scenarionet import SCENARIONET_DATASET_PATH, SCENARIONET_PACKAGE_PATH
from scenarionet.builder.utils import combine_multiple_dataset
from scenarionet.verifier.utils import verify_loading_into_metadrive
def _test_combine_dataset():
dataset_paths = [
os.path.join(SCENARIONET_DATASET_PATH, "nuscenes"),
os.path.join(SCENARIONET_DATASET_PATH, "nuplan"),
os.path.join(SCENARIONET_DATASET_PATH, "waymo"),
os.path.join(SCENARIONET_DATASET_PATH, "pg")
]
combine_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
combine_multiple_dataset(combine_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True)
# os.makedirs("verify_results", exist_ok=True)
# verify_loading_into_metadrive(combine_path, "verify_results")
# assert success
if __name__ == '__main__':
_test_combine_dataset()

View File

@@ -0,0 +1,57 @@
import os
import os.path
from metadrive.type import MetaDriveType
from scenarionet import SCENARIONET_DATASET_PATH, SCENARIONET_PACKAGE_PATH
from scenarionet.builder.filters import ScenarioFilter
from scenarionet.builder.utils import combine_multiple_dataset
def test_filter_dataset():
"""
It is just a runnable test
"""
dataset_paths = [os.path.join(SCENARIONET_DATASET_PATH, "nuscenes")]
dataset_paths.append(os.path.join(SCENARIONET_DATASET_PATH, "nuplan"))
dataset_paths.append(os.path.join(SCENARIONET_DATASET_PATH, "waymo"))
dataset_paths.append(os.path.join(SCENARIONET_DATASET_PATH, "pg"))
output_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
# ========================= test 1 =========================
# nuscenes data has no light
# light_condition = ScenarioFilter.make(ScenarioFilter.has_traffic_light)
sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=30, condition="greater")
summary, mapping = combine_multiple_dataset(
output_path,
*dataset_paths,
force_overwrite=True,
try_generate_missing_file=True,
filters=[sdc_driving_condition]
)
assert len(summary) > 0
# ========================= test 2 =========================
num_condition = ScenarioFilter.make(
ScenarioFilter.object_number, number_threshold=50, object_type=MetaDriveType.PEDESTRIAN, condition="greater"
)
summary, mapping = combine_multiple_dataset(
output_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True, filters=[num_condition]
)
assert len(summary) > 0
# ========================= test 3 =========================
traffic_light = ScenarioFilter.make(ScenarioFilter.has_traffic_light)
summary, mapping = combine_multiple_dataset(
output_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True, filters=[traffic_light]
)
assert len(summary) > 0
if __name__ == '__main__':
test_filter_dataset()

View File

@@ -0,0 +1,73 @@
import copy
import os
import os.path
from metadrive.scenario.scenario_description import ScenarioDescription as SD
from scenarionet import SCENARIONET_DATASET_PATH
from scenarionet import SCENARIONET_PACKAGE_PATH
from scenarionet.builder.utils import combine_multiple_dataset
from scenarionet.common_utils import read_dataset_summary, read_scenario
from scenarionet.common_utils import recursive_equal
from scenarionet.verifier.error import ErrorFile
from scenarionet.verifier.utils import set_random_drop
from scenarionet.verifier.utils import verify_loading_into_metadrive
def test_generate_from_error():
set_random_drop(True)
dataset_paths = [
os.path.join(SCENARIONET_DATASET_PATH, "nuscenes"),
os.path.join(SCENARIONET_DATASET_PATH, "nuplan"),
os.path.join(SCENARIONET_DATASET_PATH, "waymo"),
os.path.join(SCENARIONET_DATASET_PATH, "pg")
]
dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
combine_multiple_dataset(dataset_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True)
summary, sorted_scenarios, mapping = read_dataset_summary(dataset_path)
for scenario_file in sorted_scenarios:
read_scenario(dataset_path, mapping, scenario_file)
success, logs = verify_loading_into_metadrive(
dataset_path, result_save_dir="../test_dataset", steps_to_run=1000, num_workers=16
)
set_random_drop(False)
# get error file
file_name = ErrorFile.get_error_file_name(dataset_path)
error_file_path = os.path.join("../test_dataset", file_name)
# regenerate
pass_dataset = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "passed_scenarios")
fail_dataset = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "failed_scenarios")
pass_summary, pass_mapping = ErrorFile.generate_dataset(
error_file_path, pass_dataset, force_overwrite=True, broken_scenario=False
)
fail_summary, fail_mapping = ErrorFile.generate_dataset(
error_file_path, fail_dataset, force_overwrite=True, broken_scenario=True
)
# assert
read_pass_summary, _, read_pass_mapping = read_dataset_summary(pass_dataset)
assert recursive_equal(read_pass_summary, pass_summary)
assert recursive_equal(read_pass_mapping, pass_mapping)
read_fail_summary, _, read_fail_mapping, = read_dataset_summary(fail_dataset)
assert recursive_equal(read_fail_mapping, fail_mapping)
assert recursive_equal(read_fail_summary, fail_summary)
# assert pass+fail = origin
all_summaries = copy.deepcopy(read_pass_summary)
all_summaries.update(fail_summary)
assert recursive_equal(all_summaries, summary)
# test read
for scenario in read_pass_summary:
sd = read_scenario(pass_dataset, read_pass_mapping, scenario)
SD.sanity_check(sd)
for scenario in read_fail_summary:
sd = read_scenario(fail_dataset, read_fail_mapping, scenario)
SD.sanity_check(sd)
if __name__ == '__main__':
test_generate_from_error()

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
python ../../scripts/combine_dataset.py --to ../tmp/test_combine_dataset --from_datasets ../../../dataset/waymo ../../../dataset/pg ../../../dataset/nuscenes ../../../dataset/nuplan --overwrite
python ../../scripts/verify_dataset.py --dataset_path ../tmp/test_combine_dataset --result_save_dir ../tmp/test_combine_dataset --random_drop --num_workers=16
python ../../scripts/generate_from_error_file.py --file ../tmp/test_combine_dataset/error_scenarios_for_test_combine_dataset.json --overwrite --dataset_path ../tmp/verify_pass
python ../../scripts/generate_from_error_file.py --file ../tmp/test_combine_dataset/error_scenarios_for_test_combine_dataset.json --overwrite --dataset_path ../tmp/verify_fail --broken

View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
nohup python ../../scripts/convert_nuplan.py --overwrite > nuplan.log 2>&1 &
nohup python ../../scripts/convert_nuscenes.py --overwrite > nuscenes.log 2>&1 &
nohup python ../../scripts/convert_pg.py --overwrite > pg.log 2>&1 &
nohup python ../../scripts/convert_waymo.py --overwrite > waymo.log 2>&1 &

View File

@@ -0,0 +1,29 @@
"""
This script aims to convert nuscenes scenarios to ScenarioDescription, so that we can load any nuscenes scenarios into
MetaDrive.
"""
import os.path
from scenarionet import SCENARIONET_PACKAGE_PATH
from scenarionet.converter.nuscenes.utils import convert_nuscenes_scenario, get_nuscenes_scenarios
from scenarionet.converter.utils import write_to_directory
if __name__ == "__main__":
dataset_name = "nuscenes"
output_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset", dataset_name)
version = 'v1.0-mini'
force_overwrite = True
dataroot = '/home/shady/data/nuscenes'
scenarios, nusc = get_nuscenes_scenarios(dataroot, version)
for i in range(5):
write_to_directory(
convert_func=convert_nuscenes_scenario,
scenarios=scenarios[i * 2:i * 2 + 2],
output_path=output_path + "_{}".format(i),
dataset_version=version,
dataset_name=dataset_name,
force_overwrite=force_overwrite,
nuscenes=nusc
)

View File

@@ -0,0 +1,58 @@
import os
from metadrive.envs.scenario_env import ScenarioEnv
from metadrive.policy.replay_policy import ReplayEgoCarPolicy
from metadrive.scenario.utils import get_number_of_scenarios
from scenarionet import SCENARIONET_DATASET_PATH, SCENARIONET_PACKAGE_PATH
from scenarionet.builder.utils import combine_multiple_dataset
if __name__ == '__main__':
dataset_paths = [os.path.join(SCENARIONET_DATASET_PATH, "nuscenes")]
dataset_paths.append(os.path.join(SCENARIONET_DATASET_PATH, "nuplan"))
dataset_paths.append(os.path.join(SCENARIONET_DATASET_PATH, "waymo"))
dataset_paths.append(os.path.join(SCENARIONET_DATASET_PATH, "pg"))
combine_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
combine_multiple_dataset(combine_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True)
env = ScenarioEnv(
{
"use_render": True,
"agent_policy": ReplayEgoCarPolicy,
"manual_control": False,
"show_interface": True,
"debug": False,
"show_logo": False,
"show_fps": False,
"force_reuse_object_name": True,
"num_scenarios": get_number_of_scenarios(combine_path),
"horizon": 1000,
"no_static_vehicles": True,
"vehicle_config": dict(
show_navi_mark=False,
no_wheel_friction=True,
lidar=dict(num_lasers=120, distance=50, num_others=4),
lane_line_detector=dict(num_lasers=12, distance=50),
side_detector=dict(num_lasers=160, distance=50)
),
"data_directory": combine_path,
}
)
success = []
env.reset(force_seed=91)
while True:
env.reset(force_seed=91)
for t in range(10000):
o, r, d, info = env.step([0, 0])
assert env.observation_space.contains(o)
c_lane = env.vehicle.lane
long, lat, = c_lane.local_coordinates(env.vehicle.position)
if env.config["use_render"]:
env.render(text={
"seed": env.engine.global_seed + env.config["start_scenario_index"],
})
if d and info["arrive_dest"]:
print("seed:{}, success".format(env.engine.global_random_seed))
break

View File

@@ -0,0 +1,30 @@
import os
import os.path
from scenarionet import SCENARIONET_PACKAGE_PATH
from scenarionet.builder.utils import combine_multiple_dataset
from scenarionet.common_utils import read_dataset_summary, read_scenario
from scenarionet.verifier.utils import verify_loading_into_metadrive
def test_combine_multiple_dataset():
dataset_name = "nuscenes"
original_dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset", dataset_name)
test_dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset")
dataset_paths = [original_dataset_path + "_{}".format(i) for i in range(5)]
output_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
combine_multiple_dataset(output_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True)
dataset_paths.append(output_path)
for dataset_path in dataset_paths:
summary, sorted_scenarios, mapping = read_dataset_summary(dataset_path)
for scenario_file in sorted_scenarios:
read_scenario(dataset_path, mapping, scenario_file)
success, result = verify_loading_into_metadrive(
dataset_path, result_save_dir=test_dataset_path, steps_to_run=1000, num_workers=4
)
assert success
if __name__ == '__main__':
test_combine_multiple_dataset()

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,72 @@
import os
import os.path
from metadrive.type import MetaDriveType
from scenarionet import SCENARIONET_PACKAGE_PATH
from scenarionet.builder.filters import ScenarioFilter
from scenarionet.builder.utils import combine_multiple_dataset
def test_filter_dataset():
dataset_name = "nuscenes"
original_dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset", dataset_name)
dataset_paths = [original_dataset_path + "_{}".format(i) for i in range(5)]
output_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
# ========================= test 1 =========================
# nuscenes data has no light
# light_condition = ScenarioFilter.make(ScenarioFilter.has_traffic_light)
sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=30, condition="smaller")
answer = ['sd_nuscenes_v1.0-mini_scene-0553.pkl', '0.pkl', 'sd_nuscenes_v1.0-mini_scene-1100.pkl']
summary, mapping = combine_multiple_dataset(
output_path,
*dataset_paths,
force_overwrite=True,
try_generate_missing_file=True,
filters=[sdc_driving_condition]
)
assert len(answer) == len(summary)
for a in answer:
in_ = False
for s in summary:
if a in s:
in_ = True
break
assert in_, summary.keys()
sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=5, condition="greater")
summary, mapping = combine_multiple_dataset(
output_path,
*dataset_paths,
force_overwrite=True,
try_generate_missing_file=True,
filters=[sdc_driving_condition]
)
assert len(summary) == 8
# ========================= test 2 =========================
num_condition = ScenarioFilter.make(
ScenarioFilter.object_number, number_threshold=50, object_type=MetaDriveType.PEDESTRIAN, condition="greater"
)
answer = ['sd_nuscenes_v1.0-mini_scene-0061.pkl', 'sd_nuscenes_v1.0-mini_scene-1094.pkl']
summary, mapping = combine_multiple_dataset(
output_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True, filters=[num_condition]
)
assert len(answer) == len(summary)
for a in answer:
assert a in summary
num_condition = ScenarioFilter.make(ScenarioFilter.object_number, number_threshold=50, condition="greater")
summary, mapping = combine_multiple_dataset(
output_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True, filters=[num_condition]
)
assert len(summary) > 0
if __name__ == '__main__':
test_filter_dataset()

View File

@@ -0,0 +1,67 @@
import copy
import os
import os.path
from metadrive.scenario.scenario_description import ScenarioDescription as SD
from scenarionet import SCENARIONET_PACKAGE_PATH
from scenarionet.builder.utils import combine_multiple_dataset
from scenarionet.common_utils import read_dataset_summary, read_scenario
from scenarionet.common_utils import recursive_equal
from scenarionet.verifier.error import ErrorFile
from scenarionet.verifier.utils import verify_loading_into_metadrive, set_random_drop
def test_generate_from_error():
set_random_drop(True)
dataset_name = "nuscenes"
original_dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset", dataset_name)
test_dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset")
dataset_paths = [original_dataset_path + "_{}".format(i) for i in range(5)]
dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "combine")
combine_multiple_dataset(dataset_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True)
summary, sorted_scenarios, mapping = read_dataset_summary(dataset_path)
for scenario_file in sorted_scenarios:
read_scenario(dataset_path, mapping, scenario_file)
success, logs = verify_loading_into_metadrive(
dataset_path, result_save_dir=test_dataset_path, steps_to_run=1000, num_workers=3
)
set_random_drop(False)
# get error file
file_name = ErrorFile.get_error_file_name(dataset_path)
error_file_path = os.path.join("test_dataset", file_name)
# regenerate
pass_dataset = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "passed_senarios")
fail_dataset = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "tmp", "failed_scenarios")
pass_summary, pass_mapping = ErrorFile.generate_dataset(
error_file_path, pass_dataset, force_overwrite=True, broken_scenario=False
)
fail_summary, fail_mapping = ErrorFile.generate_dataset(
error_file_path, fail_dataset, force_overwrite=True, broken_scenario=True
)
# assert
read_pass_summary, _, read_pass_mapping = read_dataset_summary(pass_dataset)
assert recursive_equal(read_pass_summary, pass_summary)
assert recursive_equal(read_pass_mapping, pass_mapping)
read_fail_summary, _, read_fail_mapping, = read_dataset_summary(fail_dataset)
assert recursive_equal(read_fail_mapping, fail_mapping)
assert recursive_equal(read_fail_summary, fail_summary)
# assert pass+fail = origin
all_summaries = copy.deepcopy(read_pass_summary)
all_summaries.update(fail_summary)
assert recursive_equal(all_summaries, summary)
# test read
for scenario in read_pass_summary:
sd = read_scenario(pass_dataset, read_pass_mapping, scenario)
SD.sanity_check(sd)
for scenario in read_fail_summary:
sd = read_scenario(fail_dataset, read_fail_mapping, scenario)
SD.sanity_check(sd)
if __name__ == '__main__':
test_generate_from_error()

View File

View File

@@ -0,0 +1,103 @@
import json
import shutil
import logging
import os
from typing import List
from metadrive.scenario.scenario_description import ScenarioDescription as SD
from scenarionet.common_utils import save_summary_anda_mapping, read_dataset_summary
logger = logging.getLogger(__name__)
class ErrorDescription:
INDEX = "scenario_index"
PATH = "file_path"
FILE_NAME = "file_name"
ERROR = "error_message"
METADATA = "metadata"
@classmethod
def make(cls, scenario_index, file_path, file_name, error):
logger.warning(
"\n Scenario Error, "
"scenario_index: {}, file_path: {}.\n Error message: {}".format(scenario_index, file_path, str(error))
)
return {cls.INDEX: scenario_index, cls.PATH: file_path, cls.FILE_NAME: file_name, cls.ERROR: str(error)}
class ErrorFile:
PREFIX = "error_scenarios_for"
DATASET = "dataset_path"
ERRORS = "errors"
@classmethod
def get_error_file_name(cls, dataset_path):
return "{}_{}.json".format(cls.PREFIX, os.path.basename(dataset_path))
@classmethod
def dump(cls, save_dir, errors: List, dataset_path):
"""
Save test result
:param save_dir: which dir to save this file
:param errors: error list, containing a list of dict from ErrorDescription.make()
:param dataset_path: dataset_path, the dir of dataset_summary.pkl
"""
file_name = cls.get_error_file_name(dataset_path)
path = os.path.join(save_dir, file_name)
with open(path, "w+") as f:
json.dump({cls.DATASET: dataset_path, cls.ERRORS: errors}, f, indent=4)
return path
@classmethod
def generate_dataset(cls, error_file_path, new_dataset_path, force_overwrite=False, broken_scenario=False):
"""
Generate a new dataset containing all broken scenarios or all good scenarios
:param error_file_path: error file path
:param new_dataset_path: a directory where you want to store your data
:param force_overwrite: if new_dataset_path exists, whether to overwrite
:param broken_scenario: generate broken scenarios. You can generate such a broken scenarios for debugging
:return: dataset summary, dataset mapping
"""
# TODO Add test!
new_dataset_path = os.path.abspath(new_dataset_path)
if os.path.exists(new_dataset_path):
if force_overwrite:
shutil.rmtree(new_dataset_path)
else:
raise ValueError(
"Directory: {} already exists! "
"Set force_overwrite=True to overwrite".format(new_dataset_path)
)
os.makedirs(new_dataset_path, exist_ok=False)
with open(error_file_path, "r+") as f:
error_file = json.load(f)
origin_dataset_path = error_file[cls.DATASET]
origin_summary, origin_list, origin_mapping = read_dataset_summary(origin_dataset_path)
errors = error_file[cls.ERRORS]
# make new summary
new_summary = {}
new_mapping = {}
new_summary_file_path = os.path.join(new_dataset_path, SD.DATASET.SUMMARY_FILE)
new_mapping_file_path = os.path.join(new_dataset_path, SD.DATASET.MAPPING_FILE)
if broken_scenario:
for error in errors:
file_name = error[ErrorDescription.FILE_NAME]
new_summary[file_name] = origin_summary[file_name]
scenario_dir = os.path.join(origin_dataset_path, origin_mapping[file_name])
new_mapping[file_name] = os.path.relpath(scenario_dir, new_dataset_path)
else:
error_scenario = [error[ErrorDescription.FILE_NAME] for error in errors]
for scenario in origin_summary:
if scenario in error_scenario:
continue
new_summary[scenario] = origin_summary[scenario]
scenario_dir = os.path.join(origin_dataset_path, origin_mapping[scenario])
new_mapping[scenario] = os.path.relpath(scenario_dir, new_dataset_path)
save_summary_anda_mapping(new_summary_file_path, new_mapping_file_path, new_summary, new_mapping)
return new_summary, new_mapping

View File

@@ -0,0 +1,116 @@
import logging
import multiprocessing
import os
import numpy as np
from scenarionet.verifier.error import ErrorDescription as ED
from scenarionet.verifier.error import ErrorFile as EF
logger = logging.getLogger(__name__)
import tqdm
from metadrive.envs.scenario_env import ScenarioEnv
from metadrive.policy.replay_policy import ReplayEgoCarPolicy
from metadrive.scenario.utils import get_number_of_scenarios
from functools import partial
# this global variable is for generating broken scenarios for testing
RANDOM_DROP = False
def set_random_drop(drop):
global RANDOM_DROP
RANDOM_DROP = drop
def verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=1000, num_workers=8):
assert os.path.isdir(result_save_dir), "result_save_dir must be a dir, get {}".format(result_save_dir)
os.makedirs(result_save_dir, exist_ok=True)
num_scenario = get_number_of_scenarios(dataset_path)
if num_scenario < num_workers:
# single process
logger.info("Use one worker, as num_scenario < num_workers:")
num_workers = 1
# prepare arguments
argument_list = []
func = partial(loading_wrapper, dataset_path=dataset_path, steps_to_run=steps_to_run)
num_scenario_each_worker = int(num_scenario // num_workers)
for i in range(num_workers):
if i == num_workers - 1:
scenario_num = num_scenario - num_scenario_each_worker * (num_workers - 1)
else:
scenario_num = num_scenario_each_worker
argument_list.append([i * num_scenario_each_worker, scenario_num])
# Run, workers and process result from worker
with multiprocessing.Pool(num_workers) as p:
all_result = list(p.imap(func, argument_list))
success = all([i[0] for i in all_result])
errors = []
for _, error in all_result:
errors += error
# logging
if success:
logger.info("All scenarios can be loaded successfully!")
else:
# save result
path = EF.dump(result_save_dir, errors, dataset_path)
logger.info(
"Fail to load all scenarios. Number of failed scenarios: {}. "
"See: {} more details! ".format(len(errors), path)
)
return success, errors
def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, steps_to_run, metadrive_config=None):
global RANDOM_DROP
logger.info(
"================ Begin Scenario Loading Verification for scenario {}-{} ================ \n".format(
start_scenario_index, num_scenario + start_scenario_index
)
)
success = True
metadrive_config = metadrive_config or {}
metadrive_config.update(
{
"agent_policy": ReplayEgoCarPolicy,
"num_scenarios": num_scenario,
"horizon": 1000,
"start_scenario_index": start_scenario_index,
"no_static_vehicles": False,
"data_directory": dataset_path,
}
)
env = ScenarioEnv(metadrive_config)
logging.disable(logging.INFO)
error_msgs = []
desc = "Scenarios: {}-{}".format(start_scenario_index, start_scenario_index + num_scenario)
for scenario_index in tqdm.tqdm(range(start_scenario_index, start_scenario_index + num_scenario), desc=desc):
try:
env.reset(force_seed=scenario_index)
arrive = False
if RANDOM_DROP and np.random.rand() < 0.5:
raise ValueError("Random Drop")
for _ in range(steps_to_run):
o, r, d, info = env.step([0, 0])
if d and info["arrive_dest"]:
arrive = True
assert arrive, "Can not arrive destination"
except Exception as e:
file_name = env.engine.data_manager.summary_lookup[scenario_index]
file_path = os.path.join(dataset_path, env.engine.data_manager.mapping[file_name], file_name)
error_msg = ED.make(scenario_index, file_path, file_name, str(e))
error_msgs.append(error_msg)
success = False
# proceed to next scenario
continue
env.close()
return success, error_msgs
def loading_wrapper(arglist, dataset_path, steps_to_run):
assert len(arglist) == 2, "Too much arguments!"
return loading_into_metadrive(arglist[0], arglist[1], dataset_path=dataset_path, steps_to_run=steps_to_run)

91
setup.py Normal file
View File

@@ -0,0 +1,91 @@
# Please don't change the order of following packages!
import os
import sys
from os import path
from setuptools import setup, find_namespace_packages # This should be place at top!
ROOT_DIR = os.path.dirname(__file__)
def is_mac():
return sys.platform == "darwin"
def is_win():
return sys.platform == "win32"
assert sys.version_info.major == 3 and 6 <= sys.version_info.minor < 12, \
"python version >= 3.6, <3.12 is required"
this_directory = path.abspath(path.dirname(__file__))
with open(path.join(this_directory, 'README.md'), encoding='utf-8') as f:
long_description = f.read()
packages = find_namespace_packages(
exclude=("docs", "docs.*", "documentation", "documentation.*", "build.*"))
print("We will install the following packages: ", packages)
""" ===== Remember to modify the EDITION at first ====="""
version = "0.0.1"
install_requires = [
"numpy>=1.21.6, <=1.24.2",
"matplotlib",
"pandas",
"tqdm",
"metadrive-simulator",
"geopandas",
"yapf==0.30.0",
"shapely"
]
setup(
name="scenarionet",
python_requires='>=3.6, <3.12', # do version check with assert
version=version,
description="Scalable Traffic Scenario Management System",
url="https://github.com/metadriverse/ScenarioNet",
author="MetaDrive Team",
author_email="quanyili0057@gmail.com, pzh@cs.ucla.edu",
packages=packages,
install_requires=install_requires,
# extras_require={
# "cuda": cuda_requirement,
# "nuplan": nuplan_requirement,
# "waymo": waymo_requirement,
# "all": nuplan_requirement + cuda_requirement
# },
include_package_data=True,
license="Apache 2.0",
long_description=long_description,
long_description_content_type='text/markdown',
)
"""
How to publish to pypi? Noted by Zhenghao in Dec 27, 2020.
0. Rename version in setup.py
1. Remove old files and ext_modules from setup() to get a clean wheel for all platforms in py3-none-any.wheel
rm -rf dist/ build/ documentation/build/ scenarionet.egg-info/ docs/build/
2. Rename current version to X.Y.Z.rcA, where A is arbitrary value represent "release candidate A".
This is really important since pypi do not support renaming and re-uploading.
Rename version in setup.py
3. Get wheel
python setup.py sdist bdist_wheel
4. Upload to test channel
twine upload --repository testpypi dist/*
5. Test as next line. If failed, change the version name and repeat 1, 2, 3, 4, 5.
pip install --index-url https://test.pypi.org/simple/ scenarionet
6. Rename current version to X.Y.Z in setup.py, rerun 1, 3 steps.
7. Upload to production channel
twine upload dist/*
"""