|
""" |
|
Script to extract observations from low-dimensional simulation states in a robosuite dataset. |
|
|
|
Args: |
|
    input (str): path to input hdf5 dataset
|
|
|
    output (str): name of output hdf5 dataset
|
|
|
n (int): if provided, stop after n trajectories are processed |
|
|
|
shaped (bool): if flag is set, use dense rewards |
|
|
|
camera_names (str or [str]): camera name(s) to use for image observations. |
|
Leave out to not use image observations. |
|
|
|
camera_height (int): height of image observation. |
|
|
|
camera_width (int): width of image observation |
|
|
|
done_mode (int): how to write done signal. If 0, done is 1 whenever s' is a success state. |
|
If 1, done is 1 at the end of each trajectory. If 2, both. |
|
|
|
copy_rewards (bool): if provided, copy rewards from source file instead of inferring them |
|
|
|
copy_dones (bool): if provided, copy dones from source file instead of inferring them |
|
|
|
Example usage: |
|
|
|
# extract low-dimensional observations |
|
    python dataset_states_to_obs.py --input /path/to/demo.hdf5 --output low_dim.hdf5 --done_mode 2
|
|
|
# extract 84x84 image observations |
|
    python dataset_states_to_obs.py --input /path/to/demo.hdf5 --output image.hdf5 \
|
--done_mode 2 --camera_names agentview robot0_eye_in_hand --camera_height 84 --camera_width 84 |
|
|
|
# (space saving option) extract 84x84 image observations with compression and without |
|
# extracting next obs (not needed for pure imitation learning algos) |
|
    python dataset_states_to_obs.py --input /path/to/demo.hdf5 --output image.hdf5 \
|
--done_mode 2 --camera_names agentview robot0_eye_in_hand --camera_height 84 --camera_width 84 \ |
|
--compress --exclude-next-obs |
|
|
|
# use dense rewards, and only annotate the end of trajectories with done signal |
|
    python dataset_states_to_obs.py --input /path/to/demo.hdf5 --output image_dense_done_1.hdf5 \
        --done_mode 1 --shaped --camera_names agentview robot0_eye_in_hand --camera_height 84 --camera_width 84
|
""" |
|
import os |
|
import json |
|
import h5py |
|
import argparse |
|
import numpy as np |
|
from copy import deepcopy |
|
|
|
import robomimic.utils.tensor_utils as TensorUtils |
|
import robomimic.utils.file_utils as FileUtils |
|
import robomimic.utils.env_utils as EnvUtils |
|
from robomimic.envs.env_base import EnvBase |
|
import multiprocessing
# Force the "spawn" start method so each worker gets a fresh interpreter —
# presumably required because the simulator/rendering state created by the
# env utilities below is not fork-safe (TODO confirm for the backend in use).
# force=True overrides any start method already set by an importing module.
multiprocessing.set_start_method('spawn', force=True)
|
|
|
|
|
def extract_trajectory(
    env_meta,
    args,
    initial_state,
    states,
    actions,
):
    """
    Helper function to extract observations, rewards, and dones along a trajectory
    using the simulator environment. Builds its own environment from ``env_meta``
    so that it can run inside a multiprocessing worker (envs are created per call,
    never shared across processes).

    Args:
        env_meta (dict): environment metadata used to construct the simulation env
        args (argparse.Namespace): parsed script arguments; only ``done_mode``,
            ``camera_height``, ``camera_width``, and ``shaped`` are read here
        initial_state (dict): initial simulation state to load
        states (np.array): array of simulation states to load to extract information
        actions (np.array): array of actions, one per recorded state

    Returns:
        traj (dict): dictionary with keys "obs", "next_obs", "rewards", "dones",
            "actions", "states" (numpy arrays / dicts of numpy arrays) and
            "initial_state_dict" (the ``initial_state`` argument, unchanged)

    done_mode semantics: if 0, done is 1 whenever s' is a success state; if 1,
    done is 1 at the end of each trajectory; if 2, both.
    """
    done_mode = args.done_mode
    # NOTE(review): camera names are hard-coded here (args.camera_names is ignored);
    # PickPlace tasks drop "sideview" — presumably that view is unavailable or
    # uninformative for them, TODO confirm. Keep this list in sync with the
    # identical one in dataset_states_to_obs.
    if env_meta['env_name'].startswith('PickPlace'):
        camera_names=['birdview', 'agentview', 'robot0_eye_in_hand']
    else:
        camera_names=['birdview', 'agentview', 'sideview', 'robot0_eye_in_hand']
    env = EnvUtils.create_env_for_data_processing(
        env_meta=env_meta,
        camera_names=camera_names,
        camera_height=args.camera_height,
        camera_width=args.camera_width,
        reward_shaping=args.shaped,
    )
    # one recorded simulator state per recorded action
    assert states.shape[0] == actions.shape[0]

    # reset first so the env is in a clean state, then load the recorded start
    env.reset()
    obs = env.reset_to(initial_state)

    traj = dict(
        obs=[],
        next_obs=[],
        rewards=[],
        dones=[],
        actions=np.array(actions),
        states=np.array(states),
        initial_state_dict=initial_state,
    )
    traj_len = states.shape[0]

    # t indexes the *next* observation: transition t uses states[t-1] -> states[t]
    for t in range(1, traj_len + 1):

        if t == traj_len:
            # no recorded state after the last one, so the final next_obs must
            # come from actually stepping the simulator with the last action
            next_obs, _, _, _ = env.step(actions[t - 1])
        else:
            # load the recorded simulator state directly — faithful to the
            # dataset, independent of simulator determinism under re-stepping
            next_obs = env.reset_to({"states" : states[t]})

        # reward is evaluated at the state just loaded/stepped-to (i.e. s');
        # dense vs. sparse is controlled by reward_shaping above
        r = env.get_reward()

        # infer done signal according to done_mode (see docstring)
        done = False
        if (done_mode == 1) or (done_mode == 2):
            # done = 1 at the end of the trajectory
            done = done or (t == traj_len)
        if (done_mode == 0) or (done_mode == 2):
            # done = 1 when s' is a task success state
            done = done or env.is_success()["task"]
        done = int(done)

        # collect this transition
        traj["obs"].append(obs)
        traj["next_obs"].append(next_obs)
        traj["rewards"].append(r)
        traj["dones"].append(done)

        # current next_obs becomes obs for the following transition
        obs = deepcopy(next_obs)

    # convert list of per-step observation dicts to a dict of per-key lists
    traj["obs"] = TensorUtils.list_of_flat_dict_to_dict_of_list(traj["obs"])
    traj["next_obs"] = TensorUtils.list_of_flat_dict_to_dict_of_list(traj["next_obs"])

    # list -> numpy array for everything except the initial state dict,
    # which is passed through unchanged for the writer to use
    for k in traj:
        if k == "initial_state_dict":
            continue
        if isinstance(traj[k], dict):
            for kp in traj[k]:
                traj[k][kp] = np.array(traj[k][kp])
        else:
            traj[k] = np.array(traj[k])

    return traj
|
|
|
def worker(x):
    """
    Multiprocessing pool entry point: forward one packed task to extract_trajectory.

    Args:
        x (list): [env_meta, args, initial_state, states, actions], packed in the
            positional parameter order of extract_trajectory

    Returns:
        traj (dict): trajectory dictionary produced by extract_trajectory
    """
    # the task list is built in extract_trajectory's parameter order,
    # so it can be splatted straight into the call
    return extract_trajectory(*x)
|
|
|
def dataset_states_to_obs(args):
    """
    Extract observations from the states of every demo in the input hdf5 dataset
    and write them to a new hdf5 dataset, processing ``args.num_workers``
    trajectories in parallel per batch.

    Args:
        args (argparse.Namespace): parsed script arguments; this function reads
            input, output, n, num_workers, shaped, camera_height, camera_width,
            copy_rewards, copy_dones, exclude_next_obs, and compress
    """
    num_workers = args.num_workers

    # Create an environment in the main process purely to (a) report the metadata
    # being used and (b) serialize the env args into the output file at the end.
    env_meta = FileUtils.get_env_metadata_from_dataset(dataset_path=args.input)
    # NOTE(review): camera names are hard-coded (args.camera_names is ignored);
    # this list must stay in sync with the identical one in extract_trajectory.
    if env_meta['env_name'].startswith('PickPlace'):
        camera_names = ['birdview', 'agentview', 'robot0_eye_in_hand']
    else:
        camera_names = ['birdview', 'agentview', 'sideview', 'robot0_eye_in_hand']
    env = EnvUtils.create_env_for_data_processing(
        env_meta=env_meta,
        camera_names=camera_names,
        camera_height=args.camera_height,
        camera_width=args.camera_width,
        reward_shaping=args.shaped,
    )

    print("==== Using environment with the following metadata ====")
    print(json.dumps(env.serialize(), indent=4))
    print("")

    # robosuite envs store a per-demo mujoco model xml that must be copied through
    is_robosuite_env = EnvUtils.is_robosuite_env(env_meta)

    output_path = args.output
    # BUG FIX: the files were previously opened without context managers, so a
    # failure anywhere during extraction leaked both hdf5 handles (and could
    # leave the output file unflushed). `with` guarantees they are closed.
    with h5py.File(args.input, "r") as f, h5py.File(output_path, "w") as f_out:
        # sort demo keys by their numeric suffix ("demo_0", "demo_1", ...)
        demos = list(f["data"].keys())
        inds = np.argsort([int(elem[5:]) for elem in demos])
        demos = [demos[i] for i in inds]

        # maybe reduce the number of demonstrations to playback
        if args.n is not None:
            demos = demos[:args.n]

        data_grp = f_out.create_group("data")
        print("input file: {}".format(args.input))
        print("output file: {}".format(output_path))

        total_samples = 0
        # process demos in batches of num_workers
        for i in range(0, len(demos), num_workers):
            end = min(i + num_workers, len(demos))

            # gather the per-demo inputs for this batch from the source file
            initial_state_list = []
            states_list = []
            actions_list = []
            for j in range(i, end):
                ep = demos[j]

                states = f["data/{}/states".format(ep)][()]
                initial_state = dict(states=states[0])
                if is_robosuite_env:
                    initial_state["model"] = f["data/{}".format(ep)].attrs["model_file"]
                actions = f["data/{}/actions".format(ep)][()]

                initial_state_list.append(initial_state)
                states_list.append(states)
                actions_list.append(actions)

            # each worker rebuilds its own env from env_meta (envs can't be shared
            # across spawned processes); the task layout matches worker()'s unpacking
            with multiprocessing.Pool(num_workers) as pool:
                trajs = pool.map(
                    worker,
                    [
                        [env_meta, args, initial_state_list[j], states_list[j], actions_list[j]]
                        for j in range(len(initial_state_list))
                    ],
                )

            for j, ind in enumerate(range(i, end)):
                ep = demos[ind]
                traj = trajs[j]

                # maybe copy reward or done signal from source file instead of
                # the values inferred during extraction
                if args.copy_rewards:
                    traj["rewards"] = f["data/{}/rewards".format(ep)][()]
                if args.copy_dones:
                    traj["dones"] = f["data/{}/dones".format(ep)][()]

                # store transitions for this episode
                ep_data_grp = data_grp.create_group(ep)
                ep_data_grp.create_dataset("actions", data=np.array(traj["actions"]))
                ep_data_grp.create_dataset("states", data=np.array(traj["states"]))
                ep_data_grp.create_dataset("rewards", data=np.array(traj["rewards"]))
                ep_data_grp.create_dataset("dones", data=np.array(traj["dones"]))
                # compression=None is equivalent to no compression in h5py, which
                # lets us collapse the previous four-way branch into one path
                compression = "gzip" if args.compress else None
                for k in traj["obs"]:
                    ep_data_grp.create_dataset(
                        "obs/{}".format(k),
                        data=np.array(traj["obs"][k]),
                        compression=compression,
                    )
                    if not args.exclude_next_obs:
                        ep_data_grp.create_dataset(
                            "next_obs/{}".format(k),
                            data=np.array(traj["next_obs"][k]),
                            compression=compression,
                        )

                # episode metadata
                if is_robosuite_env:
                    ep_data_grp.attrs["model_file"] = traj["initial_state_dict"]["model"]
                ep_data_grp.attrs["num_samples"] = traj["actions"].shape[0]
                total_samples += traj["actions"].shape[0]
                print("ep {}: wrote {} transitions to group {}".format(
                    ind, ep_data_grp.attrs["num_samples"], ep))

            # release this batch's (potentially large, image-laden) trajectories
            # before extracting the next one to keep peak memory down
            del trajs

        # copy over all filter keys (e.g. train/valid masks) that exist in the file
        if "mask" in f:
            f.copy("mask", f_out)

        # global metadata
        data_grp.attrs["total"] = total_samples
        data_grp.attrs["env_args"] = json.dumps(env.serialize(), indent=4)
        print("Wrote {} trajectories to {}".format(len(demos), output_path))
|
|
|
def str2bool(v):
    """
    Parse a command-line boolean value.

    BUG FIX: argparse's ``type=bool`` is broken for CLI use — ``bool("False")``
    is True, so every explicitly supplied value used to parse as True. This
    converter accepts the usual spellings of true/false (case-insensitive),
    passes real bools through, and raises a proper argparse error otherwise.

    Args:
        v (str or bool): raw command-line token (or an already-parsed bool)

    Returns:
        bool: the parsed value

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognized boolean
    """
    if isinstance(v, bool):
        return v
    if v.lower() in ("yes", "true", "t", "y", "1"):
        return True
    if v.lower() in ("no", "false", "f", "n", "0"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got {!r}".format(v))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="path to input hdf5 dataset",
    )

    parser.add_argument(
        "--output",
        type=str,
        required=True,
        help="name of output hdf5 dataset",
    )

    parser.add_argument(
        "--n",
        type=int,
        default=None,
        help="(optional) stop after n trajectories are processed",
    )

    parser.add_argument(
        "--num_workers",
        type=int,
        default=2,
        help="number of trajectories to extract in parallel per batch",
    )

    parser.add_argument(
        "--shaped",
        action='store_true',
        help="(optional) use shaped (dense) rewards",
    )

    parser.add_argument(
        "--camera_names",
        type=str,
        nargs='+',
        default=[],
        help="(optional) camera name(s) to use for image observations. Leave out to not use image observations.",
    )

    parser.add_argument(
        "--camera_height",
        type=int,
        default=84,
        help="(optional) height of image observations",
    )

    parser.add_argument(
        "--camera_width",
        type=int,
        default=84,
        help="(optional) width of image observations",
    )

    parser.add_argument(
        "--done_mode",
        type=int,
        default=2,
        help="how to write done signal. If 0, done is 1 whenever s' is a success state.\
            If 1, done is 1 at the end of each trajectory. If 2, both.",
    )

    parser.add_argument(
        "--copy_rewards",
        action='store_true',
        help="(optional) copy rewards from source file instead of inferring them",
    )

    parser.add_argument(
        "--copy_dones",
        action='store_true',
        help="(optional) copy dones from source file instead of inferring them",
    )

    # These two options previously used type=bool, so "--compress False" still
    # parsed as True and the bare-flag usage shown in the module docstring
    # errored out. str2bool fixes value parsing; nargs='?' + const=True allows
    # the bare-flag form. Defaults are unchanged (both remain True).
    parser.add_argument(
        "--exclude-next-obs",
        type=str2bool,
        nargs='?',
        const=True,
        default=True,
        help="(optional) exclude next obs in dataset",
    )

    parser.add_argument(
        "--compress",
        type=str2bool,
        nargs='?',
        const=True,
        default=True,
        help="(optional) compress observations with gzip option in hdf5",
    )

    args = parser.parse_args()
    dataset_states_to_obs(args)
|
|