# File size: 14,304 Bytes
# c1f1d32
"""
Script to extract observations from low-dimensional simulation states in a robosuite dataset.
Args:
dataset (str): path to input hdf5 dataset
output_name (str): name of output hdf5 dataset
n (int): if provided, stop after n trajectories are processed
shaped (bool): if flag is set, use dense rewards
camera_names (str or [str]): camera name(s) to use for image observations.
Leave out to not use image observations.
camera_height (int): height of image observation.
camera_width (int): width of image observation
done_mode (int): how to write done signal. If 0, done is 1 whenever s' is a success state.
If 1, done is 1 at the end of each trajectory. If 2, both.
copy_rewards (bool): if provided, copy rewards from source file instead of inferring them
copy_dones (bool): if provided, copy dones from source file instead of inferring them
Example usage:
# extract low-dimensional observations
python dataset_states_to_obs.py --dataset /path/to/demo.hdf5 --output_name low_dim.hdf5 --done_mode 2
# extract 84x84 image observations
python dataset_states_to_obs.py --dataset /path/to/demo.hdf5 --output_name image.hdf5 \
--done_mode 2 --camera_names agentview robot0_eye_in_hand --camera_height 84 --camera_width 84
# (space saving option) extract 84x84 image observations with compression and without
# extracting next obs (not needed for pure imitation learning algos)
python dataset_states_to_obs.py --dataset /path/to/demo.hdf5 --output_name image.hdf5 \
--done_mode 2 --camera_names agentview robot0_eye_in_hand --camera_height 84 --camera_width 84 \
--compress --exclude-next-obs
# use dense rewards, and only annotate the end of trajectories with done signal
python dataset_states_to_obs.py --dataset /path/to/demo.hdf5 --output_name image_dense_done_1.hdf5 \
--done_mode 1 --dense --camera_names agentview robot0_eye_in_hand --camera_height 84 --camera_width 84
"""
import os
import json
import h5py
import argparse
import numpy as np
from copy import deepcopy
import robomimic.utils.tensor_utils as TensorUtils
import robomimic.utils.file_utils as FileUtils
import robomimic.utils.env_utils as EnvUtils
from robomimic.envs.env_base import EnvBase
import multiprocessing
multiprocessing.set_start_method('spawn', force=True)
def extract_trajectory(
    env_meta,
    args,
    initial_state,
    states,
    actions,
):
    """
    Helper function to extract observations, rewards, and dones along a trajectory
    by replaying simulation states in a freshly constructed environment. The env is
    built inside this function (one per call) so it is safe to run in a
    multiprocessing worker with the 'spawn' start method.

    Args:
        env_meta (dict): environment metadata, used to construct the environment
            and to select the camera set
        args (argparse.Namespace): parsed script arguments; reads args.done_mode,
            args.camera_height, args.camera_width and args.shaped
        initial_state (dict): initial simulation state to load
        states (np.array): array of simulation states to load to extract information
        actions (np.array): array of actions, one per state

    Returns:
        traj (dict): dictionary with keys "obs", "next_obs", "rewards", "dones",
            "actions", "states" (numpy arrays / dicts of numpy arrays) and
            "initial_state_dict" (the initial_state passed in)
    """
    # done_mode: 0 -> done whenever s' is a success state; 1 -> done at end of
    # trajectory; 2 -> both
    done_mode = args.done_mode
    # NOTE(review): PickPlace tasks get a reduced camera set (no 'sideview');
    # assumption based on the hard-coded lists — confirm against task configs
    if env_meta['env_name'].startswith('PickPlace'):
        camera_names=['birdview', 'agentview', 'robot0_eye_in_hand']
    else:
        camera_names=['birdview', 'agentview', 'sideview', 'robot0_eye_in_hand']
    env = EnvUtils.create_env_for_data_processing(
        env_meta=env_meta,
        camera_names=camera_names,
        camera_height=args.camera_height,
        camera_width=args.camera_width,
        reward_shaping=args.shaped,
    )

    # one action per state: action[t] transitions states[t] -> states[t + 1]
    assert states.shape[0] == actions.shape[0]

    # load the initial state
    env.reset()
    obs = env.reset_to(initial_state)

    traj = dict(
        obs=[],
        next_obs=[],
        rewards=[],
        dones=[],
        actions=np.array(actions),
        states=np.array(states),
        initial_state_dict=initial_state,
    )
    traj_len = states.shape[0]
    # iteration variable @t is over "next obs" indices
    for t in range(1, traj_len + 1):

        # get next observation
        if t == traj_len:
            # play final action to get next observation for last timestep
            next_obs, _, _, _ = env.step(actions[t - 1])
        else:
            # reset to simulator state to get observation
            next_obs = env.reset_to({"states" : states[t]})

        # infer reward signal
        # note: our tasks use reward r(s'), reward AFTER transition, so this is
        # the reward for the current timestep
        r = env.get_reward()

        # infer done signal
        done = False
        if (done_mode == 1) or (done_mode == 2):
            # done = 1 at end of trajectory
            done = done or (t == traj_len)
        if (done_mode == 0) or (done_mode == 2):
            # done = 1 when s' is task success state
            done = done or env.is_success()["task"]
        done = int(done)

        # collect transition
        traj["obs"].append(obs)
        traj["next_obs"].append(next_obs)
        traj["rewards"].append(r)
        traj["dones"].append(done)

        # update for next iter
        obs = deepcopy(next_obs)

    # convert list of dict to dict of list for obs dictionaries (for convenient writes to hdf5 dataset)
    traj["obs"] = TensorUtils.list_of_flat_dict_to_dict_of_list(traj["obs"])
    traj["next_obs"] = TensorUtils.list_of_flat_dict_to_dict_of_list(traj["next_obs"])

    # list to numpy array (initial_state_dict is left as-is for metadata writes)
    for k in traj:
        if k == "initial_state_dict":
            continue
        if isinstance(traj[k], dict):
            for kp in traj[k]:
                traj[k][kp] = np.array(traj[k][kp])
        else:
            traj[k] = np.array(traj[k])

    return traj
def worker(x):
    """
    Multiprocessing entry point: unpack a single argument bundle and delegate
    to extract_trajectory.

    Args:
        x (sequence): (env_meta, args, initial_state, states, actions), in the
            positional order expected by extract_trajectory

    Returns:
        dict: trajectory dictionary produced by extract_trajectory
    """
    meta, script_args, init_state, state_arr, action_arr = x
    return extract_trajectory(
        env_meta=meta,
        args=script_args,
        initial_state=init_state,
        states=state_arr,
        actions=action_arr,
    )
def dataset_states_to_obs(args):
    """
    Replay every demonstration in the input hdf5 through the simulator to extract
    observations, rewards and dones, and write the results to a new hdf5.

    Trajectories are processed in batches of args.num_workers via a process pool;
    each worker builds its own environment (see extract_trajectory). The env built
    here is used only for metadata (serialize()).

    Args:
        args (argparse.Namespace): parsed command-line arguments; reads
            args.input, args.output, args.n, args.num_workers, args.camera_height,
            args.camera_width, args.shaped, args.copy_rewards, args.copy_dones,
            args.compress, args.exclude_next_obs
    """
    num_workers = args.num_workers

    # create environment to use for data processing (metadata only)
    env_meta = FileUtils.get_env_metadata_from_dataset(dataset_path=args.input)
    # NOTE(review): camera-set selection mirrors extract_trajectory — keep in sync
    if env_meta['env_name'].startswith('PickPlace'):
        camera_names = ['birdview', 'agentview', 'robot0_eye_in_hand']
    else:
        camera_names = ['birdview', 'agentview', 'sideview', 'robot0_eye_in_hand']
    env = EnvUtils.create_env_for_data_processing(
        env_meta=env_meta,
        camera_names=camera_names,
        camera_height=args.camera_height,
        camera_width=args.camera_width,
        reward_shaping=args.shaped,
    )

    print("==== Using environment with the following metadata ====")
    print(json.dumps(env.serialize(), indent=4))
    print("")

    # some operations for playback are robosuite-specific, so determine if this environment is a robosuite env
    is_robosuite_env = EnvUtils.is_robosuite_env(env_meta)

    # FIX: use context managers so both hdf5 handles are closed even if extraction
    # or writing raises (the originals leaked on any exception before f.close())
    with h5py.File(args.input, "r") as f, h5py.File(args.output, "w") as f_out:
        # list of all demonstration episodes, sorted in increasing number order;
        # episode names look like "demo_<i>", so sort on the integer suffix
        demos = list(f["data"].keys())
        inds = np.argsort([int(elem[5:]) for elem in demos])
        demos = [demos[i] for i in inds]

        # maybe reduce the number of demonstrations to playback
        if args.n is not None:
            demos = demos[:args.n]

        output_path = args.output
        data_grp = f_out.create_group("data")
        print("input file: {}".format(args.input))
        print("output file: {}".format(output_path))
        total_samples = 0

        # process demos in batches of num_workers
        for i in range(0, len(demos), num_workers):
            end = min(i + num_workers, len(demos))

            # gather per-episode replay inputs for this batch
            initial_state_list = []
            states_list = []
            actions_list = []
            for j in range(i, end):
                ep = demos[j]
                # prepare initial state to reload from
                states = f["data/{}/states".format(ep)][()]
                initial_state = dict(states=states[0])
                if is_robosuite_env:
                    initial_state["model"] = f["data/{}".format(ep)].attrs["model_file"]
                actions = f["data/{}/actions".format(ep)][()]
                initial_state_list.append(initial_state)
                states_list.append(states)
                actions_list.append(actions)

            # replay the batch in parallel; each worker constructs its own env
            with multiprocessing.Pool(num_workers) as pool:
                trajs = pool.map(
                    worker,
                    [
                        [env_meta, args, initial_state_list[j], states_list[j], actions_list[j]]
                        for j in range(len(initial_state_list))
                    ],
                )

            for j, ind in enumerate(range(i, end)):
                ep = demos[ind]
                traj = trajs[j]

                # maybe copy reward or done signal from source file
                if args.copy_rewards:
                    traj["rewards"] = f["data/{}/rewards".format(ep)][()]
                if args.copy_dones:
                    traj["dones"] = f["data/{}/dones".format(ep)][()]

                # store transitions
                # IMPORTANT: keep name of group the same as source file, to make sure that filter keys are
                # consistent as well
                ep_data_grp = data_grp.create_group(ep)
                ep_data_grp.create_dataset("actions", data=np.array(traj["actions"]))
                ep_data_grp.create_dataset("states", data=np.array(traj["states"]))
                ep_data_grp.create_dataset("rewards", data=np.array(traj["rewards"]))
                ep_data_grp.create_dataset("dones", data=np.array(traj["dones"]))
                for k in traj["obs"]:
                    if args.compress:
                        ep_data_grp.create_dataset("obs/{}".format(k), data=np.array(traj["obs"][k]), compression="gzip")
                    else:
                        ep_data_grp.create_dataset("obs/{}".format(k), data=np.array(traj["obs"][k]))
                    if not args.exclude_next_obs:
                        if args.compress:
                            ep_data_grp.create_dataset("next_obs/{}".format(k), data=np.array(traj["next_obs"][k]), compression="gzip")
                        else:
                            ep_data_grp.create_dataset("next_obs/{}".format(k), data=np.array(traj["next_obs"][k]))

                # episode metadata
                if is_robosuite_env:
                    ep_data_grp.attrs["model_file"] = traj["initial_state_dict"]["model"]  # model xml for this episode
                ep_data_grp.attrs["num_samples"] = traj["actions"].shape[0]  # number of transitions in this episode
                total_samples += traj["actions"].shape[0]
                print("ep {}: wrote {} transitions to group {}".format(ind, ep_data_grp.attrs["num_samples"], ep))

            # release the batch before loading the next one to bound peak memory
            del trajs

        # copy over all filter keys that exist in the original hdf5
        if "mask" in f:
            f.copy("mask", f_out)

        # global metadata
        data_grp.attrs["total"] = total_samples
        data_grp.attrs["env_args"] = json.dumps(env.serialize(), indent=4)  # environment info
        print("Wrote {} trajectories to {}".format(len(demos), output_path))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",
type=str,
required=True,
help="path to input hdf5 dataset",
)
# name of hdf5 to write - it will be in the same directory as @dataset
parser.add_argument(
"--output",
type=str,
required=True,
help="name of output hdf5 dataset",
)
# specify number of demos to process - useful for debugging conversion with a handful
# of trajectories
parser.add_argument(
"--n",
type=int,
default=None,
help="(optional) stop after n trajectories are processed",
)
parser.add_argument(
"--num_workers",
type=int,
default=2,
)
# flag for reward shaping
parser.add_argument(
"--shaped",
action='store_true',
help="(optional) use shaped rewards",
)
# camera names to use for observations
parser.add_argument(
"--camera_names",
type=str,
nargs='+',
default=[],
help="(optional) camera name(s) to use for image observations. Leave out to not use image observations.",
)
parser.add_argument(
"--camera_height",
type=int,
default=84,
help="(optional) height of image observations",
)
parser.add_argument(
"--camera_width",
type=int,
default=84,
help="(optional) width of image observations",
)
# specifies how the "done" signal is written. If "0", then the "done" signal is 1 wherever
# the transition (s, a, s') has s' in a task completion state. If "1", the "done" signal
# is one at the end of every trajectory. If "2", the "done" signal is 1 at task completion
# states for successful trajectories and 1 at the end of all trajectories.
parser.add_argument(
"--done_mode",
type=int,
default=2,
help="how to write done signal. If 0, done is 1 whenever s' is a success state.\
If 1, done is 1 at the end of each trajectory. If 2, both.",
)
# flag for copying rewards from source file instead of re-writing them
parser.add_argument(
"--copy_rewards",
action='store_true',
help="(optional) copy rewards from source file instead of inferring them",
)
# flag for copying dones from source file instead of re-writing them
parser.add_argument(
"--copy_dones",
action='store_true',
help="(optional) copy dones from source file instead of inferring them",
)
# flag to exclude next obs in dataset
parser.add_argument(
"--exclude-next-obs",
type=bool,
default=True,
help="(optional) exclude next obs in dataset",
)
# flag to compress observations with gzip option in hdf5
parser.add_argument(
"--compress",
type=bool,
default=True,
help="(optional) compress observations with gzip option in hdf5",
)
args = parser.parse_args()
dataset_states_to_obs(args)