######################################################################
-def generate_episodes(nb, height=6, width=6, T=10):
+def generate_episodes(nb, height=6, width=6, T=10, nb_walls=3):
rnd = torch.rand(nb, height, width)
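# Walls are placed at the argmax of this noise; zero the border cells so they never land on the edge of the grid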
rnd[:, 0, :] = 0
rnd[:, -1, :] = 0
rnd[:, :, 0] = 0
rnd[:, :, -1] = 0
wall = 0
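# Drop the walls one at a time: each pass turns the cell holding the per-episode maximum of rnd into a wall, then clears it from rnd so the next pass picks a different cell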
- for k in range(3):
+ for k in range(nb_walls):
wall = wall + (
rnd.flatten(1).argmax(dim=1)[:, None]
== torch.arange(rnd.flatten(1).size(1))[None, :]
).long().reshape(rnd.size())
+
rnd = rnd * (1 - wall.clamp(max=1))
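# Replicate the wall map across the T time steps of each episode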
states = wall[:, None, :, :].expand(-1, T, -1, -1).clone()
######################################################################
if __name__ == "__main__":
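# Quick sanity check: generate a few episodes, round-trip them through the token-sequence encoding, and print them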
- nb, height, width, T = 25, 5, 7, 25
- states, actions, rewards = generate_episodes(nb, height, width, T)
+ nb, height, width, T, nb_walls = 25, 5, 7, 25, 5
+ states, actions, rewards = generate_episodes(nb, height, width, T, nb_walls)
seq = episodes2seq(states, actions, rewards, lookahead_delta=T)
s, a, r, lr = seq2episodes(seq, height, width, lookahead=True)
print(episodes2str(s, a, r, lookahead_rewards=lr, unicode=True, ansi_colors=True))
##############################
# escape options
-parser.add_argument("--escape_height", type=int, default=4)
+parser.add_argument("--escape_height", type=int, default=5)
-parser.add_argument("--escape_width", type=int, default=6)
+parser.add_argument("--escape_width", type=int, default=7)
parser.add_argument("--escape_T", type=int, default=25)
+parser.add_argument("--escape_nb_walls", type=int, default=5)
+
######################################################################
args = parser.parse_args()
height=args.escape_height,
width=args.escape_width,
T=args.escape_T,
+ nb_walls=args.escape_nb_walls,
logger=log_string,
device=device,
)
height,
width,
T,
+ nb_walls,
logger=None,
device=torch.device("cpu"),
):
self.width = width
states, actions, rewards = escape.generate_episodes(
- nb_train_samples + nb_test_samples, height, width, T
+ nb_train_samples + nb_test_samples, height, width, T, nb_walls
)
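# Flatten the episodes into token sequences; lookahead_delta=T adds the look-ahead reward tokens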
seq = escape.episodes2seq(states, actions, rewards, lookahead_delta=T)
# seq = seq[:, seq.size(1) // 3 : 2 * seq.size(1) // 3]
def thinking_autoregression(
self, n_epoch, model, result_dir, logger, deterministic_synthesis, nmax=1000
):
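# Generate episodes autoregressively: at each iteration, force the look-ahead reward token, sample the next state, then refresh the look-ahead rewards of the earlier iterations from the rewards collected so far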
- result = self.test_input[:100].clone()
+ result = self.test_input[:250].clone()
t = torch.arange(result.size(1), device=result.device)[None, :]
state_len = self.height * self.width  # number of tokens encoding one grid state
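# Walk through the sequence one iteration at a time (it_len tokens: the grid state plus the action, reward and look-ahead reward tokens)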
for u in tqdm.tqdm(
range(it_len, result.size(1) - it_len + 1, it_len), desc="thinking"
):
- # Put the lookahead reward to -1 for the current iteration,
- # sample the next state
- s = -1
+ # Set the lookahead reward to either 0 or -1 for the
+ # current iteration, then sample the next state
+ s = -1  # (torch.rand(result.size(0), device=result.device) < 0.2).long()
result[:, u - 1] = s + 1 + escape.first_lookahead_rewards_code
ar_mask = (t >= u).long() * (t < u + state_len).long()
ar(result, ar_mask)
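# Re-estimate the look-ahead reward of each earlier iteration from the rewards sampled so far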
for v in range(0, u, it_len):
# Extract the rewards
r = result[:, range(v + state_len + 1 + it_len, u + it_len - 1, it_len)]
- r = r - escape.first_lookahead_rewards_code - 1
+ r = r - escape.first_rewards_code - 1
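# Look-ahead reward: the most negative future reward if any is negative, otherwise the largest one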
a = r.min(dim=1).values
b = r.max(dim=1).values
s = (a < 0).long() * a + (a >= 0).long() * b