Source code for embodichain.lab.gym.envs.managers.randomization.rendering

# ----------------------------------------------------------------------------
# Copyright (c) 2021-2025 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------

from __future__ import annotations

import torch
import os
import random
from typing import TYPE_CHECKING, Literal, Union, Optional, Dict

from embodichain.lab.sim.objects import Light, RigidObject, Articulation
from embodichain.lab.sim.sensors import Camera, StereoCamera
from embodichain.lab.gym.envs.managers.cfg import SceneEntityCfg
from embodichain.lab.gym.envs.managers import Functor, FunctorCfg
from embodichain.lab.sim import (
    VisualMaterial,
    VisualMaterialInst,
    VisualMaterialCfg,
)
from embodichain.utils.string import resolve_matching_names
from embodichain.utils.math import sample_uniform
from embodichain.utils import logger
from embodichain.data import get_data_path

if TYPE_CHECKING:
    from embodichain.lab.gym.envs import EmbodiedEnv


__all__ = [
    "randomize_light",
    "randomize_camera_intrinsics",
    "randomize_visual_material",
]


[docs] def randomize_light( env: EmbodiedEnv, env_ids: Union[torch.Tensor, None], entity_cfg: SceneEntityCfg, position_range: Optional[tuple[list[float], list[float]]] = None, color_range: Optional[tuple[list[float], list[float]]] = None, intensity_range: Optional[tuple[float, float]] = None, ) -> None: """Randomize light properties by adding, scaling, or setting random values. This function allows randomizing light properties in the scene. The function samples random values from the given distribution parameters and adds, scales, or sets the values into the physics simulation based on the operation. The distribution parameters are lists of two elements each, representing the lower and upper bounds of the distribution for the x, y, and z components of the light properties. The function samples random values for each component independently. .. attention:: This function applied the same light properties for all the environments. position_range is the x, y, z value added into light's cfg.init_pos. color_range is the absolute r, g, b value set to the light object. intensity_range is the value added into light's cfg.intensity. .. tip:: This function uses CPU tensors to assign light properties. Args: env (EmbodiedEnv): The environment instance. env_ids (Union[torch.Tensor, None]): The environment IDs to apply the randomization. entity_cfg (SceneEntityCfg): The configuration of the scene entity to randomize. position_range (Optional[tuple[list[float], list[float]]]): The range for the position randomization. color_range (Optional[tuple[list[float], list[float]]]): The range for the color randomization. intensity_range (Optional[tuple[float, float]]): The range for the intensity randomization. """ light: Light = env.sim.get_light(entity_cfg.uid) num_instance = len(env_ids) if position_range: init_pos = light.cfg.init_pos new_pos = ( torch.tensor(init_pos, dtype=torch.float32) .unsqueeze_(0) .repeat(num_instance, 1) ) random_value = sample_uniform( lower=torch.tensor(position_range[0]), upper=torch.tensor(position_range[1]), size=new_pos.shape, ) new_pos += random_value light.set_local_pose(new_pos, env_ids=env_ids) if color_range: color = torch.zeros((num_instance, 3), dtype=torch.float32) random_value = sample_uniform( lower=torch.tensor(color_range[0]), upper=torch.tensor(color_range[1]), size=color.shape, ) color += random_value light.set_color(color, env_ids=env_ids) if intensity_range: init_intensity = light.cfg.intensity new_intensity = ( torch.tensor(init_intensity, dtype=torch.float32) .unsqueeze_(0) .repeat(num_instance, 1) ) random_value = sample_uniform( lower=torch.tensor(intensity_range[0]), upper=torch.tensor(intensity_range[1]), size=new_intensity.shape, ) new_intensity += random_value new_intensity.squeeze_(1) light.set_intensity(new_intensity, env_ids=env_ids)
[docs] def randomize_camera_intrinsics( env: EmbodiedEnv, env_ids: Union[torch.Tensor, None], entity_cfg: SceneEntityCfg, focal_x_range: Optional[tuple[float, float]] = None, focal_y_range: Optional[tuple[float, float]] = None, cx_range: Optional[tuple[float, float]] = None, cy_range: Optional[tuple[float, float]] = None, ) -> None: """Randomize camera intrinsic properties by adding, scaling, or setting random values. This function allows randomizing camera intrinsic parameters in the scene. The function samples random values from the given distribution parameters and adds, scales, or sets the values into the physics simulation based on the operation. The distribution parameters are tuples of two elements each, representing the lower and upper bounds of the distribution for the focal length (fx, fy) and principal point (cx, cy) components of the camera intrinsics. The function samples random values for each component independently. .. attention:: This function applies the same intrinsic properties for all the environments. focal_x_range and focal_y_range are values added to the camera's current fx and fy values. focal_xy_range is a combined range for both fx and fy, where the range is specified as [[fx_min, fy_min], [fx_max, fy_max]]. cx_range and cy_range are values added to the camera's current cx and cy values. .. tip:: This function uses CPU tensors to assign camera intrinsic properties. Args: env (EmbodiedEnv): The environment instance. env_ids (Union[torch.Tensor, None]): The environment IDs to apply the randomization. entity_cfg (SceneEntityCfg): The configuration of the scene entity to randomize. focal_x_range (Optional[tuple[float, float]]): The range for the focal length x randomization. focal_y_range (Optional[tuple[float, float]]): The range for the focal length y randomization. cx_range (Optional[tuple[float, float]]): The range for the principal point x randomization. cy_range (Optional[tuple[float, float]]): The range for the principal point y randomization. """ camera: Union[Camera, StereoCamera] = env.sim.get_sensor(entity_cfg.uid) num_instance = len(env_ids) # Get current intrinsics as baseline current_intrinsics = camera.cfg.intrinsics # (fx, fy, cx, cy) # Create new intrinsics tensor for all instances new_intrinsics = ( torch.tensor(current_intrinsics, dtype=torch.float32) .unsqueeze(0) .repeat(num_instance, 1) ) # Randomize focal length x (fx) if focal_x_range: random_value = sample_uniform( lower=torch.tensor(focal_x_range[0]), upper=torch.tensor(focal_x_range[1]), size=(num_instance,), ) new_intrinsics[:, 0] += random_value # Randomize focal length y (fy) if focal_y_range: random_value = sample_uniform( lower=torch.tensor(focal_y_range[0]), upper=torch.tensor(focal_y_range[1]), size=(num_instance,), ) new_intrinsics[:, 1] += random_value # Randomize principal point x (cx) if cx_range: random_value = sample_uniform( lower=torch.tensor(cx_range[0]), upper=torch.tensor(cx_range[1]), size=(num_instance,), ) new_intrinsics[:, 2] += random_value # Randomize principal point y (cy) if cy_range: random_value = sample_uniform( lower=torch.tensor(cy_range[0]), upper=torch.tensor(cy_range[1]), size=(num_instance,), ) new_intrinsics[:, 3] += random_value camera.set_intrinsics(new_intrinsics, env_ids=env_ids)
[docs] class randomize_visual_material(Functor): """Randomize the the visual material properties of a RigidObject or an Articulation. Note: 1. Currently supported randomized properties include: - base_color: RGB color of the material. Value should be in [0, 1], shape of (3,) - base_color_texture: Texture image for the base color of the material. The textures will be preloaded from the given texture_path during initialization. - metallic: Metallic property of the material. Value should be in [0, 1]. - roughness: Roughness property of the material. Value should be in [0, 1]. - ior: Index of Refraction of the material (only supported in ray tracing mode). 2. The default ground plane can also be randomized by setting entity_cfg.uid to "default_plane". """
[docs] def __init__(self, cfg: FunctorCfg, env: EmbodiedEnv): """Initialize the term. Args: cfg: The configuration of the functor. env: The environment instance. Raises: ValueError: If the asset is not a RigidObject or an Articulation. """ super().__init__(cfg, env) self.entity_cfg: SceneEntityCfg = cfg.params["entity_cfg"] # special case: default ground plane. if self.entity_cfg.uid == "default_plane": pass else: self.entity: Union[RigidObject, Articulation] = env.sim.get_asset( self.entity_cfg.uid ) if not isinstance(self.entity, (RigidObject, Articulation)): raise ValueError( f"Randomization functor 'randomize_visual_material' not supported for asset: '{self.entity_cfg.uid}'" f" with type: '{type(self.entity)}'." ) # TODO: Maybe need to consider two cases: # 1. the texture folder is very large, and we don't want to load all the textures into memory. # 2. the texture is generated on the fly. # Preload textures (currently only base color textures are supported) self.textures = [] texture_path = get_data_path(cfg.params.get("texture_path", None)) if texture_path is not None: from embodichain.utils.utility import read_all_folder_images texture_key = os.path.basename(texture_path) # check if the texture group is already loaded in the global texture cache if texture_key in env.sim.get_texture_cache(): logger.log_info( f"Texture group '{texture_key}' is already loaded in the global texture cache." ) self.textures = env.sim.get_texture_cache(texture_key) else: self.textures = read_all_folder_images(texture_path) # padding the texture with alpha channel if not exist for i in range(len(self.textures)): if self.textures[i].shape[2] == 3: data = torch.as_tensor(self.textures[i]) alpha_channel = ( torch.ones( (data.shape[0], data.shape[1], 1), dtype=data.dtype ) * 255 ) data = torch.cat((data, alpha_channel), dim=2) self.textures[i] = data env.sim.set_texture_cache(texture_key, self.textures) if self.entity_cfg.uid == "default_plane": pass else: # TODO: we may need to get the default material instance from the asset itself. mat: VisualMaterial = env.sim.create_visual_material( cfg=VisualMaterialCfg( base_color=[1.0, 1.0, 1.0, 1.0], uid=f"{self.entity_cfg.uid}_random_mat", ) ) if isinstance(self.entity, RigidObject): self.entity.set_visual_material(mat) elif isinstance(self.entity, Articulation): _, link_names = resolve_matching_names( self.entity_cfg.link_names, self.entity.link_names ) self.entity_cfg.link_names = link_names self.entity.set_visual_material(mat, link_names=link_names)
[docs] @staticmethod def gen_random_base_color_texture(width: int, height: int) -> torch.Tensor: """Generate a random base color texture. Args: width: The width of the texture. height: The height of the texture. Returns: A torch tensor representing the random base color texture with shape (height, width, 4). """ # Generate random RGB values rgb = torch.ones((height, width, 3), dtype=torch.float32) rgb *= torch.rand((1, 1, 3), dtype=torch.float32) rgba = torch.cat((rgb, torch.ones((height, width, 1))), dim=2) rgba = (rgba * 255).to(torch.uint8) return rgba
def _randomize_texture(self, mat_inst: VisualMaterialInst) -> None: if len(self.textures) > 0: # Randomly select a texture from the preloaded textures texture_idx = torch.randint(0, len(self.textures), (1,)).item() mat_inst.set_base_color_texture(texture_data=self.textures[texture_idx]) def _randomize_mat_inst( self, mat_inst: VisualMaterialInst, plan: Dict[str, torch.Tensor], random_texture_prob: float, idx: int = 0, ) -> None: # randomize the material instance pbr properties based on the plan. for key, value in plan.items(): if key == "base_color": mat_inst.set_base_color(value[idx].tolist()) else: getattr(mat_inst, f"set_{key}")(value[idx].item()) # randomize texture or base color based on the probability. if random_texture_prob <= 0.0 or len(self.textures) == 0: return if random.random() < random_texture_prob: self._randomize_texture(mat_inst) else: # set a random base color instead. random_color = torch.rand(3).tolist() random_color.append(1.0) # alpha mat_inst.set_base_color(random_color) def __call__( self, env: EmbodiedEnv, env_ids: Union[torch.Tensor, None], entity_cfg: SceneEntityCfg, random_texture_prob: float = 0.5, texture_path: Optional[str] = None, base_color_range: Optional[tuple[list[float], list[float]]] = None, metallic_range: Optional[tuple[float, float]] = None, roughness_range: Optional[tuple[float, float]] = None, ior_range: Optional[tuple[float, float]] = None, ): from embodichain.lab.sim.utility import is_rt_enabled # resolve environment ids if env_ids is None: env_ids = torch.arange(env.num_envs, device="cpu") else: env_ids = env_ids.cpu() if self.entity_cfg.uid == "default_plane": env_ids = [0] randomize_plan = {} if base_color_range: base_color = sample_uniform( lower=torch.tensor(base_color_range[0], dtype=torch.float32), upper=torch.tensor(base_color_range[1], dtype=torch.float32), size=(len(env_ids), 3), # RGB ) # append alpha channel alpha_channel = torch.ones((len(env_ids), 1), dtype=torch.float32) base_color = torch.cat((base_color, alpha_channel), dim=1) randomize_plan["base_color"] = base_color if metallic_range: metallic = sample_uniform( lower=torch.tensor(metallic_range[0], dtype=torch.float32), upper=torch.tensor(metallic_range[1], dtype=torch.float32), size=(len(env_ids), 1), ) randomize_plan["metallic"] = metallic if roughness_range: roughness = sample_uniform( lower=torch.tensor(roughness_range[0], dtype=torch.float32), upper=torch.tensor(roughness_range[1], dtype=torch.float32), size=(len(env_ids), 1), ) randomize_plan["roughness"] = roughness if ior_range and is_rt_enabled(): ior = sample_uniform( lower=torch.tensor(ior_range[0], dtype=torch.float32), upper=torch.tensor(ior_range[1], dtype=torch.float32), size=(len(env_ids), 1), ) randomize_plan["ior"] = ior # ground plane only has one instance. mat_insts = None if self.entity_cfg.uid == "default_plane": mat_inst = env.sim.get_visual_material("plane_mat").get_default_instance() self._randomize_mat_inst( mat_inst=mat_inst, plan=randomize_plan, random_texture_prob=random_texture_prob, idx=0, ) return elif isinstance(self.entity, RigidObject): mat_insts = self.entity.get_visual_material_inst(env_ids=env_ids) elif isinstance(self.entity, Articulation): mat_insts = self.entity.get_visual_material_inst( env_ids=env_ids, link_names=self.entity_cfg.link_names, ) for i, data in enumerate(mat_insts): if isinstance(self.entity, RigidObject): # For RigidObject, data is the material instance directly mat: VisualMaterialInst = data elif isinstance(self.entity, Articulation): # For Articulation, data is the key-value pair of link name and material instance mat: Dict[str, VisualMaterialInst] = data if isinstance(self.entity, RigidObject): self._randomize_mat_inst( mat_inst=mat, plan=randomize_plan, random_texture_prob=random_texture_prob, idx=i, ) else: for name, mat_inst in mat.items(): self._randomize_mat_inst( mat_inst=mat_inst, plan=randomize_plan, random_texture_prob=random_texture_prob, idx=i, )