Customization

To customize a driving task, one can either adapt existing tasks by modifying the configurations or create a new task from scratch.

Configurations

The simplest way to customize your own task is to modify the configurations in car_dreamer/configs/common.yaml and car_dreamer/configs/tasks.yaml. The common configurations define the observations, terminal conditions, etc., that are re-used by all different tasks; task configurations define each the observation, action, terminal conditions, reward scales, display options, for each specific task. See configurations for more details.

Creating a new environment

In this section, we will guide you through the process of creating a new task script, defining a new observation source, and implementing a new planner.

Create a new file carla_custom_env.py in car_dreamer/. Define a class CarlaCustomEnv inherited from CarlaBaseEnv, and implement the following methods. Specifically, you can call the APIs from car_dreamer.toolkit.WorldManager to spawn vehicles, and use various planners to generate waypoints for the ego vehicle.

class CarlaBaseEnv(config)
abstract apply_control(action) None

Override this method to apply control to actors. This method will be called before the simulator ticks.

get_ego_vehicle() Actor

Override this method to return the ego vehicle. The default behavior is to return self.ego

get_state() Dict

Return the environment state. Implement this method to define the env state.

abstract get_terminal_conditions() Dict[str, bool]

Override this method to define the terminal condition. If one of the keys in the returned dictionary gives True, the episode will be terminated.

abstract on_reset() None

Override this method to perform additional reset operations. Specifically, you can spawn actors and plan routes here.

abstract on_step() None

Override this method to perform additional operations at each step. Specifically, you can update the planner and the route here. This method will be called after the simulator ticks.

abstract reward() Tuple[float, Dict]

Override this method to define the reward function.

A typical implementation looks like:

import carla

from .carla_base_env import CarlaBaseEnv
from .toolkit import RandomPlanner

class CarlaCustomEnv(CarlaBaseEnv):
    def on_reset(self):
        # spawn the ego vehicle at a specific location in Town04
        # self.ego is a must to inherit from CarlaBaseEnv
        spawn_transform = carla.Transform(carla.Location(x=5.8, y=100, z=0.1), carla.Rotation(yaw=-90))
        self.ego = self._world_manager.spawn_actor(transform=spawn_transform)

        # use random planner to generate waypoints for the ego vehicle
        # self.ego_planner is a must to inherit from CarlaWptEnv or CarlaWptFixedEnv
        self.ego_planner = RandomPlanner(vehicle=self.ego)
        self.waypoints, self.planner_stats = self.ego_planner.run_step()

    def apply_control(self, action):
        # apply control to the ego vehicle according to action
        control = self.get_vehicle_control(action)
        self.ego.apply_control(control)

    def on_step(self):
        # run the planner to generate waypoints for the ego vehicle
        self.waypoints, self.planner_stats = self.ego_planner.run_step()
        self.num_waypoint_reached = self.planner_stats['num_completed']

    def reward(self):
        r_waypoint = self.num_waypoint_reached * self._config.waypoint_reward
        return r_waypoint, {
            'r_waypoint': r_waypoint,
        }

    def get_terminal_conditions(self):
        return {
            'is_collision': self.is_collision(),
            'time_exceeded': self._time_step > self._config.time_limit,
        }

    def get_state(self):
        return {
            'timesteps': self._time_step,
            'waypoints': self.waypoints,
        }

You can also inherit from CarlaWptEnv or CarlaWptFixedEnv where some of the methods have been implemented.

After you create the environment, the task will be automatically registered in gym with id CarlaCustomEnv-v0. Note that you must follow the same naming style in order to be recognized by the toolkit (Carla{TaskName}Env and carla_{task_name}_env.py).

Finally, create a task configuration in car_dreamer/configs/tasks.yaml. It will be available in self._config in the environment class. For example:

carla_custom:
    env:
    name: 'CarlaCustomEnv-v0'
    observation.enabled: [camera, collision, birdeye_wpt]
    # specify value for constants used in the environment
    waypoint_reward: 1.0
    time_limit: 500

Now you will be able to create the task by car_dreamer.create_task('carla_custom') where carla_custom is the task name in tasks.yaml.

Defining a new observation source

For observations, we have defined several items in car_dreamer/configs/common.yaml. Define env.observation.enabled to specify the items you want to include in the observation data. See observation configuration for more details.

To customize another observation source, create a new handler in car_dreamer/toolkit/observer/handlers/ inherited from BaseHandler. Then implement the following methods:

class BaseHandler(world: WorldManager, config)

Base class for data endpoint handlers.

abstract destroy() None

Clean up resources

abstract get_observation(env_state: Dict) Tuple[Dict, Dict]

Fetch observations from the environment.

Parameters:

env_state – The state returned by carla_manager.CarlaBaseEnv.get_state.

abstract get_observation_space() Dict

Get the observation space for this data endpoint.

abstract reset(ego: Actor) None

Reset the handler

For example, if you want to include the speed and the number of remaining waypoints in the observation data, you can implement a new handler as follows:

from gym import spaces
import numpy as np

from .base_handler import BaseHandler
from ...carla_manager import get_vehicle_velocity

class CustomHandler(BaseHandler):

    def get_observation_space(self):
        return {
             'num_waypoints': spaces.Box(low=0, high=65535, shape=(1,), dtype=np.uint16),
             'speed': spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32),
           }

    def get_observation(self, env_state):
        # 'waypoints' should be returned by the environment
        num_waypoints = len(env_state['waypoints'])
        # speed is directly obtained from CARLA
        speed = get_vehicle_velocity(self.ego)
        return {
            'num_waypoints': np.array([num_waypoints], dtype=np.uint16),
             'speed': np.array(speed, dtype=np.float32),
        }

    def destroy(self):
        pass

    def reset(self, ego):
        self.ego = ego

Add a new enum in car_dreamer/toolkit/observer/utils.py:

from .handlers.custom_handler import CustomHandler

class HandlerType(Enum):
    ...
    CUSTOM = 'custom'

HANDLER_DICT = {
    ...
    HandlerType.CUSTOM: CustomHandler,
}

Add a new configuration item in car_dreamer/configs/common.yaml where custom is the handler name above (CUSTOM = 'custom'):

env:
    ...
    observation:
        ...
        custom:
            handler: 'custom'

Finally, you can include the new observation source by adding custom to env.observation.enabled in the task configuration.

Creating a new planner

All planners should expose the interface:

BasePlanner.run_step()

Run one step of the route planner, extending the route and removing expired waypoints.

Returns:

tuple(list of waypoints (x, y, yaw), additional stats)

We have implemented a base class BasePlanner for you to inherit from. All you need to do is to define how to initialize and extend the route when running each step:

abstract BasePlanner.init_route()

Initialize waypoints queue. It is called only once at the first step.

abstract BasePlanner.extend_route()

Extend waypoints queue. It is called at every step.

Additionally, you can call the following methods to get, add, or pop waypoints stored in the planner:

class BasePlanner(vehicle: Actor, max_waypoints=60, reach_threshold=0.5)

Base class for route planner. All route planners exposes a run_step method. To inherit, implement init_route and extend_route.

add_waypoint(waypoint)

Add waypoints to the queue.

Parameters:

waypoints – waypoint (x, y, yaw) or carla.Waypoint

clear_waypoints()

Clear waypoints queue.

from_carla_waypoint(waypoint)

Convert a carla.Waypoint to a waypoint (x, y, yaw).

Parameters:

waypoint – carla.Waypoint

Returns:

tuple (x, y, yaw)

get_all_waypoints()

Get all waypoints in the queue.

Returns:

list of waypoints (x, y, yaw)

get_waypoint_num()

Get the number of waypoints in the queue.

Returns:

int, number of waypoints in the queue

pop_waypoint()

Pop a waypoint from the queue.