# ----------------------------------------------------------------------------
# Copyright (c) 2021-2025 DexForce Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
import os
import torch
import numpy as np
from typing import Optional, Union, Tuple, Any, List, TYPE_CHECKING
from itertools import product
from copy import deepcopy
from embodichain.utils import configclass, logger
from embodichain.lab.sim.solvers import SolverCfg, BaseSolver
from embodichain.lab.sim.utility.import_utils import (
lazy_import_pinocchio,
lazy_import_casadi,
# lazy_import_pinocchio_casadi,
)
from embodichain.lab.sim.utility.solver_utils import (
build_reduced_pinocchio_robot,
validate_iteration_params,
compute_pinocchio_fk,
)
if TYPE_CHECKING:
from typing import Self
[docs]
@configclass
class PinocchioSolverCfg(SolverCfg):
class_type: str = "PinocchioSolver"
mesh_path: str = None
# Solver iteration parameters
pos_eps: float = 5e-4 # Tolerance for convergence for position
rot_eps: float = 5e-4 # Tolerance for convergence for rotation
max_iterations: int = 1000 # Maximum number of iterations for the solver
dt: float = 0.1 # Time step for numerical integration
damp: float = 1e-6 # Damping factor to prevent numerical instability
# Constraint configuration
is_only_position_constraint: bool = (
False # Whether to only consider position constraints
)
# Sampling configuration
num_samples: int = (
30 # Number of samples to generate different joint seeds for IK iterations
)
[docs]
def init_solver(self, **kwargs) -> "PinocchioSolver":
"""Initialize the solver with the configuration.
Args:
**kwargs: Additional keyword arguments that may be used for solver initialization.
Returns:
PinocchioSolver: An initialized solver instance.
"""
solver = PinocchioSolver(cfg=self, **kwargs)
# Set the Tool Center Point (TCP) for the solver
solver.set_tcp(self._get_tcp_as_numpy())
return solver
[docs]
class PinocchioSolver(BaseSolver):
[docs]
def __init__(self, cfg: PinocchioSolverCfg, **kwargs):
super().__init__(cfg=cfg, **kwargs)
self.pin = lazy_import_pinocchio()
self.casadi = lazy_import_casadi()
# self.cpin = lazy_import_pinocchio_casadi()
# Set Tool Center Point (TCP)
self.tcp = cfg.tcp
# Set IK solver parameters
self.pos_eps = cfg.pos_eps
self.rot_eps = cfg.rot_eps
self.max_iterations = cfg.max_iterations
self.dt = cfg.dt
self.damp = cfg.damp
self.is_only_position_constraint = cfg.is_only_position_constraint
self.num_samples = cfg.num_samples
# Set mesh path if not provided
if cfg.mesh_path is None:
urdf_dir = os.path.dirname(cfg.urdf_path)
cfg.mesh_path = urdf_dir
# Load full robot model from URDF
self.entire_robot = self.pin.RobotWrapper.BuildFromURDF(
cfg.urdf_path, cfg.mesh_path, root_joint=None
)
# Get all joint names and degrees of freedom (excluding 'universe')
self.all_joint_names = self.entire_robot.model.names.tolist()[
1:
] # Exclude 'universe' joint
self.all_dof = (
self.entire_robot.model.njoints - 1
) # Degrees of freedom of robot joints
# Build reduced robot model (only relevant joints unlocked)
self.robot = build_reduced_pinocchio_robot(self.entire_robot, self.joint_names)
self.joint_names = self.robot.model.names.tolist()[
1:
] # Exclude 'universe' joint
self.dof = (
self.robot.model.njoints - 1
) # Degrees of freedom of reduced robot joints
self.upper_position_limits = self.robot.model.upperPositionLimit
self.lower_position_limits = self.robot.model.lowerPositionLimit
self.ik_nearest_weight = np.ones(self.dof)
# TODO: The Casadi-based solver is currently disabled due to stability issues.
# Note: Casadi-based optimization is currently prone to divergence and requires further debugging and optimization.
if __debug__ and False:
# Creating Casadi models and data for symbolic computing
self.cmodel = self.cpin.Model(self.robot.model)
self.cdata = self.cmodel.createData()
self.cq = self.casadi.SX.sym("q", self.robot.model.nq, 1)
self.cTf = self.casadi.SX.sym("Tf", 4, 4)
self.cpin.framesForwardKinematics(self.cmodel, self.cdata, self.cq)
self.ee_id = self.robot.model.getFrameId(self.end_link_name)
# Define error functions for position and orientation
self.translational_error = self.casadi.Function(
"translational_error",
[self.cq, self.cTf],
[self.cdata.oMf[self.ee_id].translation - self.cTf[:3, 3]],
)
self.rotational_error = self.casadi.Function(
"rotational_error",
[self.cq, self.cTf],
[
self.cpin.log3(
self.cdata.oMf[self.ee_id].rotation @ self.cTf[:3, :3].T
)
],
)
# Set up CasADi optimization problem
self.opti = self.casadi.Opti()
self.var_q = self.opti.variable(self.robot.model.nq)
self.var_q_last = self.opti.parameter(self.robot.model.nq)
self.param_tf = self.opti.parameter(4, 4)
self.translational_cost = self.casadi.sumsqr(
self.translational_error(self.var_q, self.param_tf)
)
self.rotation_cost = self.casadi.sumsqr(
self.rotational_error(self.var_q, self.param_tf)
)
self.regularization_cost = self.casadi.sumsqr(self.var_q)
self.smooth_cost = self.casadi.sumsqr(self.var_q - self.var_q_last)
# Add joint position constraints to ensure the solution stays within physical joint limits.
self.opti.subject_to(
self.opti.bounded(
self.robot.model.lowerPositionLimit,
self.var_q,
self.robot.model.upperPositionLimit,
)
)
# Define the objective function for IK optimization:
# - Prioritize end-effector position accuracy (high weight)
# - Include orientation accuracy
# - Add regularization to avoid extreme joint values
# - Encourage smoothness between consecutive solutions
self.opti.minimize(
100 * self.translational_cost
+ 50 * self.rotation_cost
+ 0.02 * self.regularization_cost
+ 0.1 * self.smooth_cost
)
# Set solver options for IPOPT
opts = {
"ipopt": {
"print_level": 0,
"max_iter": self.max_iterations,
"tol": self.pos_eps,
},
"print_time": False,
"calc_lam_p": True,
}
self.opti.solver("ipopt", opts)
# Initialize joint positions to zero
self.init_qpos = np.zeros(self.robot.model.nq)
# Perform forward kinematics with zero configuration
self.pin.forwardKinematics(self.robot.model, self.robot.data, self.init_qpos)
# Retrieve the pose of the specified root link
frame_index = self.robot.model.getFrameId(self.root_link_name)
root_base_pose = self.robot.model.frames[frame_index].placement
self.root_base_xpos = np.eye(4)
self.root_base_xpos[:3, :3] = root_base_pose.rotation
self.root_base_xpos[:3, 3] = root_base_pose.translation.T
[docs]
def set_tcp(self, tcp: np.ndarray):
self.tcp = tcp
[docs]
def get_iteration_params(self) -> dict:
r"""Returns the current iteration parameters.
Returns:
dict: A dictionary containing the current values of:
- pos_eps (float): Pos convergence threshold
- rot_eps (float): Rot convergence threshold
- max_iterations (int): Maximum number of iterations.
- dt (float): Time step size.
- damp (float): Damping factor.
- num_samples (int): Number of samples.
- is_only_position_constraint (bool): Flag to indicate whether the solver should only consider position constraints.
"""
return {
"pos_eps": self._pos_eps,
"rot_eps": self._rot_eps,
"max_iterations": self._max_iterations,
"dt": self._dt,
"damp": self._damp,
"num_samples": self._num_samples,
}
[docs]
def set_iteration_params(
self,
pos_eps: float = 5e-4,
rot_eps: float = 5e-4,
max_iterations: int = 1000,
dt: float = 0.1,
damp: float = 1e-6,
num_samples: int = 30,
is_only_position_constraint: bool = False,
) -> bool:
r"""Sets the iteration parameters for the kinematics solver.
Args:
pos_eps (float): Pos convergence threshold, must be positive.
rot_eps (float): Rot convergence threshold, must be positive.
max_iterations (int): Maximum number of iterations, must be positive.
dt (float): Time step size, must be positive.
damp (float): Damping factor, must be non-negative.
num_samples (int): Number of samples, must be positive.
is_only_position_constraint (bool): Flag to indicate whether the solver should only consider position constraints.
Returns:
bool: True if all parameters are valid and set, False otherwise.
"""
# Validate parameters
if not validate_iteration_params(
pos_eps, rot_eps, max_iterations, dt, damp, num_samples
):
return False
# Set parameters if all are valid
self.pos_eps = pos_eps
self.rot_eps = rot_eps
self.max_iterations = max_iterations
self.dt = dt
self.damp = damp
self.num_samples = num_samples
self.is_only_position_constraint = is_only_position_constraint
if False:
opts = {
"ipopt": {
"print_level": 0,
"max_iter": self.max_iterations,
"tol": self.pos_eps,
},
"print_time": False,
"calc_lam_p": False,
}
self.opti.solver("ipopt", opts)
return True
[docs]
def qpos_to_limits(
self,
q: np.ndarray,
joint_seed: np.ndarray,
):
"""Adjusts the joint positions (q) to be within specified limits and as close as possible to the joint seed,
while minimizing the total weighted difference.
Args:
q (np.ndarray): The original joint positions.
joint_seed (np.ndarray): The desired (seed) joint positions.
Returns:
np.ndarray: The adjusted joint positions within the specified limits.
"""
best_qpos_limit = np.copy(q)
best_total_q_diff = float("inf")
# Initialize a list for possible values for each joint
possible_arrays = []
if self.ik_nearest_weight is None:
self.ik_nearest_weight = np.ones_like(best_qpos_limit)
# Generate possible values for each joint
dof_num = len(q)
for i in range(dof_num):
current_possible_values = []
# Calculate how many 2π fits into the adjustment to the limits
lower_adjustment = (q[i] - self.lower_position_limits[i]) // (2 * np.pi)
upper_adjustment = (self.upper_position_limits[i] - q[i]) // (2 * np.pi)
# Consider the current value and its periodic adjustments
for offset in range(
int(lower_adjustment) - 1, int(upper_adjustment) + 2
): # Adjust by calculated limits
adjusted_value = q[i] + offset * (2 * np.pi)
# Check if the adjusted value is within limits
if (
self.lower_position_limits[i]
<= adjusted_value
<= self.upper_position_limits[i]
):
current_possible_values.append(adjusted_value)
# Also check the original value
if self.lower_position_limits[i] <= q[i] <= self.upper_position_limits[i]:
current_possible_values.append(q[i])
if not current_possible_values:
return [] # If no possible values for an active joint
possible_arrays.append(current_possible_values)
# Generate all possible combinations
all_possible_combinations = product(*possible_arrays)
# Check each combination and calculate the absolute difference sum
for combination in all_possible_combinations:
total_q_diff = np.sum(
np.abs(np.array(combination) - joint_seed) * self.ik_nearest_weight
)
# If a smaller difference sum is found, update the best solution
if total_q_diff < best_total_q_diff:
best_total_q_diff = total_q_diff
best_qpos_limit = np.array(combination)
return best_qpos_limit
[docs]
def get_ik(
self,
target_xpos: Optional[Union[torch.Tensor, np.ndarray]],
qpos_seed: np.ndarray = None,
qvel_seed: np.ndarray = None,
return_all_solutions: bool = False,
**kwargs,
) -> Tuple[bool, np.ndarray]:
"""Solve inverse kinematics (IK) for the robot to achieve the specified end-effector pose.
Args:
target_xpos (torch.Tensor or np.ndarray): Desired end-effector pose as a (4, 4) homogeneous transformation matrix.
qpos_seed (np.ndarray, optional): Initial joint positions used as the seed for optimization. If None, uses zero configuration.
qvel_seed (np.ndarray, optional): Initial joint velocities (not used in current implementation).
return_all_solutions (bool, optional): If True, return all valid IK solutions found; otherwise, return only the best solution. Default is False.
**kwargs: Additional keyword arguments for future extensions.
Returns:
Tuple[bool, np.ndarray]:
- success (bool or torch.BoolTensor): True if a valid solution is found, False otherwise.
- qpos (np.ndarray or torch.Tensor): Joint positions that achieve the target pose. If no solution, returns the seed joint positions.
"""
if qpos_seed is not None:
if isinstance(qpos_seed, torch.Tensor):
self.init_qpos = qpos_seed.detach().cpu().numpy()
else:
self.init_qpos = np.array(qpos_seed)
if isinstance(target_xpos, torch.Tensor):
target_xpos = target_xpos.detach().cpu().numpy()
if target_xpos.ndim == 3:
target_xpos = target_xpos[0]
target_xpos = self.root_base_xpos @ target_xpos
compute_xpos = target_xpos @ np.linalg.inv(self.tcp_xpos)
frame_index = self.robot.model.getFrameId(self.end_link_name)
joint_index = self.robot.model.frames[frame_index].parent
l2w = self.pin.SE3()
l2w.translation[:] = compute_xpos[:3, 3]
l2w.rotation[:] = compute_xpos[:3, :3]
l2j = self.robot.model.frames[frame_index].placement
oMdes = l2w * l2j.inverse()
# Deep copy joint seed to avoid modifying the original seed
q = deepcopy(self.init_qpos).astype(np.float64).flatten()
for i in range(self.max_iterations):
# Perform forward kinematics to compute the current pose
self.pin.forwardKinematics(self.robot.model, self.robot.data, q)
current_pose_se3 = self.robot.data.oMi[joint_index]
if self.is_only_position_constraint:
# Fix the rotation part of the pose
fixed_pose = np.eye(4)
fixed_pose[:3, :3] = compute_xpos[:3, :3] # Use target rotation
fixed_pose[:3, 3] = (
current_pose_se3.translation.T
) # Use current position
fixed_pose_SE3 = self.pin.SE3(fixed_pose)
current_pose_se3 = self.pin.SE3(fixed_pose_SE3)
iMd = current_pose_se3.actInv(oMdes) # Calculate the pose error
err = self.pin.log6(iMd).vector # Get the error vector
# Check position convergence
pos_converged = np.linalg.norm(err[:3]) < self.pos_eps
if self.is_only_position_constraint:
if pos_converged:
# Convergence achieved, apply joint limits
q = self.qpos_to_limits(q, self.init_qpos)
if 0 == len(q):
continue
return torch.tensor([True], dtype=torch.bool), torch.from_numpy(
q
).to(dtype=torch.float32)
else:
# Check rotation convergence
rot_converged = np.linalg.norm(err[3:]) < self.rot_eps
# Check for overall convergence
if pos_converged and rot_converged:
# Convergence achieved, apply joint limits
q = self.qpos_to_limits(q, self.init_qpos)
if 0 == len(q):
continue
return torch.tensor([True], dtype=torch.bool), torch.from_numpy(
q
).to(dtype=torch.float32)
# Compute the Jacobian
J = self.pin.computeJointJacobian(
self.robot.model, self.robot.data, q, joint_index
)
Jlog = self.pin.Jlog6(iMd.inverse())
J = -Jlog @ J
# Damped least squares
JJt = J @ J.T
JJt[np.diag_indices_from(JJt)] += self.damp
# Compute the velocity update
v = -(J.T @ np.linalg.solve(JJt, err))
# Update joint positions
new_q = self.pin.integrate(self.robot.model, q, v * self.dt)
q = new_q
# Return failure and the last computed joint positions
return torch.tensor([False], dtype=torch.bool), torch.from_numpy(
np.array(q)
).to(dtype=torch.float32)
# TODO: The Casadi-based solver is currently disabled due to stability issues.
# Note: Casadi-based optimization is currently prone to divergence and requires further debugging and optimization.
if __debug__ and False:
self.opti.set_initial(self.var_q, self.init_qpos)
self.opti.set_value(self.param_tf, compute_xpos)
try:
num_iter = 1 if self.max_iterations == 1 else self.max_iterations
for i in range(num_iter):
self.opti.set_value(self.var_q_last, self.init_qpos)
sol = self.opti.solve()
sol_q = self.opti.value(self.var_q)
# self.smooth_filter.add_data(sol_q)
# sol_q = self.smooth_filter.filtered_data
self.init_qpos = sol_q
# if qvel_seed is not None:
# v = qvel_seed * 0.0
# else:
# v = (sol_q - self.init_qpos) * 0.0
# sol_tauff = pin.rnea(
# self.robot.model,
# self.robot.data,
# sol_q,
# v,
# np.zeros(self.robot.model.nv),
# )
temp_xpos = self._get_fk(sol_q)
err = temp_xpos - target_xpos
pos_converged = np.linalg.norm(err[:3]) < self.pos_eps
print(f"Iter {i}: pos_err={np.linalg.norm(err[:3])}")
if self.is_only_position_constraint:
if pos_converged:
break
else:
rot_converged = np.linalg.norm(err[:3, :3]) < self.rot_eps
if pos_converged and rot_converged:
break
if return_all_solutions:
logger.log_warning(
"return_all_solutions=True is not supported in DifferentialSolver. Returning the best solution only."
)
return torch.tensor(True, dtype=torch.bool), torch.from_numpy(sol_q).to(
dtype=torch.float32
)
except Exception as e:
logger.log_warning(f"IK solver failed to converge. Debug info: {e}")
sol_q = self.opti.debug.value(self.var_q)
# self.smooth_filter.add_data(sol_q)
# sol_q = self.smooth_filter.filtered_data
self.init_qpos = sol_q
# if qvel_seed is not None:
# v = qvel_seed * 0.0
# else:
# v = (sol_q - self.init_qpos) * 0.0
# sol_tauff = pin.rnea(
# self.robot.model,
# self.robot.data,
# sol_q,
# v,
# np.zeros(self.robot.model.nv),
# )
logger.log_debug(
f"sol_q:{sol_q} \nmotorstate: \n{qpos_seed} \nwrist_pose: \n{target_xpos}"
)
if return_all_solutions:
logger.log_warning(
"return_all_solutions=True is not supported in DifferentialSolver. Returning the best solution only."
)
return torch.tensor(False, dtype=torch.bool), torch.from_numpy(
np.array(qpos_seed)
).to(dtype=torch.float32)
def _get_fk(
self,
qpos: Optional[Union[torch.Tensor, np.ndarray]],
**kwargs,
) -> np.ndarray:
"""Compute the forward kinematics for the robot given joint positions.
Args:
qpos (torch.Tensor or np.ndarray): Joint positions, shape should be (nq,).
**kwargs: Additional keyword arguments (not used).
Returns:
np.ndarray: The resulting end-effector pose as a (4, 4) homogeneous transformation matrix.
"""
return compute_pinocchio_fk(
self.pin, self.robot, qpos, self.end_link_name, self.tcp_xpos
)