Tasks

In short, a Task is a bit more than an environment. A Task takes an environment, e.g. CartPole, as an input, but it also handles state transformation and reward shaping. A Task also aims to be compatible with OpenAI Gym’s API. Some environments aren’t compatible out of the box, so a Task wraps them to make them so.

class ai_traineree.tasks.GymTask(env: Union[str, gym.core.Env], state_transform: Optional[Callable] = None, reward_transform: Optional[Callable] = None, can_render=True, stack_frames: int = 1, skip_start_frames: int = 0, **kwargs)
__init__(env: Union[str, gym.core.Env], state_transform: Optional[Callable] = None, reward_transform: Optional[Callable] = None, can_render=True, stack_frames: int = 1, skip_start_frames: int = 0, **kwargs)
Parameters
  • env (gym-like env instance or str) – Something one might get via env = gym.make('CartPole-v0'), where gym is OpenAI Gym compatible. If env is passed as a string, it is assumed to be the name of a registered Gym environment with the OpenAI interface, and the task creates it for you.

  • state_transform (function) – Default: None. Function that transforms the state before it’s returned to the observer(s).

  • reward_transform (function) – Default: None. Function that shapes reward before it’s returned to the observer(s). All arguments are expected to be named; supported names: state, action, reward, done, info.

  • can_render (bool) – Default: True. Whether the task can return task state (different than the step observation). Most common case is to provide the game view as the user would have. By default this flag is set to True since the most common use case is OpenAI gym, specifically Atari games.

  • stack_frames (int) – Default: 1. Number of frames to return when performing a step. By default it returns only the current observation (MDP). When greater than 1, the returned observation will include previous observations.

  • skip_start_frames (int) – Default: 0. Often referred to as “noop frames”. Indicates how many initial frames to skip. Every reset() will skip a random number of frames in the range [0, skip_start_frames].
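The stack_frames and skip_start_frames parameters can be pictured with a small standalone sketch. This is not the library's implementation: FrameStack and skip_count are hypothetical names, and filling the buffer with copies of the first observation at reset is an assumption made only for illustration.

```python
import random
from collections import deque


def skip_count(skip_start_frames, rng):
    # Draw how many initial frames to skip, uniformly from [0, skip_start_frames].
    return rng.randint(0, skip_start_frames)


class FrameStack:
    """Hypothetical helper that keeps the last `stack_frames` observations."""

    def __init__(self, stack_frames):
        self.frames = deque(maxlen=stack_frames)

    def reset(self, first_obs):
        # Assumption: at reset the buffer is filled with copies of the first observation.
        self.frames.clear()
        for _ in range(self.frames.maxlen):
            self.frames.append(first_obs)
        return list(self.frames)

    def push(self, obs):
        # The oldest frame drops out automatically because of maxlen.
        self.frames.append(obs)
        return list(self.frames)


stack = FrameStack(stack_frames=3)
after_reset = stack.reset([0.0])
after_step = stack.push([1.0])
noops = skip_count(skip_start_frames=4, rng=random.Random(0))
```

With stack_frames=1 the same helper degenerates to returning only the current observation, which matches the default behaviour described above.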

Example

>>> def reward_transform(*, reward, state, done):
...     return reward + 100*done - state[0]*0.1
>>> task = GymTask(env='CartPole-v1', reward_transform=reward_transform)
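To see what the reward_transform from the example computes, the sketch below evaluates it on made-up step values. The apply_reward_transform dispatcher is hypothetical and only illustrates one way a caller could pass the supported named values (state, action, reward, done, info) while letting the function declare the subset it needs; how GymTask actually invokes the function is not shown here.

```python
import inspect


def reward_transform(*, reward, state, done):
    # Same shaping as the example above: bonus on termination, penalty on state[0].
    return reward + 100 * done - state[0] * 0.1


def apply_reward_transform(fn, **step_values):
    # Hypothetical dispatcher (not the library's code): forward only the
    # named values that `fn` declares in its signature.
    accepted = inspect.signature(fn).parameters
    kwargs = {name: value for name, value in step_values.items() if name in accepted}
    return fn(**kwargs)


# Made-up values for a single terminal step.
shaped = apply_reward_transform(
    reward_transform,
    state=[0.5, 0.0, 0.0, 0.0],
    action=1,
    reward=1.0,
    done=True,
    info={},
)
```

Declaring the parameters as keyword-only (the bare * in the signature) makes it impossible to pass them positionally by accident, which fits the requirement that all arguments are named.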
step(action: Union[int, float, List]) Tuple

Each action results in a new state, reward, done flag, and info about env.

Parameters

action – An action that the agent is taking in current environment step.

Returns

The return consists of a next state, a reward in that state, a flag whether the next state is terminal and additional information provided by the environment regarding that state.

Return type

step_tuple (Tuple[torch.Tensor, float, bool, Any])
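A minimal episode loop built around this return tuple might look like the sketch below. CountdownTask is a hypothetical stand-in so the snippet runs without gym or torch installed; it returns plain lists rather than tensors, and its reset() method is an assumption, since reset is not documented in this section.

```python
class CountdownTask:
    """Hypothetical stand-in whose step() mimics the tuple described above."""

    def __init__(self, horizon):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return [float(self.t)]

    def step(self, action):
        self.t += 1
        next_state = [float(self.t)]
        reward = 1.0
        done = self.t >= self.horizon  # terminal once the horizon is reached
        info = {"t": self.t}
        return next_state, reward, done, info


task = CountdownTask(horizon=5)
state = task.reset()
total_reward, done = 0.0, False
while not done:
    action = 0  # fixed action; a real agent would choose one based on `state`
    state, reward, done, info = task.step(action)
    total_reward += reward
```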

class ai_traineree.tasks.MultiAgentUnityTask(unity_env: gym.core.Env, uint8_visual: bool = False, flatten_branched: bool = False, allow_multiple_obs: bool = False, termination_mode: str = 'any')

Based on UnityToGymWrapper from Unity’s ML-Agents Toolkit.

At the time of writing, the official package doesn’t support multiple agents. Until it’s clear why it doesn’t support them (https://github.com/Unity-Technologies/ml-agents/issues/4120) and whether they plan on adding anything, we’re keeping this version. When the fog of unknown has been blown away, we might consider opening a Pull Request to ml-agents.

__init__(unity_env: gym.core.Env, uint8_visual: bool = False, flatten_branched: bool = False, allow_multiple_obs: bool = False, termination_mode: str = 'any')
Parameters
  • unity_env – The Unity BaseEnv to be wrapped in the gym. Will be closed when the UnityToGymWrapper closes.

  • uint8_visual – Return visual observations as uint8 (0-255) matrices instead of float (0.0-1.0).

  • flatten_branched – If True, turn branched discrete action spaces into a Discrete space rather than MultiDiscrete.

  • allow_multiple_obs – If True, return a list of np.ndarrays as observations with the first elements containing the visual observations and the last element containing the array of vector observations. If False, returns a single np.ndarray containing either only a single visual observation or the array of vector observations.

  • termination_mode – A string (enum) indicating when to end an episode. Supports “any”, “majority” and “all”, which are attributes on TerminationMode.

close() None

Override _close in your subclass to perform any necessary cleanup. Environments will automatically close() themselves when garbage collected or when the program exits.

detect_game_over(termianl_steps: List) bool

Determine whether the episode has finished.

Expects terminal_steps to contain only the steps that terminated. Note that other steps are possible in the same iteration. This keeps the method consistent with Unity’s framework, but it will likely be refactored.
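How the three termination modes could translate into a game-over decision is sketched below. The function is_game_over is hypothetical, and the exact semantics are assumptions: in particular, “majority” is taken here to mean strictly more than half of the agents, which the documentation does not confirm.

```python
def is_game_over(num_terminated, num_agents, termination_mode="any"):
    # Assumed semantics, for illustration only:
    #   "any"      -> at least one agent terminated
    #   "majority" -> strictly more than half of the agents terminated
    #   "all"      -> every agent terminated
    if termination_mode == "any":
        return num_terminated > 0
    if termination_mode == "majority":
        return num_terminated > num_agents / 2
    if termination_mode == "all":
        return num_terminated == num_agents
    raise ValueError(f"Unknown termination mode: {termination_mode}")


# Two of four agents terminated in this iteration.
any_over = is_game_over(2, 4, "any")
majority_over = is_game_over(2, 4, "majority")
all_over = is_game_over(2, 4, "all")
```

Under these assumptions, “any” ends the episode as soon as a single agent finishes, while “all” lets the remaining agents keep acting until everyone has terminated.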

render(mode='rgb_array')

Depending on the mode, it will render the scene and either return it or display it.

Parameters

mode – Currently only rgb_array (default) is supported.

Returns

A tensor containing the rendered scene. If the requested mode is not supported, None is returned.

reset() List[Union[int, List[float]]]

Resets the state of the environment and returns an initial observation.

Returns

observation (object/list) – the initial observation of the space.

seed(seed: Optional[Any] = None) None

Sets the seed for this env’s random number generator(s). Currently not implemented.

step(action: List[Any], agent_id: int) Tuple[numpy.ndarray, float, bool, Dict]

Run one timestep of the environment’s dynamics. When end of episode is reached, you are responsible for calling reset() to reset this environment’s state. Accepts an action and returns a tuple (observation, reward, done, info).

Parameters

action (object/list) – an action provided by the agent

Returns

  • observation (object/list) – agent’s observation of the current environment.

  • reward (float/list) – amount of reward returned after previous action.

  • done (boolean/list) – whether the episode has ended.

  • info (dict) – contains auxiliary diagnostic information.

class ai_traineree.tasks.PettingZooTask(env)
__init__(env) None

Wrapper around PettingZoo’s envs to make it more compatible with EnvRunners.

Note: Direct access to wrapped env is through self.env.

Parameters

env – An instance of PettingZoo env.

Example

>>> from pettingzoo.butterfly import prison_v2 as prison
>>> env = prison.env()
>>> task = PettingZooTask(env)
>>> assert env == task.env