|
from equi_diffpo.model.common.normalizer import SingleFieldLinearNormalizer |
|
from equi_diffpo.common.pytorch_util import dict_apply, dict_apply_reduce, dict_apply_split |
|
import numpy as np |
|
|
|
|
|
def get_range_normalizer_from_stat(stat, output_max=1, output_min=-1, range_eps=1e-7): |
|
|
|
input_max = stat['max'] |
|
input_min = stat['min'] |
|
input_range = input_max - input_min |
|
ignore_dim = input_range < range_eps |
|
input_range[ignore_dim] = output_max - output_min |
|
scale = (output_max - output_min) / input_range |
|
offset = output_min - scale * input_min |
|
offset[ignore_dim] = (output_max + output_min) / 2 - input_min[ignore_dim] |
|
|
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=scale, |
|
offset=offset, |
|
input_stats_dict=stat |
|
) |
|
|
|
def get_range_symmetric_normalizer_from_stat(stat, output_max=1, output_min=-1, range_eps=1e-7): |
|
|
|
input_max = stat['max'] |
|
input_min = stat['min'] |
|
abs_max = np.max([np.abs(stat['max'][:2]), np.abs(stat['min'][:2])]) |
|
input_max[:2] = abs_max |
|
input_min[:2] = -abs_max |
|
input_range = input_max - input_min |
|
ignore_dim = input_range < range_eps |
|
input_range[ignore_dim] = output_max - output_min |
|
scale = (output_max - output_min) / input_range |
|
offset = output_min - scale * input_min |
|
offset[ignore_dim] = (output_max + output_min) / 2 - input_min[ignore_dim] |
|
|
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=scale, |
|
offset=offset, |
|
input_stats_dict=stat |
|
) |
|
|
|
def get_voxel_identity_normalizer(): |
|
scale = np.array([1], dtype=np.float32) |
|
offset = np.array([0], dtype=np.float32) |
|
stat = { |
|
'min': np.array([0], dtype=np.float32), |
|
'max': np.array([1], dtype=np.float32), |
|
'mean': np.array([0.5], dtype=np.float32), |
|
'std': np.array([np.sqrt(1/12)], dtype=np.float32) |
|
} |
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=scale, |
|
offset=offset, |
|
input_stats_dict=stat |
|
) |
|
|
|
def get_image_range_normalizer(): |
|
scale = np.array([2], dtype=np.float32) |
|
offset = np.array([-1], dtype=np.float32) |
|
stat = { |
|
'min': np.array([0], dtype=np.float32), |
|
'max': np.array([1], dtype=np.float32), |
|
'mean': np.array([0.5], dtype=np.float32), |
|
'std': np.array([np.sqrt(1/12)], dtype=np.float32) |
|
} |
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=scale, |
|
offset=offset, |
|
input_stats_dict=stat |
|
) |
|
|
|
def get_identity_normalizer_from_stat(stat): |
|
scale = np.ones_like(stat['min']) |
|
offset = np.zeros_like(stat['min']) |
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=scale, |
|
offset=offset, |
|
input_stats_dict=stat |
|
) |
|
|
|
def robomimic_abs_action_normalizer_from_stat(stat, rotation_transformer): |
|
result = dict_apply_split( |
|
stat, lambda x: { |
|
'pos': x[...,:3], |
|
'rot': x[...,3:6], |
|
'gripper': x[...,6:] |
|
}) |
|
|
|
def get_pos_param_info(stat, output_max=1, output_min=-1, range_eps=1e-7): |
|
|
|
input_max = stat['max'] |
|
input_min = stat['min'] |
|
input_range = input_max - input_min |
|
ignore_dim = input_range < range_eps |
|
input_range[ignore_dim] = output_max - output_min |
|
scale = (output_max - output_min) / input_range |
|
offset = output_min - scale * input_min |
|
offset[ignore_dim] = (output_max + output_min) / 2 - input_min[ignore_dim] |
|
|
|
return {'scale': scale, 'offset': offset}, stat |
|
|
|
def get_rot_param_info(stat): |
|
example = rotation_transformer.forward(stat['mean']) |
|
scale = np.ones_like(example) |
|
offset = np.zeros_like(example) |
|
info = { |
|
'max': np.ones_like(example), |
|
'min': np.full_like(example, -1), |
|
'mean': np.zeros_like(example), |
|
'std': np.ones_like(example) |
|
} |
|
return {'scale': scale, 'offset': offset}, info |
|
|
|
def get_gripper_param_info(stat): |
|
example = stat['max'] |
|
scale = np.ones_like(example) |
|
offset = np.zeros_like(example) |
|
info = { |
|
'max': np.ones_like(example), |
|
'min': np.full_like(example, -1), |
|
'mean': np.zeros_like(example), |
|
'std': np.ones_like(example) |
|
} |
|
return {'scale': scale, 'offset': offset}, info |
|
|
|
pos_param, pos_info = get_pos_param_info(result['pos']) |
|
rot_param, rot_info = get_rot_param_info(result['rot']) |
|
gripper_param, gripper_info = get_gripper_param_info(result['gripper']) |
|
|
|
param = dict_apply_reduce( |
|
[pos_param, rot_param, gripper_param], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
info = dict_apply_reduce( |
|
[pos_info, rot_info, gripper_info], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
|
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=param['scale'], |
|
offset=param['offset'], |
|
input_stats_dict=info |
|
) |
|
|
|
|
|
def robomimic_abs_action_only_normalizer_from_stat(stat): |
|
result = dict_apply_split( |
|
stat, lambda x: { |
|
'pos': x[...,:3], |
|
'other': x[...,3:] |
|
}) |
|
|
|
def get_pos_param_info(stat, output_max=1, output_min=-1, range_eps=1e-7): |
|
|
|
input_max = stat['max'] |
|
input_min = stat['min'] |
|
input_range = input_max - input_min |
|
ignore_dim = input_range < range_eps |
|
input_range[ignore_dim] = output_max - output_min |
|
scale = (output_max - output_min) / input_range |
|
offset = output_min - scale * input_min |
|
offset[ignore_dim] = (output_max + output_min) / 2 - input_min[ignore_dim] |
|
|
|
return {'scale': scale, 'offset': offset}, stat |
|
|
|
|
|
def get_other_param_info(stat): |
|
example = stat['max'] |
|
scale = np.ones_like(example) |
|
offset = np.zeros_like(example) |
|
info = { |
|
'max': np.ones_like(example), |
|
'min': np.full_like(example, -1), |
|
'mean': np.zeros_like(example), |
|
'std': np.ones_like(example) |
|
} |
|
return {'scale': scale, 'offset': offset}, info |
|
|
|
pos_param, pos_info = get_pos_param_info(result['pos']) |
|
other_param, other_info = get_other_param_info(result['other']) |
|
|
|
param = dict_apply_reduce( |
|
[pos_param, other_param], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
info = dict_apply_reduce( |
|
[pos_info, other_info], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
|
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=param['scale'], |
|
offset=param['offset'], |
|
input_stats_dict=info |
|
) |
|
|
|
|
|
def robomimic_abs_action_only_symmetric_normalizer_from_stat(stat): |
|
result = dict_apply_split( |
|
stat, lambda x: { |
|
'pos': x[...,:3], |
|
'other': x[...,3:] |
|
}) |
|
|
|
def get_pos_param_info(stat, output_max=1, output_min=-1, range_eps=1e-7): |
|
|
|
input_max = stat['max'] |
|
input_min = stat['min'] |
|
abs_max = np.max([np.abs(stat['max'][:2]), np.abs(stat['min'][:2])]) |
|
input_max[:2] = abs_max |
|
input_min[:2] = -abs_max |
|
input_range = input_max - input_min |
|
ignore_dim = input_range < range_eps |
|
input_range[ignore_dim] = output_max - output_min |
|
scale = (output_max - output_min) / input_range |
|
offset = output_min - scale * input_min |
|
offset[ignore_dim] = (output_max + output_min) / 2 - input_min[ignore_dim] |
|
|
|
return {'scale': scale, 'offset': offset}, stat |
|
|
|
|
|
def get_other_param_info(stat): |
|
example = stat['max'] |
|
scale = np.ones_like(example) |
|
offset = np.zeros_like(example) |
|
info = { |
|
'max': np.ones_like(example), |
|
'min': np.full_like(example, -1), |
|
'mean': np.zeros_like(example), |
|
'std': np.ones_like(example) |
|
} |
|
return {'scale': scale, 'offset': offset}, info |
|
|
|
pos_param, pos_info = get_pos_param_info(result['pos']) |
|
other_param, other_info = get_other_param_info(result['other']) |
|
|
|
param = dict_apply_reduce( |
|
[pos_param, other_param], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
info = dict_apply_reduce( |
|
[pos_info, other_info], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
|
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=param['scale'], |
|
offset=param['offset'], |
|
input_stats_dict=info |
|
) |
|
|
|
|
|
def robomimic_abs_action_only_dual_arm_normalizer_from_stat(stat): |
|
Da = stat['max'].shape[-1] |
|
Dah = Da // 2 |
|
result = dict_apply_split( |
|
stat, lambda x: { |
|
'pos0': x[...,:3], |
|
'other0': x[...,3:Dah], |
|
'pos1': x[...,Dah:Dah+3], |
|
'other1': x[...,Dah+3:] |
|
}) |
|
|
|
def get_pos_param_info(stat, output_max=1, output_min=-1, range_eps=1e-7): |
|
|
|
input_max = stat['max'] |
|
input_min = stat['min'] |
|
input_range = input_max - input_min |
|
ignore_dim = input_range < range_eps |
|
input_range[ignore_dim] = output_max - output_min |
|
scale = (output_max - output_min) / input_range |
|
offset = output_min - scale * input_min |
|
offset[ignore_dim] = (output_max + output_min) / 2 - input_min[ignore_dim] |
|
|
|
return {'scale': scale, 'offset': offset}, stat |
|
|
|
|
|
def get_other_param_info(stat): |
|
example = stat['max'] |
|
scale = np.ones_like(example) |
|
offset = np.zeros_like(example) |
|
info = { |
|
'max': np.ones_like(example), |
|
'min': np.full_like(example, -1), |
|
'mean': np.zeros_like(example), |
|
'std': np.ones_like(example) |
|
} |
|
return {'scale': scale, 'offset': offset}, info |
|
|
|
pos0_param, pos0_info = get_pos_param_info(result['pos0']) |
|
pos1_param, pos1_info = get_pos_param_info(result['pos1']) |
|
other0_param, other0_info = get_other_param_info(result['other0']) |
|
other1_param, other1_info = get_other_param_info(result['other1']) |
|
|
|
param = dict_apply_reduce( |
|
[pos0_param, other0_param, pos1_param, other1_param], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
info = dict_apply_reduce( |
|
[pos0_info, other0_info, pos1_info, other1_info], |
|
lambda x: np.concatenate(x,axis=-1)) |
|
|
|
return SingleFieldLinearNormalizer.create_manual( |
|
scale=param['scale'], |
|
offset=param['offset'], |
|
input_stats_dict=info |
|
) |
|
|
|
|
|
def array_to_stats(arr: np.ndarray): |
|
stat = { |
|
'min': np.min(arr, axis=0), |
|
'max': np.max(arr, axis=0), |
|
'mean': np.mean(arr, axis=0), |
|
'std': np.std(arr, axis=0) |
|
} |
|
return stat |
|
|