move tools from md

QuanyiLi
2023-05-06 14:12:10 +01:00
parent 8d7978230c
commit f42487cc05
22 changed files with 1962 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,11 @@
/scenarionet.egg-info/
*.pyc
*.egg-info
/.idea/
*.glf
*.rar
/docs/build/
/docs/.idea/
/build/
/dist/
/documentation/build/

scenarionet/__init__.py Normal file

@@ -0,0 +1,4 @@
import os
SCENARIONET_PACKAGE_PATH = os.path.dirname(os.path.abspath(__file__))
SCENARIONET_REPO_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
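These two constants resolve the package and repository roots at import time. A quick sanity check of what they point to (hypothetical usage, not part of this commit):

import scenarionet
print(scenarionet.SCENARIONET_PACKAGE_PATH)  # the installed package directory
print(scenarionet.SCENARIONET_REPO_PATH)     # one level up: the repository root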

@@ -0,0 +1,105 @@
"""
This script aims to convert nuplan scenarios to ScenarioDescription, so that we can load any nuplan scenarios into
MetaDrive.
"""
import copy
import os
import pickle
import shutil
import tqdm
from metadrive.engine.asset_loader import AssetLoader
from metadrive.scenario.scenario_description import ScenarioDescription
from metadrive.utils.nuplan.utils import get_nuplan_scenarios, convert_one_scenario
from metadrive.utils.utils import dict_recursive_remove_array
def convert_scenarios(output_path, dataset_params, worker_index=None, force_overwrite=False):
save_path = copy.deepcopy(output_path)
output_path = output_path + "_tmp"
# meta recorder and data summary
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=False)
# make real save dir
delay_remove = None
if os.path.exists(save_path):
if force_overwrite:
delay_remove = save_path
else:
raise ValueError("Directory already exists! Abort")
metadata_recorder = {}
total_scenarios = 0
desc = ""
summary_file = "dataset_summary.pkl"
if worker_index is not None:
desc += "Worker {} ".format(worker_index)
summary_file = "dataset_summary_worker{}.pkl".format(worker_index)
# Init.
scenarios = get_nuplan_scenarios(dataset_params)
for scenario in tqdm.tqdm(scenarios):
sd_scenario = convert_one_scenario(scenario)
sd_scenario = sd_scenario.to_dict()
ScenarioDescription.sanity_check(sd_scenario, check_self_type=True)
export_file_name = "sd_{}_{}.pkl".format("nuplan", scenario.scenario_name)
p = os.path.join(output_path, export_file_name)
with open(p, "wb") as f:
pickle.dump(sd_scenario, f)
metadata_recorder[export_file_name] = copy.deepcopy(sd_scenario[ScenarioDescription.METADATA])
# rename and save
if delay_remove is not None:
shutil.rmtree(delay_remove)
os.rename(output_path, save_path)
summary_file = os.path.join(save_path, summary_file)
with open(summary_file, "wb") as file:
pickle.dump(dict_recursive_remove_array(metadata_recorder), file)
print("Summary is saved at: {}".format(summary_file))
if delay_remove is not None:
assert delay_remove == save_path, delay_remove + " vs. " + save_path
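The function above writes everything into a `*_tmp` directory first and only swaps it into place at the end, so a crashed run never leaves a half-written dataset behind. A minimal sketch of the same pattern, assuming only os/shutil semantics (atomic_write_dir and build_fn are illustrative names, not part of this commit):

import os
import shutil

def atomic_write_dir(save_path, build_fn, force_overwrite=False):
    tmp = save_path + "_tmp"
    if os.path.exists(tmp):
        shutil.rmtree(tmp)  # clear a stale tmp dir left by a crashed run
    os.makedirs(tmp, exist_ok=False)
    build_fn(tmp)  # write all output files into the tmp dir
    if os.path.exists(save_path):
        if not force_overwrite:
            raise ValueError("Directory already exists! Abort")
        shutil.rmtree(save_path)  # delayed removal, only after a successful build
    os.rename(tmp, save_path)  # swap the finished dir into place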
if __name__ == "__main__":
# 15 types
all_scenario_types = "[behind_pedestrian_on_pickup_dropoff, \
near_multiple_vehicles, \
high_magnitude_jerk, \
crossed_by_vehicle, \
following_lane_with_lead, \
changing_lane_to_left, \
accelerating_at_traffic_light_without_lead, \
stopping_at_stop_sign_with_lead, \
traversing_narrow_lane, \
waiting_for_pedestrian_to_cross, \
starting_left_turn, \
starting_high_speed_turn, \
starting_unprotected_cross_turn, \
starting_protected_noncross_turn, \
on_pickup_dropoff]"
dataset_params = [
# builder setting
"scenario_builder=nuplan_mini",
"scenario_builder.scenario_mapping.subsample_ratio_override=0.5", # 10 hz
# filter
"scenario_filter=all_scenarios", # simulate only one log
"scenario_filter.remove_invalid_goals=true",
"scenario_filter.shuffle=true",
"scenario_filter.log_names=['2021.07.16.20.45.29_veh-35_01095_01486']",
# "scenario_filter.scenario_types={}".format(all_scenario_types),
# "scenario_filter.scenario_tokens=[]",
# "scenario_filter.map_names=[]",
# "scenario_filter.num_scenarios_per_type=1",
# "scenario_filter.limit_total_scenarios=1000",
# "scenario_filter.expand_scenarios=true",
# "scenario_filter.limit_scenarios_per_type=10", # use 10 scenarios per scenario type
"scenario_filter.timestamp_threshold_s=20", # minial scenario duration (s)
]
output_path = AssetLoader.file_path("nuplan", return_raw_style=False)
worker_index = None
force_overwrite = True
convert_scenarios(output_path, dataset_params, worker_index=worker_index, force_overwrite=force_overwrite)

@@ -0,0 +1,462 @@
import logging
import os
import tempfile
from dataclasses import dataclass
from os.path import join
import numpy as np
from nuplan.common.actor_state.agent import Agent
from nuplan.common.actor_state.static_object import StaticObject
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.maps.maps_datatypes import TrafficLightStatusType
from shapely.geometry.linestring import LineString
from shapely.geometry.multilinestring import MultiLineString
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
from metadrive.utils.coordinates_shift import nuplan_to_metadrive_vector
from metadrive.utils.math import compute_angular_velocity
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from metadrive.utils import is_win
try:
import geopandas as gpd
from nuplan.common.actor_state.state_representation import Point2D
from nuplan.common.maps.maps_datatypes import SemanticMapLayer, StopLineType
from shapely.ops import unary_union
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
import hydra
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
from nuplan.planning.script.builders.scenario_building_builder import build_scenario_builder
from nuplan.planning.script.builders.scenario_filter_builder import build_scenario_filter
from nuplan.planning.script.utils import set_up_common_builder
import nuplan
except ImportError:
logger.warning("Can not import nuplan-devkit")
NUPLAN_PACKAGE_PATH = os.path.dirname(nuplan.__file__)
EGO = "ego"
def get_nuplan_scenarios(dataset_parameters, nuplan_package_path=NUPLAN_PACKAGE_PATH):
"""
Return a list of nuplan scenarios according to dataset_parameters
"""
base_config_path = os.path.join(nuplan_package_path, "planning", "script")
simulation_hydra_paths = construct_simulation_hydra_paths(base_config_path)
# Initialize configuration management system
hydra.core.global_hydra.GlobalHydra.instance().clear() # reinitialize hydra if already initialized
hydra.initialize_config_dir(config_dir=simulation_hydra_paths.config_path)
save_dir = tempfile.mkdtemp()
ego_controller = 'perfect_tracking_controller' # [log_play_back_controller, perfect_tracking_controller]
observation = 'box_observation' # [box_observation, idm_agents_observation, lidar_pc_observation]
# Compose the configuration
overrides = [
f'group={save_dir}',
'worker=sequential',
f'ego_controller={ego_controller}',
f'observation={observation}',
f'hydra.searchpath=[{simulation_hydra_paths.common_dir}, {simulation_hydra_paths.experiment_dir}]',
'output_dir=${group}/${experiment}',
'metric_dir=${group}/${experiment}',
*dataset_parameters,
]
overrides.extend(
[
f'job_name=planner_tutorial', 'experiment=${experiment_name}/${job_name}',
f'experiment_name=planner_tutorial'
]
)
# get config
cfg = hydra.compose(config_name=simulation_hydra_paths.config_name, overrides=overrides)
profiler_name = 'building_simulation'
common_builder = set_up_common_builder(cfg=cfg, profiler_name=profiler_name)
# Build scenario builder
scenario_builder = build_scenario_builder(cfg=cfg)
scenario_filter = build_scenario_filter(cfg.scenario_filter)
# get scenarios from database
return scenario_builder.get_scenarios(scenario_filter, common_builder.worker)
def construct_simulation_hydra_paths(base_config_path: str):
"""
Specifies relative paths to simulation configs to pass to hydra to declutter tutorial.
:param base_config_path: Base config path.
:return: Hydra config path.
"""
common_dir = "file://" + join(base_config_path, 'config', 'common')
config_name = 'default_simulation'
config_path = join(base_config_path, 'config', 'simulation')
experiment_dir = "file://" + join(base_config_path, 'experiments')
return HydraConfigPaths(common_dir, config_name, config_path, experiment_dir)
@dataclass
class HydraConfigPaths:
"""
Stores relative hydra paths to declutter tutorial.
"""
common_dir: str
config_name: str
config_path: str
experiment_dir: str
def extract_centerline(map_obj, nuplan_center):
path = map_obj.baseline_path.discrete_path
points = np.array([nuplan_to_metadrive_vector([pose.x, pose.y], nuplan_center) for pose in path])
return points
def get_points_from_boundary(boundary, center):
path = boundary.discrete_path
points = [(pose.x, pose.y) for pose in path]
points = nuplan_to_metadrive_vector(points, center)
return points
def get_line_type(nuplan_type):
# Always return the broken line type for now; the mapping below is kept but unreachable.
return MetaDriveType.LINE_BROKEN_SINGLE_WHITE
if nuplan_type == 2:
return MetaDriveType.LINE_SOLID_SINGLE_WHITE
elif nuplan_type == 0:
return MetaDriveType.LINE_BROKEN_SINGLE_WHITE
elif nuplan_type == 3:
return MetaDriveType.LINE_UNKNOWN
else:
raise ValueError("Unknown line type: {}".format(nuplan_type))
def extract_map_features(map_api, center, radius=250):
ret = {}
np.seterr(all='ignore')
# Center is Important !
layer_names = [
SemanticMapLayer.LANE_CONNECTOR,
SemanticMapLayer.LANE,
SemanticMapLayer.CROSSWALK,
SemanticMapLayer.INTERSECTION,
SemanticMapLayer.STOP_LINE,
SemanticMapLayer.WALKWAYS,
SemanticMapLayer.CARPARK_AREA,
SemanticMapLayer.ROADBLOCK,
SemanticMapLayer.ROADBLOCK_CONNECTOR,
# unsupported yet
# SemanticMapLayer.STOP_SIGN,
# SemanticMapLayer.DRIVABLE_AREA,
]
center_for_query = Point2D(*center)
nearest_vector_map = map_api.get_proximal_map_objects(center_for_query, radius, layer_names)
boundaries = map_api._get_vector_map_layer(SemanticMapLayer.BOUNDARIES)
# Filter out stop polygons in turn stop
if SemanticMapLayer.STOP_LINE in nearest_vector_map:
stop_polygons = nearest_vector_map[SemanticMapLayer.STOP_LINE]
nearest_vector_map[SemanticMapLayer.STOP_LINE] = [
stop_polygon for stop_polygon in stop_polygons if stop_polygon.stop_line_type != StopLineType.TURN_STOP
]
block_polygons = []
for layer in [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR]:
for block in nearest_vector_map[layer]:
for lane_meta_data in block.interior_edges:
if not hasattr(lane_meta_data, "baseline_path"):
continue
if isinstance(lane_meta_data.polygon.boundary, MultiLineString):
# logger.warning("Stop using boundaries! Use exterior instead!")
boundary = gpd.GeoSeries(lane_meta_data.polygon.boundary).explode(index_parts=True)
sizes = []
for idx, polygon in enumerate(boundary[0]):
sizes.append(len(polygon.xy[1]))
points = boundary[0][np.argmax(sizes)].xy
elif isinstance(lane_meta_data.polygon.boundary, LineString):
points = lane_meta_data.polygon.boundary.xy
polygon = [[points[0][i], points[1][i]] for i in range(len(points[0]))]
polygon = nuplan_to_metadrive_vector(polygon, nuplan_center=[center[0], center[1]])
ret[lane_meta_data.id] = {
SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
SD.POLYLINE: extract_centerline(lane_meta_data, center),
SD.POLYGON: polygon
}
if layer == SemanticMapLayer.ROADBLOCK_CONNECTOR:
continue
left = lane_meta_data.left_boundary
if left.id not in ret:
line_type = get_line_type(int(boundaries.loc[[str(left.id)]]["boundary_type_fid"]))
if line_type != MetaDriveType.LINE_UNKNOWN:
ret[left.id] = {SD.TYPE: line_type, SD.POLYLINE: get_points_from_boundary(left, center)}
if layer == SemanticMapLayer.ROADBLOCK:
block_polygons.append(block.polygon)
interpolygons = [block.polygon for block in nearest_vector_map[SemanticMapLayer.INTERSECTION]]
# logger.warning("Stop using boundaries! Use exterior instead!")
boundaries = gpd.GeoSeries(unary_union(interpolygons + block_polygons)).boundary.explode(index_parts=True)
# boundaries.plot()
# plt.show()
for idx, boundary in enumerate(boundaries[0]):
block_points = np.array(list(i for i in zip(boundary.coords.xy[0], boundary.coords.xy[1])))
block_points = nuplan_to_metadrive_vector(block_points, center)
id = "boundary_{}".format(idx)
ret[id] = {SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_WHITE, SD.POLYLINE: block_points}
np.seterr(all='warn')
return ret
def set_light_status(status):
if status == TrafficLightStatusType.GREEN:
return MetaDriveType.LIGHT_GREEN
elif status == TrafficLightStatusType.RED:
return MetaDriveType.LIGHT_RED
elif status == TrafficLightStatusType.YELLOW:
return MetaDriveType.LIGHT_YELLOW
elif status == TrafficLightStatusType.UNKNOWN:
return MetaDriveType.LIGHT_UNKNOWN
def set_light_position(scenario, lane_id, center, target_position=8):
lane = scenario.map_api.get_map_object(str(lane_id), SemanticMapLayer.LANE_CONNECTOR)
assert lane is not None, "Can not find lane: {}".format(lane_id)
path = lane.baseline_path.discrete_path
acc_length = 0
point = [path[0].x, path[0].y]
for k, point in enumerate(path[1:], start=1):
previous_p = path[k - 1]
acc_length += np.linalg.norm([point.x - previous_p.x, point.y - previous_p.y])
if acc_length > target_position:
break
return [point.x - center[0], point.y - center[1]]
def extract_traffic_light(scenario, center):
length = scenario.get_number_of_iterations()
frames = [
{str(t.lane_connector_id): t.status
for t in scenario.get_traffic_light_status_at_iteration(i)} for i in range(length)
]
all_lights = set()
for frame in frames:
all_lights.update(frame.keys())
lights = {
k: {
"type": MetaDriveType.TRAFFIC_LIGHT,
"state": {
SD.TRAFFIC_LIGHT_STATUS: [MetaDriveType.LIGHT_UNKNOWN] * length
},
SD.TRAFFIC_LIGHT_POSITION: None,
SD.TRAFFIC_LIGHT_LANE: str(k),
"metadata": dict(track_length=length, type=None, object_id=str(k), lane_id=str(k), dataset="nuplan")
}
for k in list(all_lights)
}
for k, frame in enumerate(frames):
for lane_id, status in frame.items():
lane_id = str(lane_id)
lights[lane_id]["state"][SD.TRAFFIC_LIGHT_STATUS][k] = set_light_status(status)
if lights[lane_id][SD.TRAFFIC_LIGHT_POSITION] is None:
assert isinstance(lane_id, str), "Lane ID should be str"
lights[lane_id][SD.TRAFFIC_LIGHT_POSITION] = set_light_position(scenario, lane_id, center)
lights[lane_id][SD.METADATA][SD.TYPE] = MetaDriveType.TRAFFIC_LIGHT
return lights
def get_traffic_obj_type(nuplan_type):
if nuplan_type == TrackedObjectType.VEHICLE:
return MetaDriveType.VEHICLE
elif nuplan_type == TrackedObjectType.TRAFFIC_CONE:
return MetaDriveType.TRAFFIC_CONE
elif nuplan_type == TrackedObjectType.PEDESTRIAN:
return MetaDriveType.PEDESTRIAN
elif nuplan_type == TrackedObjectType.BICYCLE:
return MetaDriveType.CYCLIST
elif nuplan_type == TrackedObjectType.BARRIER:
return MetaDriveType.TRAFFIC_BARRIER
elif nuplan_type == TrackedObjectType.EGO:
raise ValueError("Ego should not be in detected resukts")
else:
return None
def parse_object_state(obj_state, nuplan_center):
ret = {}
ret["position"] = nuplan_to_metadrive_vector([obj_state.center.x, obj_state.center.y], nuplan_center)
ret["heading"] = obj_state.center.heading
ret["velocity"] = nuplan_to_metadrive_vector([obj_state.velocity.x, obj_state.velocity.y])
ret["valid"] = 1
ret["length"] = obj_state.box.length
ret["width"] = obj_state.box.width
ret["height"] = obj_state.box.height
return ret
def parse_ego_vehicle_state(state, nuplan_center):
center = nuplan_center
ret = {}
ret["position"] = nuplan_to_metadrive_vector([state.waypoint.x, state.waypoint.y], center)
ret["heading"] = state.waypoint.heading
ret["velocity"] = nuplan_to_metadrive_vector([state.agent.velocity.x, state.agent.velocity.y])
ret["angular_velocity"] = state.dynamic_car_state.angular_velocity
ret["valid"] = 1
ret["length"] = state.agent.box.length
ret["width"] = state.agent.box.width
ret["height"] = state.agent.box.height
return ret
def parse_ego_vehicle_state_trajectory(scenario, nuplan_center):
data = [
parse_ego_vehicle_state(scenario.get_ego_state_at_iteration(i), nuplan_center)
for i in range(scenario.get_number_of_iterations())
]
for i in range(len(data) - 1):
data[i]["angular_velocity"] = compute_angular_velocity(
initial_heading=data[i]["heading"], final_heading=data[i + 1]["heading"], dt=scenario.database_interval
)
return data
def extract_traffic(scenario: NuPlanScenario, center):
episode_len = scenario.get_number_of_iterations()
detection_ret = []
all_objs = set()
all_objs.add(EGO)
for frame_data in [scenario.get_tracked_objects_at_iteration(i).tracked_objects for i in range(episode_len)]:
new_frame_data = {}
for obj in frame_data:
new_frame_data[obj.track_token] = obj
all_objs.add(obj.track_token)
detection_ret.append(new_frame_data)
tracks = {
k: dict(
type=MetaDriveType.UNSET,
state=dict(
position=np.zeros(shape=(episode_len, 3)),
heading=np.zeros(shape=(episode_len, )),
velocity=np.zeros(shape=(episode_len, 2)),
valid=np.zeros(shape=(episode_len, )),
length=np.zeros(shape=(episode_len, 1)),
width=np.zeros(shape=(episode_len, 1)),
height=np.zeros(shape=(episode_len, 1))
),
metadata=dict(track_length=episode_len, nuplan_type=None, type=None, object_id=k, nuplan_id=k)
)
for k in list(all_objs)
}
tracks_to_remove = set()
for frame_idx, frame in enumerate(detection_ret):
for nuplan_id, obj_state, in frame.items():
assert isinstance(obj_state, Agent) or isinstance(obj_state, StaticObject)
obj_type = get_traffic_obj_type(obj_state.tracked_object_type)
if obj_type is None:
tracks_to_remove.add(nuplan_id)
continue
tracks[nuplan_id][SD.TYPE] = obj_type
if tracks[nuplan_id][SD.METADATA]["nuplan_type"] is None:
tracks[nuplan_id][SD.METADATA]["nuplan_type"] = int(obj_state.tracked_object_type)
tracks[nuplan_id][SD.METADATA]["type"] = obj_type
state = parse_object_state(obj_state, center)
tracks[nuplan_id]["state"]["position"][frame_idx] = [state["position"][0], state["position"][1], 0.0]
tracks[nuplan_id]["state"]["heading"][frame_idx] = state["heading"]
tracks[nuplan_id]["state"]["velocity"][frame_idx] = state["velocity"]
tracks[nuplan_id]["state"]["valid"][frame_idx] = 1
tracks[nuplan_id]["state"]["length"][frame_idx] = state["length"]
tracks[nuplan_id]["state"]["width"][frame_idx] = state["width"]
tracks[nuplan_id]["state"]["height"][frame_idx] = state["height"]
for track in list(tracks_to_remove):
tracks.pop(track)
# ego
sdc_traj = parse_ego_vehicle_state_trajectory(scenario, center)
ego_track = tracks[EGO]
for frame_idx, obj_state in enumerate(sdc_traj):
obj_type = MetaDriveType.VEHICLE
ego_track[SD.TYPE] = obj_type
if ego_track[SD.METADATA]["nuplan_type"] is None:
ego_track[SD.METADATA]["nuplan_type"] = int(TrackedObjectType.EGO)
ego_track[SD.METADATA]["type"] = obj_type
state = obj_state
ego_track["state"]["position"][frame_idx] = [state["position"][0], state["position"][1], 0.0]
ego_track["state"]["valid"][frame_idx] = 1
ego_track["state"]["heading"][frame_idx] = state["heading"]
# this velocity is in the ego car frame, so skip it here
# ego_track["state"]["velocity"][frame_idx] = state["velocity"]
ego_track["state"]["length"][frame_idx] = state["length"]
ego_track["state"]["width"][frame_idx] = state["width"]
ego_track["state"]["height"][frame_idx] = state["height"]
# get velocity here
vel = ego_track["state"]["position"][1:] - ego_track["state"]["position"][:-1]
ego_track["state"]["velocity"][:-1] = vel[..., :2] / 0.1
ego_track["state"]["velocity"][-1] = ego_track["state"]["velocity"][-2]
# check
assert EGO in tracks
for track_id in tracks:
assert tracks[track_id][SD.TYPE] != MetaDriveType.UNSET
return tracks
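Since the raw ego velocity is expressed in the ego frame, the code above recovers a world-frame velocity by finite differences over the 0.1 s step and pads the last entry. A tiny illustration with hypothetical positions:

import numpy as np

pos = np.array([[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [2.5, 0.5, 0.0]])
vel = np.zeros((len(pos), 2))
vel[:-1] = (pos[1:] - pos[:-1])[..., :2] / 0.1  # (dx, dy) per 0.1 s step
vel[-1] = vel[-2]                               # repeat the last valid value
# vel -> [[10., 0.], [15., 5.], [15., 5.]]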
def convert_one_scenario(scenario: NuPlanScenario):
"""
Data will be interpolated to 0.1s time interval, while the time interval of original key frames are 0.5s.
"""
scenario_log_interval = scenario.database_interval
assert abs(scenario_log_interval - 0.1) < 1e-3, "Log interval should be 0.1 or Interpolating is required! " \
"By setting NuPlan subsample ratio can address this"
result = SD()
result[SD.ID] = scenario.scenario_name
result[SD.VERSION] = "nuplan" + scenario.map_version
result[SD.LENGTH] = scenario.get_number_of_iterations()
# metadata
result[SD.METADATA] = {}
result[SD.METADATA]["dataset"] = "nuplan"
result[SD.METADATA]["map"] = scenario.map_api.map_name
result[SD.METADATA][SD.METADRIVE_PROCESSED] = False
result[SD.METADATA]["map_version"] = scenario.map_version
result[SD.METADATA]["log_name"] = scenario.log_name
result[SD.METADATA]["scenario_extraction_info"] = scenario._scenario_extraction_info.__dict__
result[SD.METADATA]["ego_vehicle_parameters"] = scenario.ego_vehicle_parameters.__dict__
result[SD.METADATA]["coordinate"] = "right-handed"
result[SD.METADATA]["scenario_token"] = scenario.scenario_name
result[SD.METADATA]["scenario_id"] = scenario.scenario_name
result[SD.METADATA]["scenario_type"] = scenario.scenario_type
result[SD.METADATA]["sample_rate"] = scenario_log_interval
result[SD.METADATA][SD.TIMESTEP] = np.asarray([i * scenario_log_interval for i in range(result[SD.LENGTH])])
# centered all positions to ego car
state = scenario.get_ego_state_at_iteration(0)
scenario_center = [state.waypoint.x, state.waypoint.y]
result[SD.TRACKS] = extract_traffic(scenario, scenario_center)
result[SD.METADATA][SD.SDC_ID] = EGO
# traffic light
result[SD.DYNAMIC_MAP_STATES] = extract_traffic_light(scenario, scenario_center)
# map
result[SD.MAP_FEATURES] = extract_map_features(scenario.map_api, scenario_center)
return result

@@ -0,0 +1,77 @@
"""
This script aims to convert nuscenes scenarios to ScenarioDescription, so that we can load any nuscenes scenarios into
MetaDrive.
"""
import copy
import os
import pickle
import tqdm
from metadrive.engine.asset_loader import AssetLoader
from metadrive.scenario.scenario_description import ScenarioDescription
from metadrive.utils.nuscenes.utils import convert_one_scenario
from metadrive.utils.utils import dict_recursive_remove_array
import shutil
try:
from nuscenes import NuScenes
except ImportError:
print("Can not find nuscenes-devkit")
def convert_scenarios(version, dataroot, output_path, worker_index=None, verbose=True, force_overwrite=False):
save_path = copy.deepcopy(output_path)
output_path = output_path + "_tmp"
# meta recorder and data summary
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=False)
# make real save dir
delay_remove = None
if os.path.exists(save_path):
if force_overwrite:
delay_remove = save_path
else:
raise ValueError("Directory already exists! Abort")
metadata_recorder = {}
total_scenarios = 0
desc = ""
summary_file = "dataset_summary.pkl"
if worker_index is not None:
desc += "Worker {} ".format(worker_index)
summary_file = "dataset_summary_worker{}.pkl".format(worker_index)
# Init.
nusc = NuScenes(version=version, verbose=verbose, dataroot=dataroot)
scenes = nusc.scene
for scene in tqdm.tqdm(scenes):
sd_scene = convert_one_scenario(scene["token"], nusc)
sd_scene = sd_scene.to_dict()
ScenarioDescription.sanity_check(sd_scene, check_self_type=True)
export_file_name = "sd_{}_{}.pkl".format("nuscenes_" + version, scene["token"])
p = os.path.join(output_path, export_file_name)
with open(p, "wb") as f:
pickle.dump(sd_scene, f)
metadata_recorder[export_file_name] = copy.deepcopy(sd_scene[ScenarioDescription.METADATA])
# rename and save
if delay_remove is not None:
shutil.rmtree(delay_remove)
os.rename(output_path, save_path)
summary_file = os.path.join(save_path, summary_file)
with open(summary_file, "wb") as file:
pickle.dump(dict_recursive_remove_array(metadata_recorder), file)
print("Summary is saved at: {}".format(summary_file))
if delay_remove is not None:
assert delay_remove == save_path
if __name__ == "__main__":
output_path = AssetLoader.file_path("nuscenes", return_raw_style=False)
version = 'v1.0-mini'
verbose = True
dataroot = '/home/shady/data/nuscenes'
worker_index = None
force_overwrite = True
convert_scenarios(version, dataroot, output_path, worker_index=worker_index, force_overwrite=force_overwrite)
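A hypothetical invocation for a different nuScenes split (the dataroot path is an assumption; "v1.0-trainval" follows the official split naming):

convert_scenarios(
    version="v1.0-trainval",
    dataroot="/data/nuscenes",
    output_path=AssetLoader.file_path("nuscenes", return_raw_style=False),
    force_overwrite=True,
)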

@@ -0,0 +1,78 @@
ALL_TYPE = {
"noise": 'noise',
"human.pedestrian.adult": 'adult',
"human.pedestrian.child": 'child',
"human.pedestrian.wheelchair": 'wheelchair',
"human.pedestrian.stroller": 'stroller',
"human.pedestrian.personal_mobility": 'p.mobility',
"human.pedestrian.police_officer": 'police',
"human.pedestrian.construction_worker": 'worker',
"animal": 'animal',
"vehicle.car": 'car',
"vehicle.motorcycle": 'motorcycle',
"vehicle.bicycle": 'bicycle',
"vehicle.bus.bendy": 'bus.bendy',
"vehicle.bus.rigid": 'bus.rigid',
"vehicle.truck": 'truck',
"vehicle.construction": 'constr. veh',
"vehicle.emergency.ambulance": 'ambulance',
"vehicle.emergency.police": 'police car',
"vehicle.trailer": 'trailer',
"movable_object.barrier": 'barrier',
"movable_object.trafficcone": 'trafficcone',
"movable_object.pushable_pullable": 'push/pullable',
"movable_object.debris": 'debris',
"static_object.bicycle_rack": 'bicycle racks',
"flat.driveable_surface": 'driveable',
"flat.sidewalk": 'sidewalk',
"flat.terrain": 'terrain',
"flat.other": 'flat.other',
"static.manmade": 'manmade',
"static.vegetation": 'vegetation',
"static.other": 'static.other',
"vehicle.ego": "ego"
}
NOISE_TYPE = {
"noise": 'noise',
"animal": 'animal',
"static_object.bicycle_rack": 'bicycle racks',
"movable_object.pushable_pullable": 'push/pullable',
"movable_object.debris": 'debris',
"static.manmade": 'manmade',
"static.vegetation": 'vegetation',
"static.other": 'static.other',
}
HUMAN_TYPE = {
"human.pedestrian.adult": 'adult',
"human.pedestrian.child": 'child',
"human.pedestrian.wheelchair": 'wheelchair',
"human.pedestrian.stroller": 'stroller',
"human.pedestrian.personal_mobility": 'p.mobility',
"human.pedestrian.police_officer": 'police',
"human.pedestrian.construction_worker": 'worker',
}
BICYCLE_TYPE = {
"vehicle.bicycle": 'bicycle',
"vehicle.motorcycle": 'motorcycle',
}
VEHICLE_TYPE = {
"vehicle.car": 'car',
"vehicle.bus.bendy": 'bus.bendy',
"vehicle.bus.rigid": 'bus.rigid',
"vehicle.truck": 'truck',
"vehicle.construction": 'constr. veh',
"vehicle.emergency.ambulance": 'ambulance',
"vehicle.emergency.police": 'police car',
"vehicle.trailer": 'trailer',
"vehicle.ego": "ego",
}
OBSTACLE_TYPE = {
"movable_object.barrier": 'barrier',
"movable_object.trafficcone": 'trafficcone',
}
TERRAIN_TYPE = {
"flat.driveable_surface": 'driveable',
"flat.sidewalk": 'sidewalk',
"flat.terrain": 'terrain',
"flat.other": 'flat.other'
}
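These tables are keyed by the raw nuScenes category string, and membership in a table decides the coarse class. A small lookup sketch over the dictionaries above (hypothetical helper, not part of this commit):

def coarse_category(raw_type: str) -> str:
    # Check the dictionaries above in a fixed order of priority.
    for name, table in (("vehicle", VEHICLE_TYPE), ("human", HUMAN_TYPE),
                        ("bicycle", BICYCLE_TYPE), ("obstacle", OBSTACLE_TYPE)):
        if raw_type in table:
            return name
    return "other"

assert coarse_category("vehicle.bus.rigid") == "vehicle"
assert coarse_category("movable_object.trafficcone") == "obstacle"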

@@ -0,0 +1,382 @@
import copy
import geopandas as gpd
import numpy as np
from nuscenes import NuScenes
from nuscenes.can_bus.can_bus_api import NuScenesCanBus
from nuscenes.eval.common.utils import quaternion_yaw
from nuscenes.map_expansion.arcline_path_utils import discretize_lane
from nuscenes.map_expansion.map_api import NuScenesMap
from pyquaternion import Quaternion
from shapely.ops import unary_union, cascaded_union
import logging
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
from metadrive.utils.nuscenes.detection_type import ALL_TYPE, HUMAN_TYPE, BICYCLE_TYPE, VEHICLE_TYPE
logger = logging.getLogger(__name__)
EGO = "ego"
def get_metadrive_type(obj_type):
meta_type = obj_type
md_type = None
if ALL_TYPE[obj_type] == "barrier":
md_type = MetaDriveType.TRAFFIC_BARRIER
elif ALL_TYPE[obj_type] == "trafficcone":
md_type = MetaDriveType.TRAFFIC_CONE
elif obj_type in VEHICLE_TYPE:
md_type = MetaDriveType.VEHICLE
elif obj_type in HUMAN_TYPE:
md_type = MetaDriveType.PEDESTRIAN
elif obj_type in BICYCLE_TYPE:
md_type = MetaDriveType.CYCLIST
# assert meta_type != MetaDriveType.UNSET and meta_type != "noise"
return md_type, meta_type
def parse_frame(frame, nuscenes: NuScenes):
ret = {}
for obj_id in frame["anns"]:
obj = nuscenes.get("sample_annotation", obj_id)
# velocity = nuscenes.box_velocity(obj_id)[:2]
# if np.nan in velocity:
velocity = np.array([0.0, 0.0])
ret[obj["instance_token"]] = {
"position": obj["translation"],
"obj_id": obj["instance_token"],
"heading": quaternion_yaw(Quaternion(*obj["rotation"])),
"rotation": obj["rotation"],
"velocity": velocity,
"size": obj["size"],
"visible": obj["visibility_token"],
"attribute": [nuscenes.get("attribute", i)["name"] for i in obj["attribute_tokens"]],
"type": obj["category_name"]
}
ego_token = nuscenes.get("sample_data", frame["data"]["LIDAR_TOP"])["ego_pose_token"]
ego_state = nuscenes.get("ego_pose", ego_token)
ret[EGO] = {
"position": ego_state["translation"],
"obj_id": EGO,
"heading": quaternion_yaw(Quaternion(*ego_state["rotation"])),
"rotation": ego_state["rotation"],
"type": "vehicle.car",
"velocity": np.array([0.0, 0.0]),
# size https://en.wikipedia.org/wiki/Renault_Zoe
"size": [4.08, 1.73, 1.56],
}
return ret
def interpolate_heading(heading_data, old_valid, new_valid, num_to_interpolate=5):
new_heading_theta = np.zeros_like(new_valid)
for k, valid in enumerate(old_valid[:-1]):
if abs(valid) > 1e-1 and abs(old_valid[k + 1]) > 1e-1:
diff = (heading_data[k + 1] - heading_data[k] + np.pi) % (2 * np.pi) - np.pi
# step = diff
interpolated = np.linspace(heading_data[k], heading_data[k] + diff, num_to_interpolate + 1)
new_heading_theta[k * num_to_interpolate:(k + 1) * num_to_interpolate] = interpolated[:-1]
elif abs(valid) > 1e-1 and abs(old_valid[k + 1]) < 1e-1:
new_heading_theta[k * num_to_interpolate:(k + 1) * num_to_interpolate] = heading_data[k]
new_heading_theta[-1] = heading_data[-1]
return new_heading_theta * new_valid
def _interpolate_one_dim(data, old_valid, new_valid, num_to_interpolate=5):
new_data = np.zeros_like(new_valid)
for k, valid in enumerate(old_valid[:-1]):
if abs(valid) > 1e-1 and abs(old_valid[k + 1]) > 1e-1:
diff = data[k + 1] - data[k]
# step = diff
interpolate_data = np.linspace(data[k], data[k] + diff, num_to_interpolate + 1)
new_data[k * num_to_interpolate:(k + 1) * num_to_interpolate] = interpolate_data[:-1]
elif abs(valid) > 1e-1 and abs(old_valid[k + 1]) < 1e-1:
new_data[k * num_to_interpolate:(k + 1) * num_to_interpolate] = data[k]
new_data[-1] = data[-1]
return new_data * new_valid
def interpolate(origin_y, valid, new_valid):
if len(origin_y.shape) == 1:
ret = _interpolate_one_dim(origin_y, valid, new_valid)
elif len(origin_y.shape) == 2:
ret = []
for dim in range(origin_y.shape[-1]):
new_y = _interpolate_one_dim(origin_y[..., dim], valid, new_valid)
new_y = np.expand_dims(new_y, axis=-1)
ret.append(new_y)
ret = np.concatenate(ret, axis=-1)
else:
raise ValueError("Y has shape {}, Can not interpolate".format(origin_y.shape))
return ret
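A worked example of the helpers above: three valid key frames become (3 - 1) * 5 + 1 = 11 steps, with a linear ramp between consecutive key frames (hypothetical data; numpy is already imported in this file):

old_valid = np.ones(3)
new_valid = np.ones(11)                  # (3 - 1) * 5 + 1 interpolated steps
y = np.array([0.0, 1.0, 4.0])
interpolate(y, old_valid, new_valid)
# -> [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.6, 2.2, 2.8, 3.4, 4.0]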
def get_tracks_from_frames(nuscenes: NuScenes, scene_info, frames, num_to_interpolate=5):
episode_len = len(frames)
# Fill tracks
all_objs = set()
for frame in frames:
all_objs.update(frame.keys())
tracks = {
k: dict(
type=MetaDriveType.UNSET,
state=dict(
position=np.zeros(shape=(episode_len, 3)),
heading=np.zeros(shape=(episode_len, )),
velocity=np.zeros(shape=(episode_len, 2)),
valid=np.zeros(shape=(episode_len, )),
length=np.zeros(shape=(episode_len, 1)),
width=np.zeros(shape=(episode_len, 1)),
height=np.zeros(shape=(episode_len, 1))
),
metadata=dict(track_length=episode_len, type=MetaDriveType.UNSET, object_id=k, original_id=k)
)
for k in list(all_objs)
}
tracks_to_remove = set()
for frame_idx in range(episode_len):
# Record all agents' states (position, velocity, ...)
for id, state in frames[frame_idx].items():
# Fill type
md_type, meta_type = get_metadrive_type(state["type"])
tracks[id]["type"] = md_type
tracks[id][SD.METADATA]["type"] = meta_type
if md_type is None or md_type == MetaDriveType.UNSET:
tracks_to_remove.add(id)
continue
# Fill the state item
tracks[id]["state"]["position"][frame_idx] = state["position"]
tracks[id]["state"]["heading"][frame_idx] = state["heading"]
tracks[id]["state"]["velocity"][frame_idx] = state["velocity"]
tracks[id]["state"]["valid"][frame_idx] = 1
tracks[id]["state"]["length"][frame_idx] = state["size"][1]
tracks[id]["state"]["width"][frame_idx] = state["size"][0]
tracks[id]["state"]["height"][frame_idx] = state["size"][2]
tracks[id]["metadata"]["original_id"] = id
tracks[id]["metadata"]["object_id"] = id
for track in tracks_to_remove:
track_data = tracks.pop(track)
obj_type = track_data[SD.METADATA]["type"]
print("\nWARNING: Can not map type: {} to any MetaDrive Type".format(obj_type))
new_episode_len = (episode_len - 1) * num_to_interpolate + 1
# interpolate
interpolate_tracks = {}
for id, track, in tracks.items():
interpolate_tracks[id] = copy.deepcopy(track)
interpolate_tracks[id]["metadata"]["track_length"] = new_episode_len
# valid first
new_valid = np.zeros(shape=(new_episode_len, ))
if track["state"]["valid"][0]:
new_valid[0] = 1
for k, valid in enumerate(track["state"]["valid"][1:], start=1):
if valid:
if abs(new_valid[(k - 1) * num_to_interpolate] - 1) < 1e-2:
start_idx = (k - 1) * num_to_interpolate + 1
else:
start_idx = k * num_to_interpolate
new_valid[start_idx:k * num_to_interpolate + 1] = 1
interpolate_tracks[id]["state"]["valid"] = new_valid
# position
interpolate_tracks[id]["state"]["position"] = interpolate(
track["state"]["position"], track["state"]["valid"], new_valid
)
if id == "ego":
# We can get it from canbus
canbus = NuScenesCanBus(dataroot=nuscenes.dataroot)
imu_pos = np.asarray([state["pos"] for state in canbus.get_messages(scene_info["name"], "pose")[::5]])
interpolate_tracks[id]["state"]["position"][:len(imu_pos)] = imu_pos
# velocity
interpolate_tracks[id]["state"]["velocity"] = interpolate(
track["state"]["velocity"], track["state"]["valid"], new_valid
)
vel = interpolate_tracks[id]["state"]["position"][1:] - interpolate_tracks[id]["state"]["position"][:-1]
interpolate_tracks[id]["state"]["velocity"][:-1] = vel[..., :2] / 0.1
for k, valid in enumerate(new_valid[1:], start=1):
if valid == 0 or not valid or abs(valid) < 1e-2:
interpolate_tracks[id]["state"]["velocity"][k] = np.array([0., 0.])
interpolate_tracks[id]["state"]["velocity"][k - 1] = np.array([0., 0.])
# speed outlier check
max_vel = np.max(np.linalg.norm(interpolate_tracks[id]["state"]["velocity"], axis=-1))
assert max_vel < 50, "Abnormal velocity!"
if max_vel > 30:
print("\nWARNING: Too large peed for {}: {}".format(id, max_vel))
# heading
# then update position
new_heading = interpolate_heading(track["state"]["heading"], track["state"]["valid"], new_valid)
interpolate_tracks[id]["state"]["heading"] = new_heading
if id == "ego":
# We can get it from canbus
canbus = NuScenesCanBus(dataroot=nuscenes.dataroot)
imu_heading = np.asarray(
[
quaternion_yaw(Quaternion(state["orientation"]))
for state in canbus.get_messages(scene_info["name"], "pose")[::5]
]
)
interpolate_tracks[id]["state"]["heading"][:len(imu_heading)] = imu_heading
for k, v in track["state"].items():
if k in ["valid", "heading", "position", "velocity"]:
continue
else:
interpolate_tracks[id]["state"][k] = interpolate(v, track["state"]["valid"], new_valid)
# if id == "ego":
# ego is valid all time, so we can calculate the velocity in this way
return interpolate_tracks
def get_map_features(scene_info, nuscenes: NuScenes, map_center, radius=250, points_distance=1):
"""
Extract map features from nuscenes data. The objects in specified region will be returned. Sampling rate determines
the distance between 2 points when extracting lane center line.
"""
ret = {}
map_name = nuscenes.get("log", scene_info["log_token"])["location"]
map_api = NuScenesMap(dataroot=nuscenes.dataroot, map_name=map_name)
layer_names = [
# "line",
# "polygon",
# "node",
'drivable_area',
'road_segment',
'road_block',
'lane',
# 'ped_crossing',
# 'walkway',
# 'stop_line',
# 'carpark_area',
'lane_connector',
'road_divider',
'lane_divider',
'traffic_light'
]
map_objs = map_api.get_records_in_radius(map_center[0], map_center[1], radius, layer_names)
# build map boundary
polygons = []
# for id in map_objs["drivable_area"]:
# seg_info = map_api.get("drivable_area", id)
# assert seg_info["token"] == id
# for polygon_token in seg_info["polygon_tokens"]:
# polygon = map_api.extract_polygon(polygon_token)
# polygons.append(polygon)
for id in map_objs["road_segment"]:
seg_info = map_api.get("road_segment", id)
assert seg_info["token"] == id
polygon = map_api.extract_polygon(seg_info["polygon_token"])
polygons.append(polygon)
for id in map_objs["road_block"]:
seg_info = map_api.get("road_block", id)
assert seg_info["token"] == id
polygon = map_api.extract_polygon(seg_info["polygon_token"])
polygons.append(polygon)
polygons = [geom if geom.is_valid else geom.buffer(0) for geom in polygons]
# logger.warning("Stop using boundaries! Use exterior instead!")
boundaries = gpd.GeoSeries(unary_union(polygons)).boundary.explode(index_parts=True)
for idx, boundary in enumerate(boundaries[0]):
block_points = np.array(list(i for i in zip(boundary.coords.xy[0], boundary.coords.xy[1])))
id = "boundary_{}".format(idx)
ret[id] = {SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_WHITE, SD.POLYLINE: block_points}
for id in map_objs["lane_divider"]:
line_info = map_api.get("lane_divider", id)
assert line_info["token"] == id
line = map_api.extract_line(line_info["line_token"]).coords.xy
line = [[line[0][i], line[1][i]] for i in range(len(line[0]))]
ret[id] = {SD.TYPE: MetaDriveType.LINE_BROKEN_SINGLE_WHITE, SD.POLYLINE: line}
for id in map_objs["road_divider"]:
line_info = map_api.get("road_divider", id)
assert line_info["token"] == id
line = map_api.extract_line(line_info["line_token"]).coords.xy
line = [[line[0][i], line[1][i]] for i in range(len(line[0]))]
ret[id] = {SD.TYPE: MetaDriveType.LINE_SOLID_SINGLE_YELLOW, SD.POLYLINE: line}
for id in map_objs["lane"]:
lane_info = map_api.get("lane", id)
assert lane_info["token"] == id
boundary = map_api.extract_polygon(lane_info["polygon_token"]).boundary.xy
boundary_polygon = [[boundary[0][i], boundary[1][i]] for i in range(len(boundary[0]))]
# boundary_polygon += [[boundary[0][i], boundary[1][i]] for i in range(len(boundary[0]))]
ret[id] = {
SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
SD.POLYLINE: discretize_lane(map_api.arcline_path_3[id], resolution_meters=points_distance),
SD.POLYGON: boundary_polygon,
# TODO Add speed limit if needed
"speed_limit_kmh": 100
}
for id in map_objs["lane_connector"]:
lane_info = map_api.get("lane_connector", id)
assert lane_info["token"] == id
# boundary = map_api.extract_polygon(lane_info["polygon_token"]).boundary.xy
# boundary_polygon = [[boundary[0][i], boundary[1][i], 0.1] for i in range(len(boundary[0]))]
# boundary_polygon += [[boundary[0][i], boundary[1][i], 0.] for i in range(len(boundary[0]))]
ret[id] = {
SD.TYPE: MetaDriveType.LANE_SURFACE_STREET,
SD.POLYLINE: discretize_lane(map_api.arcline_path_3[id], resolution_meters=points_distance),
# SD.POLYGON: boundary_polygon,
"speed_limit_kmh": 100
}
return ret
def convert_one_scenario(scene_token: str, nuscenes: NuScenes):
"""
Data will be interpolated to 0.1s time interval, while the time interval of original key frames are 0.5s.
"""
scenario_log_interval = 0.1
scene_info = nuscenes.get("scene", scene_token)
frames = []
current_frame = nuscenes.get("sample", scene_info["first_sample_token"])
while current_frame["token"] != scene_info["last_sample_token"]:
frames.append(parse_frame(current_frame, nuscenes))
current_frame = nuscenes.get("sample", current_frame["next"])
frames.append(parse_frame(current_frame, nuscenes))
assert current_frame["next"] == ""
assert len(frames) == scene_info["nbr_samples"], "Number of samples mismatch!"
result = SD()
result[SD.ID] = scene_info["name"]
result[SD.VERSION] = "nuscenes" + nuscenes.version
result[SD.LENGTH] = (len(frames) - 1) * 5 + 1
result[SD.METADATA] = {}
result[SD.METADATA]["dataset"] = "nuscenes"
result[SD.METADATA][SD.METADRIVE_PROCESSED] = False
result[SD.METADATA]["map"] = nuscenes.get("log", scene_info["log_token"])["location"]
result[SD.METADATA]["date"] = nuscenes.get("log", scene_info["log_token"])["date_captured"]
result[SD.METADATA]["coordinate"] = "right-handed"
result[SD.METADATA]["scenario_token"] = scene_token
result[SD.METADATA]["scenario_id"] = scene_info["name"]
result[SD.METADATA]["sample_rate"] = scenario_log_interval
result[SD.METADATA][SD.TIMESTEP] = np.arange(0., (len(frames) - 1) * 0.5 + 0.1, 0.1)
# interpolating to 0.1s interval
result[SD.TRACKS] = get_tracks_from_frames(nuscenes, scene_info, frames, num_to_interpolate=5)
result[SD.METADATA][SD.SDC_ID] = "ego"
# TODO Traffic Light
result[SD.DYNAMIC_MAP_STATES] = {}
# map
map_center = result[SD.TRACKS]["ego"]["state"]["position"][0]
result[SD.MAP_FEATURES] = get_map_features(scene_info, nuscenes, map_center, 250)
return result

@@ -0,0 +1,262 @@
"""
This script takes --input as an argument: the folder storing a batch of tfrecord files.
It will create the output folder "processed_data" at the same level as the input folder:
-- input folder
-- processed_data
"""
import argparse
from metadrive.utils.utils import dict_recursive_remove_array
import copy
import os
import pickle
from collections import defaultdict
import numpy as np
from metadrive.constants import DATA_VERSION
try:
import tensorflow as tf
except ImportError:
pass
try:
from waymo_open_dataset.protos import scenario_pb2
except ImportError:
try:
from metadrive.utils.waymo.protos import scenario_pb2 # Local files that exist only on PZH's computer.
except ImportError:
print(
"Please install waymo_open_dataset package through metadrive dependencies: "
"pip install waymo-open-dataset-tf-2-11-0==1.5.0"
)
from metadrive.scenario import ScenarioDescription as SD
from metadrive.type import MetaDriveType
from metadrive.utils.waymo.utils import extract_tracks, extract_dynamic_map_states, extract_map_features, \
compute_width
import sys
def validate_sdc_track(sdc_state):
"""
This function filters the scenario based on SDC information.
Rule 1: Filter out if the trajectory length < 10 m.
Rule 2: Filter out if the whole trajectory lasts < 5 s, assuming a sampling frequency of 10 Hz.
"""
valid_array = sdc_state["valid"]
sdc_trajectory = sdc_state["position"][valid_array, :2]
sdc_track_length = [
np.linalg.norm(sdc_trajectory[i] - sdc_trajectory[i + 1]) for i in range(sdc_trajectory.shape[0] - 1)
]
sdc_track_length = sum(sdc_track_length)
# Rule 1
if sdc_track_length < 10:
return False
print("sdc_track_length: ", sdc_track_length)
# Rule 2
if valid_array.sum() < 50:
return False
return True
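Hypothetical usage of the filter above: a 60-step ego track moving about 0.7 m per step passes both rules (numpy is already imported in this file):

sdc_state = {
    "valid": np.ones(60, dtype=bool),
    "position": np.cumsum(np.full((60, 2), 0.5), axis=0),  # straight diagonal track
}
assert validate_sdc_track(sdc_state)  # ~41.7 m long, 60 valid steps >= 50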
def _get_agent_summary(state_dict, id, type):
track = state_dict["position"]
valid_track = track[state_dict["valid"], :2]
distance = float(sum(np.linalg.norm(valid_track[i] - valid_track[i + 1]) for i in range(valid_track.shape[0] - 1)))
valid_length = int(sum(state_dict["valid"]))
continuous_valid_length = 0
for v in state_dict["valid"]:
if v:
continuous_valid_length += 1
if continuous_valid_length > 0 and not v:
break
return {
"type": type,
"object_id": str(id),
"track_length": int(len(track)),
"distance": float(distance),
"valid_length": int(valid_length),
"continuous_valid_length": int(continuous_valid_length)
}
def _get_number_summary(scenario):
number_summary_dict = {}
number_summary_dict["object"] = len(scenario[SD.TRACKS])
number_summary_dict["dynamic_object_states"] = len(scenario[SD.DYNAMIC_MAP_STATES])
number_summary_dict["map_features"] = len(scenario[SD.MAP_FEATURES])
number_summary_dict["object_types"] = set(v["type"] for v in scenario[SD.TRACKS].values())
object_types_counter = defaultdict(int)
for v in scenario[SD.TRACKS].values():
object_types_counter[v["type"]] += 1
number_summary_dict["object_types_counter"] = dict(object_types_counter)
# Number of different dynamic object states
dynamic_object_states_types = set()
dynamic_object_states_counter = defaultdict(int)
for v in scenario[SD.DYNAMIC_MAP_STATES].values():
for step_state in v["state"]["object_state"]:
if step_state is None:
continue
dynamic_object_states_types.add(step_state)
dynamic_object_states_counter[step_state] += 1
number_summary_dict["dynamic_object_states_types"] = dynamic_object_states_types
number_summary_dict["dynamic_object_states_counter"] = dict(dynamic_object_states_counter)
return number_summary_dict
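A toy illustration of the counters this produces (a hypothetical, minimal scenario dict):

toy = {
    SD.TRACKS: {"a": {"type": "VEHICLE"}, "b": {"type": "PEDESTRIAN"}},
    SD.DYNAMIC_MAP_STATES: {},
    SD.MAP_FEATURES: {"lane_0": {}},
}
_get_number_summary(toy)
# -> {"object": 2, "dynamic_object_states": 0, "map_features": 1,
#     "object_types": {"VEHICLE", "PEDESTRIAN"},
#     "object_types_counter": {"VEHICLE": 1, "PEDESTRIAN": 1},
#     "dynamic_object_states_types": set(), "dynamic_object_states_counter": {}}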
def parse_data(file_list, input_path, output_path, worker_index=None):
scenario = scenario_pb2.Scenario()
metadata_recorder = {}
total_scenarios = 0
desc = ""
summary_file = "dataset_summary.pkl"
if worker_index is not None:
desc += "Worker {} ".format(worker_index)
summary_file = "dataset_summary_worker{}.pkl".format(worker_index)
for file_count, file in enumerate(file_list):
file_path = os.path.join(input_path, file)
if ("tfrecord" not in file_path) or (not os.path.isfile(file_path)):
continue
dataset = tf.data.TFRecordDataset(file_path, compression_type="")
total = sum(1 for _ in dataset.as_numpy_iterator())
for j, data in enumerate(dataset.as_numpy_iterator()):
scenario.ParseFromString(data)
md_scenario = SD()
md_scenario[SD.ID] = scenario.scenario_id
md_scenario[SD.VERSION] = DATA_VERSION
# Please note that SDC track index is not identical to sdc_id.
# sdc_id is a unique indicator to a track, while sdc_track_index is only the index of the sdc track
# in the tracks datastructure.
track_length = len(list(scenario.timestamps_seconds))
tracks, sdc_id = extract_tracks(scenario.tracks, scenario.sdc_track_index, track_length)
md_scenario[SD.LENGTH] = track_length
md_scenario[SD.TRACKS] = tracks
dynamic_states = extract_dynamic_map_states(scenario.dynamic_map_states, track_length)
md_scenario[SD.DYNAMIC_MAP_STATES] = dynamic_states
map_features = extract_map_features(scenario.map_features)
md_scenario[SD.MAP_FEATURES] = map_features
compute_width(md_scenario[SD.MAP_FEATURES])
md_scenario[SD.METADATA] = {}
md_scenario[SD.METADATA][SD.COORDINATE] = MetaDriveType.COORDINATE_WAYMO
md_scenario[SD.METADATA][SD.TIMESTEP] = np.asarray(list(scenario.timestamps_seconds), dtype=np.float32)
md_scenario[SD.METADATA][SD.METADRIVE_PROCESSED] = False
md_scenario[SD.METADATA][SD.SDC_ID] = str(sdc_id)
md_scenario[SD.METADATA]["dataset"] = "waymo"
md_scenario[SD.METADATA]["scenario_id"] = scenario.scenario_id
md_scenario[SD.METADATA]["source_file"] = str(file)
md_scenario[SD.METADATA]["track_length"] = track_length
# === Waymo specific data. Storing them here ===
md_scenario[SD.METADATA]["current_time_index"] = scenario.current_time_index
md_scenario[SD.METADATA]["sdc_track_index"] = scenario.sdc_track_index
# obj id
md_scenario[SD.METADATA]["objects_of_interest"] = [str(obj) for obj in scenario.objects_of_interest]
track_index = [obj.track_index for obj in scenario.tracks_to_predict]
track_id = [str(scenario.tracks[ind].id) for ind in track_index]
track_difficulty = [obj.difficulty for obj in scenario.tracks_to_predict]
track_obj_type = [tracks[id]["type"] for id in track_id]
md_scenario[SD.METADATA]["tracks_to_predict"] = {
id: {
"track_index": track_index[count],
"track_id": id,
"difficulty": track_difficulty[count],
"object_type": track_obj_type[count]
}
for count, id in enumerate(track_id)
}
export_file_name = "sd_{}_{}.pkl".format(file, scenario.scenario_id)
summary_dict = {}
summary_dict["sdc"] = _get_agent_summary(
state_dict=md_scenario.get_sdc_track()["state"], id=sdc_id, type=md_scenario.get_sdc_track()["type"]
)
for track_id, track in md_scenario[SD.TRACKS].items():
summary_dict[track_id] = _get_agent_summary(state_dict=track["state"], id=track_id, type=track["type"])
md_scenario[SD.METADATA]["object_summary"] = summary_dict
# Count some objects occurrence
md_scenario[SD.METADATA]["number_summary"] = _get_number_summary(md_scenario)
metadata_recorder[export_file_name] = copy.deepcopy(md_scenario[SD.METADATA])
md_scenario = md_scenario.to_dict()
SD.sanity_check(md_scenario, check_self_type=True)
p = os.path.join(output_path, export_file_name)
with open(p, "wb") as f:
pickle.dump(md_scenario, f)
total_scenarios += 1
if j == total - 1:
print(
f"{desc}Collected {total_scenarios} scenarios. File {file_count + 1}/{len(file_list)} has "
f"{total} Scenarios. The last one is saved at: {p}"
)
summary_file = os.path.join(output_path, summary_file)
with open(summary_file, "wb") as file:
pickle.dump(dict_recursive_remove_array(metadata_recorder), file)
print("Summary is saved at: {}".format(summary_file))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="The data folder storing raw tfrecord from Waymo dataset.")
parser.add_argument(
"--output", default="processed_data", type=str, help="The data folder storing raw tfrecord from Waymo dataset."
)
args = parser.parse_args()
scenario_data_path = args.input
output_path: str = os.path.dirname(scenario_data_path)
output_path = os.path.join(output_path, args.output)
os.makedirs(output_path, exist_ok=True)
raw_data_path = scenario_data_path
# parse raw data from the input path to the output path;
# there are 1000 raw files on Google Cloud, and each of them produces about 500 pkl files
file_list = os.listdir(raw_data_path)
parse_data(file_list, raw_data_path, output_path)
sys.exit()
# file_path = AssetLoader.file_path("waymo", "processed", "0.pkl", return_raw_style=False)
# data = read_waymo_data(file_path)
# draw_waymo_map(data)
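Each converted pickle can be loaded back and inspected. A hypothetical example (the file name is an assumption, following the sd_{file}_{scenario_id}.pkl pattern above):

import pickle
from metadrive.scenario import ScenarioDescription as SD

with open("processed_data/sd_training.tfrecord-00000_abc123.pkl", "rb") as f:
    scenario = pickle.load(f)
print(scenario[SD.ID], scenario[SD.LENGTH], len(scenario[SD.TRACKS]))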

@@ -0,0 +1,5 @@
This folder contains files from a Linux-compiled `waymo-open-dataset-tf-2.11.0==1.5.0` package.
We copied them here for compatibility on Windows computers.
The files will not be pushed to the MetaDrive public repo for license reasons.

@@ -0,0 +1,4 @@
#!/usr/bin/env bash
for i in 0 1 2 3 4 5 6 7 8 9; do
nohup python /home/qyli/metadrive/metadrive/utils/waymo/script/convert_waymo_to_metadrive.py ./scenario_${i} > ${i}.log 2>&1 &
done

@@ -0,0 +1,69 @@
import os
import signal
import sys
from tqdm import tqdm
from metadrive.envs.real_data_envs.waymo_env import WaymoEnv
from metadrive.policy.idm_policy import WaymoIDMPolicy
try:
from metadrive.utils.waymo.waymo_type import WaymoAgentType
from metadrive.utils.waymo.waymo_type import WaymoRoadLineType, WaymoRoadEdgeType
except ImportError:
pass
def handler(signum, frame):
raise Exception("end of time")
if __name__ == "__main__":
scenario_data_path = sys.argv[1]
start = int(sys.argv[2])
processed_data_path = scenario_data_path + "_filtered"
if not os.path.exists(processed_data_path):
os.mkdir(processed_data_path)
if not os.path.exists(scenario_data_path) or not os.path.exists(processed_data_path):
raise ValueError("Path Not exist")
num_scenarios = len(os.listdir(scenario_data_path))
max_step = 1500
min_step = 50
env = WaymoEnv(
{
"use_render": False,
"agent_policy": WaymoIDMPolicy,
"data_directory": scenario_data_path,
"start_scenario_index": start * 1000,
"num_scenarios": num_scenarios,
"store_map": False,
# "manual_control": True,
# "debug":True,
"no_traffic": True,
"horizon": 1500,
}
)
try:
env.reset()
except:
pass
for i in tqdm(range(num_scenarios)):
try:
signal.signal(signal.SIGALRM, handler)
signal.alarm(10)
env.reset(force_seed=i)
while True:
o, r, d, info = env.step([0, 0])
if d or env.episode_step > max_step:
if info["arrive_dest"] and env.episode_step > min_step:
os.rename(
os.path.join(scenario_data_path, "{}.pkl".format(i + start * 1000)),
os.path.join(processed_data_path, "{}.pkl".format(i + start * 1000))
)
break
except:
# print("\n No Route or Timeout, Fail, Seed: {}".format(i))
pass
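The loop above relies on SIGALRM to abort scenarios that hang. A minimal sketch of that timeout pattern (POSIX-only; do_slow_work is a hypothetical stand-in):

import signal

def _timeout_handler(signum, frame):
    raise TimeoutError("end of time")

signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(10)          # deliver SIGALRM in 10 s
try:
    do_slow_work()        # hypothetical long-running call
except TimeoutError:
    pass                  # skip this scenario and move on
finally:
    signal.alarm(0)       # cancel any pending alarm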

@@ -0,0 +1,4 @@
#!/usr/bin/env bash
for i in 0 1 2 3 4 5 6 7 8 9; do
nohup python /home/lfeng/metadrive/metadrive/utils/waymo/filter_cases.py /home/lfeng/waymo/scenarios_processed_${i} ${i} > ${i}.log 2>&1 &
done

@@ -0,0 +1,35 @@
import pickle
from metadrive.engine.asset_loader import AssetLoader
if __name__ == '__main__':
with open("waymo120/0408_output_final/dataset_summary.pkl", "rb") as f:
summary_dict = pickle.load(f)
new_summary = {}
for obj_id, summary in summary_dict.items():
if summary["number_summary"]["dynamic_object_states"] == 0:
continue
if summary["object_summary"]["sdc"]["distance"] < 80 or \
summary["object_summary"]["sdc"]["continuous_valid_length"] < 50:
continue
if len(summary["number_summary"]["object_types"]) < 3:
continue
if summary["number_summary"]["object"] < 80:
continue
new_summary[obj_id] = summary
if len(new_summary) >= 3:
break
file_path = AssetLoader.file_path("waymo", "dataset_summary.pkl", return_raw_style=False)
with open(file_path, "wb") as f:
pickle.dump(new_summary, f)
print(new_summary.keys())

@@ -0,0 +1,379 @@
import matplotlib.pyplot as plt
from metadrive.scenario.utils import read_scenario_data
from matplotlib.pyplot import figure
from metadrive.type import MetaDriveType
from metadrive.utils.math import mph_to_kmh
from metadrive.utils.waymo.waymo_type import WaymoLaneType, WaymoAgentType
from metadrive.utils.waymo.waymo_type import WaymoRoadLineType, WaymoRoadEdgeType
try:
import tensorflow as tf
except ImportError:
pass
try:
from metadrive.utils.waymo.protos import scenario_pb2
except ImportError:
pass
import pickle
import numpy as np
from metadrive.scenario.scenario_description import ScenarioDescription
def extract_poly(message):
x = [i.x for i in message]
y = [i.y for i in message]
z = [i.z for i in message]
coord = np.stack((x, y, z), axis=1).astype("float32")
return coord
def extract_boundaries(fb):
b = []
# b = np.zeros([len(fb), 4], dtype="int64")
for k in range(len(fb)):
c = dict()
c["lane_start_index"] = fb[k].lane_start_index
c["lane_end_index"] = fb[k].lane_end_index
c["boundary_type"] = WaymoRoadLineType.from_waymo(fb[k].boundary_type)
c["boundary_feature_id"] = fb[k].boundary_feature_id
for key in c:
c[key] = str(c[key])
b.append(c)
return b
def extract_neighbors(fb):
nbs = []
for k in range(len(fb)):
nb = dict()
nb["feature_id"] = fb[k].feature_id
nb["self_start_index"] = fb[k].self_start_index
nb["self_end_index"] = fb[k].self_end_index
nb["neighbor_start_index"] = fb[k].neighbor_start_index
nb["neighbor_end_index"] = fb[k].neighbor_end_index
for key in nb:
nb[key] = str(nb[key])
nb["boundaries"] = extract_boundaries(fb[k].boundaries)
nbs.append(nb)
return nbs
def extract_center(f):
center = dict()
f = f.lane
center["speed_limit_mph"] = f.speed_limit_mph
center["speed_limit_kmh"] = mph_to_kmh(f.speed_limit_mph)
center["type"] = WaymoLaneType.from_waymo(f.type)
center["polyline"] = extract_poly(f.polyline)
center["interpolating"] = f.interpolating
center["entry_lanes"] = [x for x in f.entry_lanes]
center["exit_lanes"] = [x for x in f.exit_lanes]
center["left_boundaries"] = extract_boundaries(f.left_boundaries)
center["right_boundaries"] = extract_boundaries(f.right_boundaries)
center["left_neighbor"] = extract_neighbors(f.left_neighbors)
center["right_neighbor"] = extract_neighbors(f.right_neighbors)
return center
def extract_line(f):
line = dict()
f = f.road_line
line["type"] = WaymoRoadLineType.from_waymo(f.type)
line["polyline"] = extract_poly(f.polyline)
return line
def extract_edge(f):
edge = dict()
f_ = f.road_edge
# TODO: Need to transform this to MetaDrive version
edge["type"] = WaymoRoadEdgeType.from_waymo(f_.type)
edge["polyline"] = extract_poly(f_.polyline)
return edge
def extract_stop(f):
stop = dict()
f = f.stop_sign
stop["type"] = MetaDriveType.STOP_SIGN
stop["lane"] = [x for x in f.lane]
stop["position"] = np.array([f.position.x, f.position.y, f.position.z], dtype="float32")
return stop
def extract_crosswalk(f):
cross_walk = dict()
f = f.crosswalk
cross_walk["type"] = MetaDriveType.CROSSWALK
cross_walk["polygon"] = extract_poly(f.polygon)
return cross_walk
def extract_bump(f):
speed_bump_data = dict()
f = f.speed_bump
speed_bump_data["type"] = MetaDriveType.SPEED_BUMP
speed_bump_data["polygon"] = extract_poly(f.polygon)
return speed_bump_data
def extract_driveway(f):
driveway_data = dict()
f = f.driveway
driveway_data["type"] = MetaDriveType.DRIVEWAY
driveway_data["polygon"] = extract_poly(f.polygon)
return driveway_data

def extract_tracks(tracks, sdc_idx, track_length):
    ret = dict()

    def _object_state_template(object_id):
        return dict(
            type=None,
            state=dict(
                # Never add extra dim if the value is scalar.
                position=np.zeros([track_length, 3], dtype=np.float32),
                length=np.zeros([track_length], dtype=np.float32),
                width=np.zeros([track_length], dtype=np.float32),
                height=np.zeros([track_length], dtype=np.float32),
                heading=np.zeros([track_length], dtype=np.float32),
                velocity=np.zeros([track_length, 2], dtype=np.float32),
                valid=np.zeros([track_length], dtype=bool),
            ),
            metadata=dict(track_length=track_length, type=None, object_id=object_id, dataset="waymo")
        )

    for obj in tracks:
        object_id = str(obj.id)
        obj_state = _object_state_template(object_id)
        waymo_string = WaymoAgentType.from_waymo(obj.object_type)  # Load the Waymo type string
        metadrive_type = MetaDriveType.from_waymo(waymo_string)  # Transform it to the MetaDrive type string
        obj_state["type"] = metadrive_type
        for step_count, state in enumerate(obj.states):
            if step_count >= track_length:
                break
            obj_state["state"]["position"][step_count][0] = state.center_x
            obj_state["state"]["position"][step_count][1] = state.center_y
            obj_state["state"]["position"][step_count][2] = state.center_z
            obj_state["state"]["length"][step_count] = state.length
            obj_state["state"]["width"][step_count] = state.width
            obj_state["state"]["height"][step_count] = state.height
            obj_state["state"]["heading"][step_count] = state.heading
            obj_state["state"]["velocity"][step_count][0] = state.velocity_x
            obj_state["state"]["velocity"][step_count][1] = state.velocity_y
            obj_state["state"]["valid"][step_count] = state.valid
        obj_state["metadata"]["type"] = metadrive_type
        ret[object_id] = obj_state
    return ret, str(tracks[sdc_idx].id)

def extract_map_features(map_features):
    ret = {}
    for lane_state in map_features:
        lane_id = str(lane_state.id)
        if lane_state.HasField("lane"):
            ret[lane_id] = extract_center(lane_state)
        if lane_state.HasField("road_line"):
            ret[lane_id] = extract_line(lane_state)
        if lane_state.HasField("road_edge"):
            ret[lane_id] = extract_edge(lane_state)
        if lane_state.HasField("stop_sign"):
            ret[lane_id] = extract_stop(lane_state)
        if lane_state.HasField("crosswalk"):
            ret[lane_id] = extract_crosswalk(lane_state)
        if lane_state.HasField("speed_bump"):
            ret[lane_id] = extract_bump(lane_state)
        # Supported only in Waymo dataset 1.2.0
        if lane_state.HasField("driveway"):
            ret[lane_id] = extract_driveway(lane_state)
    return ret
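
# Illustrative usage sketch (hypothetical `scenario` proto, as above): every map feature is keyed by its
# string id, so lanes, lines, edges, and crosswalks all live in one flat dict.
# map_features = extract_map_features(scenario.map_features)
# lane = next(v for v in map_features.values() if "LANE" in v["type"])
# lane["polyline"]  # centerline points; boundaries/neighbors reference other feature ids in the same dict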

def extract_dynamic_map_states(dynamic_map_states, track_length):
    processed_dynamics_map_states = {}

    def _traffic_light_state_template(object_id):
        return dict(
            type=MetaDriveType.TRAFFIC_LIGHT,
            state=dict(object_state=[None] * track_length),
            lane=None,
            stop_point=np.zeros([3], dtype=np.float32),
            metadata=dict(
                track_length=track_length, type=MetaDriveType.TRAFFIC_LIGHT, object_id=object_id, dataset="waymo"
            )
        )

    for step_count, step_states in enumerate(dynamic_map_states):
        # Each step_states is the state of all objects in one time step
        lane_states = step_states.lane_states
        if step_count >= track_length:
            break
        for object_state in lane_states:
            lane = object_state.lane
            object_id = str(lane)  # Always use a string to specify the object id
            # We use the lane index to serve as the traffic light index.
            if object_id not in processed_dynamics_map_states:
                processed_dynamics_map_states[object_id] = _traffic_light_state_template(object_id=object_id)
            if processed_dynamics_map_states[object_id]["lane"] is not None:
                assert lane == processed_dynamics_map_states[object_id]["lane"]
            else:
                processed_dynamics_map_states[object_id]["lane"] = lane
            object_state_string = object_state.State.Name(object_state.state)
            processed_dynamics_map_states[object_id]["state"]["object_state"][step_count] = object_state_string
            processed_dynamics_map_states[object_id]["stop_point"][0] = object_state.stop_point.x
            processed_dynamics_map_states[object_id]["stop_point"][1] = object_state.stop_point.y
            processed_dynamics_map_states[object_id]["stop_point"][2] = object_state.stop_point.z

    for obj in processed_dynamics_map_states.values():
        assert len(obj["state"]["object_state"]) == obj["metadata"]["track_length"]
    return processed_dynamics_map_states
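
# Illustrative usage sketch (hypothetical `scenario` proto, as above): traffic lights are keyed by the
# id of the lane they control, and each carries one state string per time step.
# lights = extract_dynamic_map_states(scenario.dynamic_map_states, len(scenario.timestamps_seconds))
# for light_id, light in lights.items():
#     print(light_id, light["state"]["object_state"][0])  # e.g. 'LANE_STATE_GO', or None if unobserved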

class CustomUnpickler(pickle.Unpickler):
    def __init__(self, load_old_scenario, *args, **kwargs):
        raise DeprecationWarning("We don't pickle any customized data type anymore, so this class is deprecated")
        super(CustomUnpickler, self).__init__(*args, **kwargs)
        self.load_old_scenario = load_old_scenario

    def find_class(self, module, name):
        if self.load_old_scenario:
            raise ValueError("Old scenario is completely deprecated. Can't load it anymore.")
            # Unreachable legacy mapping, kept for reference:
            if name == "AgentType":
                return AgentTypeClass
            elif name == "RoadLineType":
                return RoadLineTypeClass
            elif name == "RoadEdgeType":
                return RoadEdgeTypeClass
            return super().find_class(module, name)
        else:
            return super().find_class(module, name)

def read_waymo_data(file_path):
    return read_scenario_data(file_path)

def draw_waymo_map(data):
    """
    TODO: Need this function in the future.
    """
    figure(figsize=(8, 6), dpi=500)
    for key, value in data[ScenarioDescription.MAP_FEATURES].items():
        if value.get("type", None) == "center_lane":
            plt.scatter([p[0] for p in value["polyline"]], [p[1] for p in value["polyline"]], s=0.5)
        elif value.get("type", None) == "road_edge":
            plt.scatter([p[0] for p in value["polyline"]], [p[1] for p in value["polyline"]], s=0.5, c=(0, 0, 0))
        # elif value.get("type", None) == "road_line":
        #     plt.scatter([p[0] for p in value["polyline"]], [p[1] for p in value["polyline"]], s=0.5, c=(0.8, 0.8, 0.8))
    plt.show()

def nearest_point(point, line):
    # Return the index of the point on the line that is nearest to the query point.
    dist = np.square(line - point)
    dist = np.sqrt(dist[:, 0] + dist[:, 1])
    return np.argmin(dist)
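
# Minimal worked example (illustrative only): with a straight line along the x-axis,
# the vertex at index 1 is closest to the query point.
# line = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0]])
# nearest_point(np.array([1.2, 0.5]), line)  # -> 1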

def extract_width(map, polyline, boundary):
    l_width = np.zeros(polyline.shape[0], dtype="float32")
    for b in boundary:
        # Cast every value except boundary_type to int
        boundary_int = {k: int(v) if k != "boundary_type" else v for k, v in b.items()}
        b_feat_id = str(boundary_int["boundary_feature_id"])
        lb = map[b_feat_id]
        b_polyline = lb["polyline"][:, :2]

        start_p = polyline[boundary_int["lane_start_index"]]
        start_index = nearest_point(start_p, b_polyline)
        seg_len = boundary_int["lane_end_index"] - boundary_int["lane_start_index"]
        end_index = min(start_index + seg_len, lb["polyline"].shape[0] - 1)
        length = min(end_index - start_index, seg_len) + 1

        self_range = range(boundary_int["lane_start_index"], boundary_int["lane_start_index"] + length)
        bound_range = range(start_index, start_index + length)
        center_lane = polyline[self_range]
        bound = b_polyline[bound_range]
        dist = np.square(center_lane - bound)
        dist = np.sqrt(dist[:, 0] + dist[:, 1])
        l_width[self_range] = dist
    return l_width

def compute_width(map):
    for map_feat_id, lane in map.items():
        if "LANE" not in lane["type"]:
            continue
        width = np.zeros((lane["polyline"].shape[0], 2), dtype="float32")
        width[:, 0] = extract_width(map, lane["polyline"][:, :2], lane["left_boundaries"])
        width[:, 1] = extract_width(map, lane["polyline"][:, :2], lane["right_boundaries"])
        # Fall back to the other side's width where one side has no boundary match.
        width[width[:, 0] == 0, 0] = width[width[:, 0] == 0, 1]
        width[width[:, 1] == 0, 1] = width[width[:, 1] == 0, 0]
        lane["width"] = width
    return
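
# Sketch of the width computation on synthetic data (illustrative only): with a centerline at y=0 and
# boundary polylines at y=+2 and y=-2, every lane point gets a (left, right) width of (2.0, 2.0).
# The boundary entries follow the extracted schema: a feature id plus the lane index range it covers.
# xs = np.arange(3, dtype="float32")
# fake_map = {
#     "1": {"type": "LANE_SURFACE_STREET", "polyline": np.stack([xs, np.zeros(3), np.zeros(3)], 1),
#           "left_boundaries": [{"boundary_feature_id": "2", "lane_start_index": 0,
#                                "lane_end_index": 2, "boundary_type": "UNKNOWN"}],
#           "right_boundaries": [{"boundary_feature_id": "3", "lane_start_index": 0,
#                                 "lane_end_index": 2, "boundary_type": "UNKNOWN"}]},
#     "2": {"type": "ROAD_LINE_SOLID_SINGLE_WHITE", "polyline": np.stack([xs, np.full(3, 2.0), np.zeros(3)], 1)},
#     "3": {"type": "ROAD_LINE_SOLID_SINGLE_WHITE", "polyline": np.stack([xs, np.full(3, -2.0), np.zeros(3)], 1)},
# }
# compute_width(fake_map)
# fake_map["1"]["width"]  # -> [[2., 2.], [2., 2.], [2., 2.]]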

View File

@@ -0,0 +1,85 @@
TrafficSignal = {
    0: 'LANE_STATE_UNKNOWN',
    1: 'LANE_STATE_ARROW_STOP',
    2: 'LANE_STATE_ARROW_CAUTION',
    3: 'LANE_STATE_ARROW_GO',
    4: 'LANE_STATE_STOP',
    5: 'LANE_STATE_CAUTION',
    6: 'LANE_STATE_GO',
    7: 'LANE_STATE_FLASHING_STOP',
    8: 'LANE_STATE_FLASHING_CAUTION'
}
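
# Illustrative lookup: raw Waymo signal-state integers map to the state strings above, e.g.
# TrafficSignal[6]  # -> 'LANE_STATE_GO'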

class WaymoLaneType:
    UNKNOWN = 0
    LANE_FREEWAY = 1
    LANE_SURFACE_STREET = 2
    LANE_BIKE_LANE = 3

    ENUM_TO_STR = {
        UNKNOWN: 'UNKNOWN',
        LANE_FREEWAY: 'LANE_FREEWAY',
        LANE_SURFACE_STREET: 'LANE_SURFACE_STREET',
        LANE_BIKE_LANE: 'LANE_BIKE_LANE'
    }

    @classmethod
    def from_waymo(cls, item):
        return cls.ENUM_TO_STR[item]

class WaymoRoadLineType:
    UNKNOWN = 0
    BROKEN_SINGLE_WHITE = 1
    SOLID_SINGLE_WHITE = 2
    SOLID_DOUBLE_WHITE = 3
    BROKEN_SINGLE_YELLOW = 4
    BROKEN_DOUBLE_YELLOW = 5
    SOLID_SINGLE_YELLOW = 6
    SOLID_DOUBLE_YELLOW = 7
    PASSING_DOUBLE_YELLOW = 8

    ENUM_TO_STR = {
        UNKNOWN: 'UNKNOWN',
        BROKEN_SINGLE_WHITE: 'ROAD_LINE_BROKEN_SINGLE_WHITE',
        SOLID_SINGLE_WHITE: 'ROAD_LINE_SOLID_SINGLE_WHITE',
        SOLID_DOUBLE_WHITE: 'ROAD_LINE_SOLID_DOUBLE_WHITE',
        BROKEN_SINGLE_YELLOW: 'ROAD_LINE_BROKEN_SINGLE_YELLOW',
        BROKEN_DOUBLE_YELLOW: 'ROAD_LINE_BROKEN_DOUBLE_YELLOW',
        SOLID_SINGLE_YELLOW: 'ROAD_LINE_SOLID_SINGLE_YELLOW',
        SOLID_DOUBLE_YELLOW: 'ROAD_LINE_SOLID_DOUBLE_YELLOW',
        PASSING_DOUBLE_YELLOW: 'ROAD_LINE_PASSING_DOUBLE_YELLOW'
    }

    @classmethod
    def from_waymo(cls, item):
        return cls.ENUM_TO_STR[item]

class WaymoRoadEdgeType:
    UNKNOWN = 0
    # Physical road boundary that doesn't have traffic on the other side (e.g., a curb or the k-rail
    # on the right side of a freeway).
    BOUNDARY = 1
    # Physical road boundary that separates the car from other traffic (e.g. a k-rail or an island).
    MEDIAN = 2

    ENUM_TO_STR = {UNKNOWN: 'UNKNOWN', BOUNDARY: 'ROAD_EDGE_BOUNDARY', MEDIAN: 'ROAD_EDGE_MEDIAN'}

    @classmethod
    def from_waymo(cls, item):
        return cls.ENUM_TO_STR[item]

class WaymoAgentType:
    UNSET = 0
    VEHICLE = 1
    PEDESTRIAN = 2
    CYCLIST = 3
    OTHER = 4

    ENUM_TO_STR = {UNSET: 'UNSET', VEHICLE: 'VEHICLE', PEDESTRIAN: 'PEDESTRIAN', CYCLIST: 'CYCLIST', OTHER: 'OTHER'}

    @classmethod
    def from_waymo(cls, item):
        return cls.ENUM_TO_STR[item]
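
# Illustrative lookups (not part of the original file): each helper maps a raw Waymo enum value to
# the corresponding string, e.g.
# WaymoAgentType.from_waymo(1)     # -> 'VEHICLE'
# WaymoLaneType.from_waymo(2)      # -> 'LANE_SURFACE_STREET'
# WaymoRoadLineType.from_waymo(7)  # -> 'ROAD_LINE_SOLID_DOUBLE_YELLOW'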