|
import numpy as np |
|
import copy |
|
|
|
import h5py |
|
import robomimic.utils.obs_utils as ObsUtils |
|
import robomimic.utils.file_utils as FileUtils |
|
import robomimic.utils.env_utils as EnvUtils |
|
import robomimic.utils.tensor_utils as TensorUtils |
|
from scipy.spatial.transform import Rotation |
|
|
|
from robomimic.config import config_factory |
|
|
|
|
|
class RobomimicAbsoluteActionConverter: |
|
def __init__(self, dataset_path, algo_name='bc'): |
|
|
|
config = config_factory(algo_name=algo_name) |
|
|
|
|
|
|
|
ObsUtils.initialize_obs_utils_with_config(config) |
|
|
|
env_meta = FileUtils.get_env_metadata_from_dataset(dataset_path) |
|
abs_env_meta = copy.deepcopy(env_meta) |
|
abs_env_meta['env_kwargs']['controller_configs']['control_delta'] = False |
|
|
|
env = EnvUtils.create_env_from_metadata( |
|
env_meta=env_meta, |
|
render=False, |
|
render_offscreen=False, |
|
use_image_obs=False, |
|
) |
|
assert len(env.env.robots) in (1, 2) |
|
|
|
abs_env = EnvUtils.create_env_from_metadata( |
|
env_meta=abs_env_meta, |
|
render=False, |
|
render_offscreen=False, |
|
use_image_obs=False, |
|
) |
|
assert not abs_env.env.robots[0].controller.use_delta |
|
|
|
self.env = env |
|
self.abs_env = abs_env |
|
self.file = h5py.File(dataset_path, 'r') |
|
|
|
def __len__(self): |
|
return len(self.file['data']) |
|
|
|
def convert_actions(self, |
|
states: np.ndarray, |
|
actions: np.ndarray) -> np.ndarray: |
|
""" |
|
Given state and delta action sequence |
|
generate equivalent goal position and orientation for each step |
|
keep the original gripper action intact. |
|
""" |
|
|
|
|
|
|
|
stacked_actions = actions.reshape(*actions.shape[:-1],-1,7) |
|
|
|
env = self.env |
|
|
|
action_goal_pos = np.zeros( |
|
stacked_actions.shape[:-1]+(3,), |
|
dtype=stacked_actions.dtype) |
|
action_goal_ori = np.zeros( |
|
stacked_actions.shape[:-1]+(3,), |
|
dtype=stacked_actions.dtype) |
|
action_gripper = stacked_actions[...,[-1]] |
|
for i in range(len(states)): |
|
_ = env.reset_to({'states': states[i]}) |
|
|
|
|
|
for idx, robot in enumerate(env.env.robots): |
|
|
|
robot.control(stacked_actions[i,idx], policy_step=True) |
|
|
|
|
|
controller = robot.controller |
|
action_goal_pos[i,idx] = controller.goal_pos |
|
action_goal_ori[i,idx] = Rotation.from_matrix( |
|
controller.goal_ori).as_rotvec() |
|
|
|
stacked_abs_actions = np.concatenate([ |
|
action_goal_pos, |
|
action_goal_ori, |
|
action_gripper |
|
], axis=-1) |
|
abs_actions = stacked_abs_actions.reshape(actions.shape) |
|
return abs_actions |
|
|
|
def convert_idx(self, idx): |
|
file = self.file |
|
demo = file[f'data/demo_{idx}'] |
|
|
|
states = demo['states'][:] |
|
actions = demo['actions'][:] |
|
|
|
|
|
abs_actions = self.convert_actions(states, actions) |
|
return abs_actions |
|
|
|
def convert_and_eval_idx(self, idx): |
|
env = self.env |
|
abs_env = self.abs_env |
|
file = self.file |
|
|
|
eval_skip_steps = 1 |
|
|
|
demo = file[f'data/demo_{idx}'] |
|
|
|
states = demo['states'][:] |
|
actions = demo['actions'][:] |
|
|
|
|
|
abs_actions = self.convert_actions(states, actions) |
|
|
|
|
|
robot0_eef_pos = demo['obs']['robot0_eef_pos'][:] |
|
robot0_eef_quat = demo['obs']['robot0_eef_quat'][:] |
|
|
|
delta_error_info = self.evaluate_rollout_error( |
|
env, states, actions, robot0_eef_pos, robot0_eef_quat, |
|
metric_skip_steps=eval_skip_steps) |
|
abs_error_info = self.evaluate_rollout_error( |
|
abs_env, states, abs_actions, robot0_eef_pos, robot0_eef_quat, |
|
metric_skip_steps=eval_skip_steps) |
|
|
|
info = { |
|
'delta_max_error': delta_error_info, |
|
'abs_max_error': abs_error_info |
|
} |
|
return abs_actions, info |
|
|
|
@staticmethod |
|
def evaluate_rollout_error(env, |
|
states, actions, |
|
robot0_eef_pos, |
|
robot0_eef_quat, |
|
metric_skip_steps=1): |
|
|
|
|
|
|
|
rollout_next_states = list() |
|
rollout_next_eef_pos = list() |
|
rollout_next_eef_quat = list() |
|
obs = env.reset_to({'states': states[0]}) |
|
for i in range(len(states)): |
|
obs = env.reset_to({'states': states[i]}) |
|
obs, reward, done, info = env.step(actions[i]) |
|
obs = env.get_observation() |
|
rollout_next_states.append(env.get_state()['states']) |
|
rollout_next_eef_pos.append(obs['robot0_eef_pos']) |
|
rollout_next_eef_quat.append(obs['robot0_eef_quat']) |
|
rollout_next_states = np.array(rollout_next_states) |
|
rollout_next_eef_pos = np.array(rollout_next_eef_pos) |
|
rollout_next_eef_quat = np.array(rollout_next_eef_quat) |
|
|
|
next_state_diff = states[1:] - rollout_next_states[:-1] |
|
max_next_state_diff = np.max(np.abs(next_state_diff[metric_skip_steps:])) |
|
|
|
next_eef_pos_diff = robot0_eef_pos[1:] - rollout_next_eef_pos[:-1] |
|
next_eef_pos_dist = np.linalg.norm(next_eef_pos_diff, axis=-1) |
|
max_next_eef_pos_dist = next_eef_pos_dist[metric_skip_steps:].max() |
|
|
|
next_eef_rot_diff = Rotation.from_quat(robot0_eef_quat[1:]) \ |
|
* Rotation.from_quat(rollout_next_eef_quat[:-1]).inv() |
|
next_eef_rot_dist = next_eef_rot_diff.magnitude() |
|
max_next_eef_rot_dist = next_eef_rot_dist[metric_skip_steps:].max() |
|
|
|
info = { |
|
'state': max_next_state_diff, |
|
'pos': max_next_eef_pos_dist, |
|
'rot': max_next_eef_rot_dist |
|
} |
|
return info |
|
|
|
class RobomimicObsConverter: |
|
def __init__(self, dataset_path, algo_name='bc'): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
env_meta = FileUtils.get_env_metadata_from_dataset(dataset_path) |
|
|
|
|
|
env = EnvUtils.create_env_for_data_processing( |
|
env_meta=env_meta, |
|
|
|
camera_names=['birdview', 'agentview', 'sideview', 'robot0_eye_in_hand'], |
|
camera_height=84, |
|
camera_width=84, |
|
reward_shaping=False, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.env = env |
|
self.file = h5py.File(dataset_path, 'r') |
|
|
|
def __len__(self): |
|
return len(self.file['data']) |
|
|
|
def convert_obs(self, initial_state, states): |
|
obss = [] |
|
self.env.reset() |
|
obs = self.env.reset_to(initial_state) |
|
obss.append(obs) |
|
for i in range(1, len(states)): |
|
obs = self.env.reset_to({'states': states[i]}) |
|
obss.append(obs) |
|
return TensorUtils.list_of_flat_dict_to_dict_of_list(obss) |
|
|
|
def convert_idx(self, idx): |
|
file = self.file |
|
demo = file[f'data/demo_{idx}'] |
|
|
|
|
|
states = demo['states'][:] |
|
initial_state = dict(states=states[0]) |
|
initial_state["model"] = demo.attrs["model_file"] |
|
|
|
|
|
obss = self.convert_obs(initial_state, states) |
|
del obss['birdview_image'] |
|
del obss['birdview_depth'] |
|
del obss['agentview_depth'] |
|
del obss['sideview_image'] |
|
del obss['sideview_depth'] |
|
del obss['robot0_eye_in_hand_depth'] |
|
return obss |
|
|