Source code for embodichain.lab.gym.envs.managers.actions

# ----------------------------------------------------------------------------
# Copyright (c) 2021-2026 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

"""Action terms for processing policy actions into robot control commands.

This module provides concrete implementations of :class:`ActionTerm` that convert
raw policy actions into different control formats (e.g., joint positions, velocities,
forces, or end-effector poses).

The action terms are typically used in conjunction with :class:`ActionManager` which
handles calling the appropriate term based on configuration.

Available action terms:

- :class:`DeltaQposTerm`: Delta joint position (current + scale * action)
- :class:`QposTerm`: Absolute joint position (scale * action)
- :class:`QposDenormalizedTerm`: Normalized action [-1,1] -> joint limits
- :class:`EefPoseTerm`: End-effector pose -> IK -> joint position
- :class:`QvelTerm`: Joint velocity (scale * action)
- :class:`QfTerm`: Joint force/torque (scale * action)
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import torch
from tensordict import TensorDict

from embodichain.lab.sim.types import EnvAction
from embodichain.utils.math import matrix_from_euler, matrix_from_quat
from .action_manager import ActionTerm
from .cfg import ActionTermCfg

# Import ActionTerm from action_manager after it's defined
# This is a late import to avoid circular dependency
if TYPE_CHECKING:
    from embodichain.lab.gym.envs import EmbodiedEnv


__all__ = [
    "DeltaQposTerm",
    "QposTerm",
    "QposDenormalizedTerm",
    "QposNormalizedTerm",
    "EefPoseTerm",
    "QvelTerm",
    "QfTerm",
]


# ----------------------------------------------------------------------------
# Concrete ActionTerm implementations
# ----------------------------------------------------------------------------


[docs] class DeltaQposTerm(ActionTerm): """Delta joint position action: current_qpos + scale * action -> qpos. This action term adds a scaled delta to the current joint positions. Useful for relative position control where the policy outputs position offsets. Args: scale: Scaling factor for the action. Defaults to 1.0. Example: >>> cfg = ActionTermCfg(func=DeltaQposTerm, params={"scale": 0.1}) >>> term = DeltaQposTerm(cfg, env) >>> action = torch.ones(num_envs, dof) * 2.0 >>> result = term.process_action(action) >>> # result["qpos"] = current_qpos + 0.1 * action """
[docs] def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._scale = cfg.params.get("scale", 1.0)
@property def input_key(self) -> str: return "qpos" @property def action_dim(self) -> int: return len(self._env.active_joint_ids)
[docs] def process_action(self, action: torch.Tensor) -> torch.Tensor: return action * self._scale + self._env.robot.get_qpos()
[docs] class QposTerm(ActionTerm): """Absolute joint position action: scale * action -> qpos. This action term directly uses the scaled action as target joint positions. Useful for absolute position control. Args: scale: Scaling factor for the action. Defaults to 1.0. Example: >>> cfg = ActionTermCfg(func=QposTerm, params={"scale": 1.0}) >>> term = QposTerm(cfg, env) >>> action = torch.ones(num_envs, dof) * 0.5 >>> result = term.process_action(action) >>> # result["qpos"] = 0.5 * action """
[docs] def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._scale = cfg.params.get("scale", 1.0)
@property def input_key(self) -> str: return "qpos" @property def action_dim(self) -> int: return len(self._env.active_joint_ids)
[docs] def process_action(self, action: torch.Tensor) -> torch.Tensor: qpos = action * self._scale return qpos
[docs] class QposDenormalizedTerm(ActionTerm): """Normalized action in [range[0], range[1]] -> denormalize to joint limits -> qpos. The policy outputs normalized actions in the range [range[0], range[1]] which are then mapped to the joint's position limits. The policy output is scaled by ``params.scale`` before denormalization. With scale=1.0 (default), action in [range[0], range[1]] maps to [low, high]. With scale<1.0, the effective range shrinks toward the center (e.g. scale=0.5 maps to 25%-75% of joint range). Use scale=1.0 for standard normalized control. Args: scale: Scaling factor applied before denormalization. Defaults to 1.0. joint_ids: List of joint IDs to apply the action to. Defaults to all active joints. range: The range of the normalized action. Defaults to [-1.0, 1.0]. Example: >>> cfg = ActionTermCfg(func=QposDenormalizedTerm, params={"scale": 1.0}) >>> term = QposDenormalizedTerm(cfg, env) >>> action = torch.tensor([[-1.0, 1.0], [0.0, 0.0]]) # min/max per joint >>> result = term.process_action(action) >>> # Maps [-1, 1] to [qpos_limits_low, qpos_limits_high] """
[docs] def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._scale = cfg.params.get("scale", 1.0) self._joint_ids = cfg.params.get("joint_ids", self._env.active_joint_ids) self._range = cfg.params.get("range", [-1.0, 1.0])
@property def input_key(self) -> str: return "qpos" @property def action_dim(self) -> int: return len(self._env.active_joint_ids)
[docs] def process_action(self, action: torch.Tensor) -> torch.Tensor: scaled = action * self._scale qpos_limits = self._env.robot.body_data.qpos_limits[0, self._joint_ids] low = qpos_limits[:, 0] high = qpos_limits[:, 1] scaled[:, self._joint_ids] = low + ( scaled[:, self._joint_ids] - self._range[0] ) / (self._range[1] - self._range[0]) * (high - low) return scaled
class QposNormalizedTerm(ActionTerm): """Normalize action from qpos limits -> [range[0], range[1]]. Map joint positions to a normalized range [range[0], range[1]] based on the joint limits. This is the usually used for post processing the output of action. Args: joint_ids: List of joint IDs to apply the action to. Defaults to all active joints. range: The range of the normalized action. Defaults to [0.0, 1.0]. Example: >>> cfg = ActionTermCfg(func=QposNormalizedTerm) >>> term = QposNormalizedTerm(cfg, env) >>> action = torch.tensor([[-1.0, 1.0], [0.0, 0.0]]) # min/max per joint >>> result = term.process_action(action) >>> # Maps [-1, 1] to [0, 1] based on joint limits """ def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._joint_ids = cfg.params.get("joint_ids", self._env.active_joint_ids) self._range = cfg.params.get("range", [0.0, 1.0]) @property def input_key(self) -> str: return "qpos" @property def action_dim(self) -> int: return len(self._env.active_joint_ids) def process_action(self, action: torch.Tensor) -> torch.Tensor: qpos_limits = self._env.robot.body_data.qpos_limits[0, self._joint_ids] low = qpos_limits[:, 0] high = qpos_limits[:, 1] action[:, self._joint_ids] = (action[:, self._joint_ids] - low) / ( high - low ) * (self._range[1] - self._range[0]) + self._range[0] return action
[docs] class EefPoseTerm(ActionTerm): """End-effector pose (6D or 7D) -> IK -> qpos. The policy outputs a target end-effector pose which is converted to joint positions using inverse kinematics. Supports two pose representations: - 6D: position (3) + Euler angles (3) - 7D: position (3) + quaternion (4) On IK failure, falls back to current_qpos for that env. Returns ``ik_success`` in the TensorDict so reward/observation can penalize or condition on IK failures. Args: scale: Scaling factor for the pose. Defaults to 1.0. pose_dim: Dimension of the pose (6 for Euler, 7 for quaternion). Defaults to 7. Example: >>> cfg = ActionTermCfg(func=EefPoseTerm, params={"scale": 1.0, "pose_dim": 7}) >>> term = EefPoseTerm(cfg, env) >>> # 7D: position (3) + quaternion (4) >>> action = torch.zeros(num_envs, 7) >>> action[:, :3] = 0.1 # target position >>> action[:, 3] = 1.0 # quaternion w >>> result = term.process_action(action) >>> # result["qpos"] = IK solution >>> # result["ik_success"] = bool tensor indicating IK success """
[docs] def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._scale = cfg.params.get("scale", 1.0) self._pose_dim = cfg.params.get("pose_dim", 7) # 6 for euler, 7 for quat
@property def input_key(self) -> str: return "eef_pose" @property def action_dim(self) -> int: return self._pose_dim
[docs] def process_action(self, action: torch.Tensor) -> EnvAction: scaled = action * self._scale current_qpos = self._env.robot.get_qpos() batch_size = scaled.shape[0] target_pose = ( torch.eye(4, device=self.device).unsqueeze(0).repeat(batch_size, 1, 1) ) if scaled.shape[-1] == 6: target_pose[:, :3, 3] = scaled[:, :3] target_pose[:, :3, :3] = matrix_from_euler(scaled[:, 3:6]) elif scaled.shape[-1] == 7: target_pose[:, :3, 3] = scaled[:, :3] target_pose[:, :3, :3] = matrix_from_quat(scaled[:, 3:7]) else: raise ValueError( f"EEF pose action must be 6D or 7D, got {scaled.shape[-1]}D" ) # Batch IK: robot.compute_ik supports (n_envs, 4, 4) pose and (n_envs, dof) seed ret, qpos_ik = self._env.robot.compute_ik( pose=target_pose, joint_seed=current_qpos, ) # Fallback to current_qpos where IK failed result_qpos = torch.where( ret.unsqueeze(-1).expand_as(qpos_ik), qpos_ik, current_qpos ) return TensorDict( {"qpos": result_qpos, "ik_success": ret}, batch_size=[batch_size], device=self.device, )
[docs] class QvelTerm(ActionTerm): """Joint velocity action: scale * action -> qvel. This action term outputs target joint velocities. Useful for velocity control tasks. Args: scale: Scaling factor for the action. Defaults to 1.0. Example: >>> cfg = ActionTermCfg(func=QvelTerm, params={"scale": 0.2}) >>> term = QvelTerm(cfg, env) >>> action = torch.ones(num_envs, dof) >>> result = term.process_action(action) >>> # result["qvel"] = 0.2 * action """
[docs] def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._scale = cfg.params.get("scale", 1.0)
@property def input_key(self) -> str: return "qvel" @property def action_dim(self) -> int: return len(self._env.active_joint_ids)
[docs] def process_action(self, action: torch.Tensor) -> torch.Tensor: return action * self._scale
[docs] class QfTerm(ActionTerm): """Joint force/torque action: scale * action -> qf. This action term outputs target joint forces/torques. Useful for impedance control or force-based tasks. Args: scale: Scaling factor for the action. Defaults to 1.0. Example: >>> cfg = ActionTermCfg(func=QfTerm, params={"scale": 10.0}) >>> term = QfTerm(cfg, env) >>> action = torch.ones(num_envs, dof) >>> result = term.process_action(action) >>> # result["qf"] = 10.0 * action """
[docs] def __init__(self, cfg: ActionTermCfg, env: EmbodiedEnv): super().__init__(cfg, env) self._scale = cfg.params.get("scale", 1.0)
@property def input_key(self) -> str: return "qf" @property def action_dim(self) -> int: return len(self._env.active_joint_ids)
[docs] def process_action(self, action: torch.Tensor) -> torch.Tensor: return action * self._scale