-def episodes2seq(states, actions, rewards, lookahead_delta=None):
- states = states.flatten(2) + first_state_code
- actions = actions[:, :, None] + first_actions_code
-
- if lookahead_delta is not None:
- # r = rewards
- # u = F.pad(r, (0, lookahead_delta - 1)).as_strided(
- # (r.size(0), r.size(1), lookahead_delta),
- # (r.size(1) + lookahead_delta - 1, 1, 1),
- # )
- # a = u[:, :, 1:].min(dim=-1).values
- # b = u[:, :, 1:].max(dim=-1).values
- # s = (a < 0).long() * a + (a >= 0).long() * b
- # lookahead_rewards = (1 + s[:, :, None]) + first_lookahead_rewards_code
-
- # a[n,t]=min_s>t r[n,s]
- a = rewards.new_zeros(rewards.size())
- b = rewards.new_zeros(rewards.size())
- for t in range(a.size(1) - 1):
- a[:, t] = rewards[:, t + 1 :].min(dim=-1).values
- b[:, t] = rewards[:, t + 1 :].max(dim=-1).values
- s = (a < 0).long() * a + (a >= 0).long() * b
- lookahead_rewards = (1 + s[:, :, None]) + first_lookahead_rewards_code
-
- r = rewards[:, :, None]
- rewards = (r + 1) + first_rewards_code
-
- assert (
- states.min() >= first_state_code
- and states.max() < first_state_code + nb_state_codes
- )
- assert (
- actions.min() >= first_actions_code
- and actions.max() < first_actions_code + nb_actions_codes
- )
- assert (
- rewards.min() >= first_rewards_code
- and rewards.max() < first_rewards_code + nb_rewards_codes
- )
-
- if lookahead_delta is None:
- return torch.cat([states, actions, rewards], dim=2).flatten(1)
- else:
- assert (
- lookahead_rewards.min() >= first_lookahead_rewards_code
- and lookahead_rewards.max()
- < first_lookahead_rewards_code + nb_lookahead_rewards_codes
- )
- return torch.cat([states, actions, rewards, lookahead_rewards], dim=2).flatten(
- 1
- )
-
-
-def seq2episodes(seq, height, width, lookahead=False):
- seq = seq.reshape(seq.size(0), -1, height * width + (3 if lookahead else 2))
- states = seq[:, :, : height * width] - first_state_code
+def episodes2seq(states, actions, rewards):
+ neg = rewards.new_zeros(rewards.size())
+ pos = rewards.new_zeros(rewards.size())
+ for t in range(neg.size(1) - 1):
+ neg[:, t] = rewards[:, t:].min(dim=-1).values
+ pos[:, t] = rewards[:, t:].max(dim=-1).values
+ s = (neg < 0).long() * neg + (neg >= 0).long() * pos
+
+ return torch.cat(
+ [
+ lookahead_reward2code(s[:, :, None]),
+ state2code(states.flatten(2)),
+ action2code(actions[:, :, None]),
+ reward2code(rewards[:, :, None]),
+ ],
+ dim=2,
+ ).flatten(1)
+
+
+def seq2episodes(seq, height, width):
+ seq = seq.reshape(seq.size(0), -1, height * width + 3)
+ lookahead_rewards = code2lookahead_reward(seq[:, :, 0])
+ states = code2state(seq[:, :, 1 : height * width + 1])