Customization
To customize a driving task, one can either adapt existing tasks by modifying the configurations or create a new task from scratch.
Configurations
The simplest way to customize your own task is to modify the configurations in car_dreamer/configs/common.yaml and car_dreamer/configs/tasks.yaml. The common configurations define the observations, terminal conditions, etc., that are re-used by all different tasks; task configurations define each the observation, action, terminal conditions, reward scales, display options, for each specific task. See configurations for more details.
Creating a new environment
In this section, we will guide you through the process of creating a new task script, defining a new observation source, and implementing a new planner.
Create a new file carla_custom_env.py in car_dreamer/. Define a class CarlaCustomEnv inherited from CarlaBaseEnv, and implement the following methods. Specifically, you can call the APIs from car_dreamer.toolkit.WorldManager to spawn vehicles, and use various planners to generate waypoints for the ego vehicle.
- class CarlaBaseEnv(config)
- abstract apply_control(action) None
Override this method to apply control to actors. This method will be called before the simulator ticks.
- get_ego_vehicle() Actor
Override this method to return the ego vehicle. The default behavior is to return self.ego
- abstract get_terminal_conditions() Dict[str, bool]
Override this method to define the terminal condition. If one of the keys in the returned dictionary gives True, the episode will be terminated.
- abstract on_reset() None
Override this method to perform additional reset operations. Specifically, you can spawn actors and plan routes here.
A typical implementation looks like:
import carla
from .carla_base_env import CarlaBaseEnv
from .toolkit import RandomPlanner
class CarlaCustomEnv(CarlaBaseEnv):
def on_reset(self):
# spawn the ego vehicle at a specific location in Town04
# self.ego is a must to inherit from CarlaBaseEnv
spawn_transform = carla.Transform(carla.Location(x=5.8, y=100, z=0.1), carla.Rotation(yaw=-90))
self.ego = self._world_manager.spawn_actor(transform=spawn_transform)
# use random planner to generate waypoints for the ego vehicle
# self.ego_planner is a must to inherit from CarlaWptEnv or CarlaWptFixedEnv
self.ego_planner = RandomPlanner(vehicle=self.ego)
self.waypoints, self.planner_stats = self.ego_planner.run_step()
def apply_control(self, action):
# apply control to the ego vehicle according to action
control = self.get_vehicle_control(action)
self.ego.apply_control(control)
def on_step(self):
# run the planner to generate waypoints for the ego vehicle
self.waypoints, self.planner_stats = self.ego_planner.run_step()
self.num_waypoint_reached = self.planner_stats['num_completed']
def reward(self):
r_waypoint = self.num_waypoint_reached * self._config.waypoint_reward
return r_waypoint, {
'r_waypoint': r_waypoint,
}
def get_terminal_conditions(self):
return {
'is_collision': self.is_collision(),
'time_exceeded': self._time_step > self._config.time_limit,
}
def get_state(self):
return {
'timesteps': self._time_step,
'waypoints': self.waypoints,
}
You can also inherit from CarlaWptEnv or CarlaWptFixedEnv where some of the methods have been implemented.
After you create the environment, the task will be automatically registered in gym with id CarlaCustomEnv-v0. Note that you must follow the same naming style in order to be recognized by the toolkit (Carla{TaskName}Env and carla_{task_name}_env.py).
Finally, create a task configuration in car_dreamer/configs/tasks.yaml. It will be available in self._config in the environment class. For example:
carla_custom:
env:
name: 'CarlaCustomEnv-v0'
observation.enabled: [camera, collision, birdeye_wpt]
# specify value for constants used in the environment
waypoint_reward: 1.0
time_limit: 500
Now you will be able to create the task by car_dreamer.create_task('carla_custom') where carla_custom is the task name in tasks.yaml.
Defining a new observation source
For observations, we have defined several items in car_dreamer/configs/common.yaml. Define env.observation.enabled to specify the items you want to include in the observation data. See observation configuration for more details.
To customize another observation source, create a new handler in car_dreamer/toolkit/observer/handlers/ inherited from BaseHandler. Then implement the following methods:
- class BaseHandler(world: WorldManager, config)
Base class for data endpoint handlers.
For example, if you want to include the speed and the number of remaining waypoints in the observation data, you can implement a new handler as follows:
from gym import spaces
import numpy as np
from .base_handler import BaseHandler
from ...carla_manager import get_vehicle_velocity
class CustomHandler(BaseHandler):
def get_observation_space(self):
return {
'num_waypoints': spaces.Box(low=0, high=65535, shape=(1,), dtype=np.uint16),
'speed': spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32),
}
def get_observation(self, env_state):
# 'waypoints' should be returned by the environment
num_waypoints = len(env_state['waypoints'])
# speed is directly obtained from CARLA
speed = get_vehicle_velocity(self.ego)
return {
'num_waypoints': np.array([num_waypoints], dtype=np.uint16),
'speed': np.array(speed, dtype=np.float32),
}
def destroy(self):
pass
def reset(self, ego):
self.ego = ego
Add a new enum in car_dreamer/toolkit/observer/utils.py:
from .handlers.custom_handler import CustomHandler
class HandlerType(Enum):
...
CUSTOM = 'custom'
HANDLER_DICT = {
...
HandlerType.CUSTOM: CustomHandler,
}
Add a new configuration item in car_dreamer/configs/common.yaml where custom is the handler name above (CUSTOM = 'custom'):
env:
...
observation:
...
custom:
handler: 'custom'
Finally, you can include the new observation source by adding custom to env.observation.enabled in the task configuration.
Creating a new planner
All planners should expose the interface:
- BasePlanner.run_step()
Run one step of the route planner, extending the route and removing expired waypoints.
- Returns:
tuple(list of waypoints
(x, y, yaw), additional stats)
We have implemented a base class BasePlanner for you to inherit from. All you need to do is to define how to initialize and extend the route when running each step:
- abstract BasePlanner.init_route()
Initialize waypoints queue. It is called only once at the first step.
- abstract BasePlanner.extend_route()
Extend waypoints queue. It is called at every step.
Additionally, you can call the following methods to get, add, or pop waypoints stored in the planner:
- class BasePlanner(vehicle: Actor, max_waypoints=60, reach_threshold=0.5)
Base class for route planner. All route planners exposes a
run_stepmethod. To inherit, implementinit_routeandextend_route.- add_waypoint(waypoint)
Add waypoints to the queue.
- Parameters:
waypoints – waypoint
(x, y, yaw)or carla.Waypoint
- clear_waypoints()
Clear waypoints queue.
- from_carla_waypoint(waypoint)
Convert a carla.Waypoint to a waypoint
(x, y, yaw).- Parameters:
waypoint – carla.Waypoint
- Returns:
tuple
(x, y, yaw)
- get_all_waypoints()
Get all waypoints in the queue.
- Returns:
list of waypoints
(x, y, yaw)
- get_waypoint_num()
Get the number of waypoints in the queue.
- Returns:
int, number of waypoints in the queue
- pop_waypoint()
Pop a waypoint from the queue.