######################################################################
-nb_states_codes = 4
+nb_states_codes = 5
nb_actions_codes = 5
nb_rewards_codes = 3
nb_lookahead_rewards_codes = 3
######################################################################
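# The four code families (states, actions, rewards, lookahead rewards) each
# occupy a contiguous slice of the token vocabulary, starting at the
# first_*_code offsets used by seq2str below; a back-to-back layout such as
# first_actions_code = first_states_code + nb_states_codes is assumed here.
# Raising nb_states_codes from 4 to 5 makes room for the new coin cell, hence
# the extra symbol in the " #@T$" rendering strings further down.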
-def generate_episodes(nb, height=6, width=6, T=10, nb_walls=3):
+def generate_episodes(nb, height=6, width=6, T=10, nb_walls=3, nb_coins=3):
rnd = torch.rand(nb, height, width)
rnd[:, 0, :] = 0
rnd[:, -1, :] = 0
rnd[:, :, 0] = 0
rnd[:, :, -1] = 0
wall = 0
-
for k in range(nb_walls):
wall = wall + (
rnd.flatten(1).argmax(dim=1)[:, None]
== torch.arange(rnd.flatten(1).size(1))[None, :]
).long().reshape(rnd.size())
rnd = rnd * (1 - wall.clamp(max=1))
+ rnd = torch.rand(nb, height, width)
+ coins = torch.zeros(nb, T, height, width, dtype=torch.int64)
+ rnd = rnd * (1 - wall.clamp(max=1))
+ for k in range(nb_coins):
+ coins[:, 0] = coins[:, 0] + (
+ rnd.flatten(1).argmax(dim=1)[:, None]
+ == torch.arange(rnd.flatten(1).size(1))[None, :]
+ ).long().reshape(rnd.size())
+
+ rnd = rnd * (1 - coins[:, 0].clamp(max=1))
+
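# Placement trick used for both walls and coins: rnd.flatten(1).argmax(dim=1)
# picks, per episode, the cell holding the largest remaining random value, and
# comparing that index against torch.arange(...) turns it into a one-hot map
# over the grid. A minimal standalone sketch (hypothetical 3x3 grids, not part
# of the generator):
#
#   r = torch.rand(2, 3, 3)
#   one_hot = (
#       r.flatten(1).argmax(dim=1)[:, None]
#       == torch.arange(r.flatten(1).size(1))[None, :]
#   ).long().reshape(r.size())  # exactly one 1 per grid
#
# Zeroing rnd at the chosen cells afterwards keeps later draws away from them,
# and re-masking with (1 - wall.clamp(max=1)) keeps the coins off the walls.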
states = wall[:, None, :, :].expand(-1, T, -1, -1).clone()
agent = torch.zeros(states.size(), dtype=torch.int64)
# assert hit.min() == 0 and hit.max() <= 1
- rewards[:, t + 1] = -hit + (1 - hit) * agent[:, t + 1, -1, -1]
+ got_coin = (agent[:, t + 1] * coins[:, t]).flatten(1).sum(dim=1)
+ coins[:, t + 1] = coins[:, t] * (1 - agent[:, t + 1])
+
+ rewards[:, t + 1] = -hit + (1 - hit) * got_coin
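# The reward stays in {-1, 0, +1}, matching nb_rewards_codes = 3: -1 when the
# move hits something (hit, computed in the elided dynamics above), otherwise
# +1 if the agent lands on a coin and 0 if not; a collected coin is removed
# from coins[:, t + 1].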
- states += 2 * agent + 3 * monster
+ states = states + 2 * agent + 3 * monster + 4 * coins
return states, agent_actions, rewards
def seq2str(seq):
def token2str(t):
if t >= first_states_code and t < first_states_code + nb_states_codes:
- return " #@$"[t - first_states_code]
+ return " #@T$"[t - first_states_code]
elif t >= first_actions_code and t < first_actions_code + nb_actions_codes:
return "ISNEW"[t - first_actions_code]
elif t >= first_rewards_code and t < first_rewards_code + nb_rewards_codes:
def episodes2str(
lookahead_rewards, states, actions, rewards, unicode=False, ansi_colors=False
):
if unicode:
- symbols = "·█@$"
+ symbols = "·█@T$"
# vert, hori, cross, thin_hori = "║", "═", "╬", "─"
vert, hori, cross, thin_vert, thin_hori = "┃", "━", "╋", "│", "─"
else:
- symbols = " #@$"
+ symbols = " #@T$"
vert, hori, cross, thin_vert, thin_hori = "|", "-", "+", "|", "-"
hline = (cross + hori * states.size(-1)) * states.size(1) + cross + "\n"
######################################################################
if __name__ == "__main__":
- nb, height, width, T, nb_walls = 5, 5, 7, 4, 5
+ nb, height, width, T, nb_walls = 5, 5, 7, 10, 5
states, actions, rewards = generate_episodes(nb, height, width, T, nb_walls)
seq = episodes2seq(states, actions, rewards)
lr, s, a, r = seq2episodes(seq, height, width)
print(episodes2str(lr, s, a, r, unicode=True, ansi_colors=True))
- print()
- for s in seq2str(seq):
- print(s)
+ # print()
+ # for s in seq2str(seq):
+ # print(s)
# Written by Francois Fleuret <francois@fleuret.org>
-import math, os, tqdm
+import math, os, tqdm, warnings
import torch, torchvision
result[:, it_len:] = -1
+ snapshots = []
+
def ar(result, ar_mask, logit_biases=None):
ar_mask = ar_mask.expand_as(result)
result *= 1 - ar_mask
device=self.device,
progress_bar_desc=None,
)
+ warnings.warn("keeping thinking snapshots", RuntimeWarning)
+ snapshots.append(result[:10].detach().clone())
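# ar() clears the masked positions (result *= 1 - ar_mask) and lets
# masked_inplace_autoregression regenerate them in place with the current
# model, optionally forwarding logit_biases; snapshots keeps a copy of the
# first 10 sequences after every call, presumably for the
# test_thinking_compute_* dump opened at the end.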
# Generate iteration after iteration
optimistic_bias[escape.lookahead_reward2code(-1)] = -math.log(1e1)
optimistic_bias[escape.lookahead_reward2code(1)] = math.log(1e1)
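# optimistic_bias is an additive bias on the logits of the two lookahead-reward
# codes (assuming, as its use as logit_biases suggests, that it is added to the
# model's output logits before sampling): -math.log(1e1) on the code for -1 and
# +math.log(1e1) on the code for +1, i.e. the odds of re-generating an
# optimistic lookahead reward are scaled up by roughly a factor of ten.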
- snapshots = []
-
for u in tqdm.tqdm(
range(it_len, result.size(1) - it_len + 1, it_len), desc="thinking"
):
+ lr, _, _, _ = escape.seq2episodes(result[:, :u], self.height, self.width)
+
- # Generate the lookahead_reward and state
+ # Generate the lookahead_reward
- ar_mask = (t >= u + index_lookahead_reward).long() * (
+ ar_mask = (t % it_len == index_lookahead_reward).long() * (
+ t <= u + index_lookahead_reward
+ ).long()
+ ar(result, ar_mask)
+
+ # Generate the state
+ ar_mask = (t >= u + index_states).long() * (
t < u + index_states + state_len
).long()
ar(result, ar_mask)
- snapshots.append(result[:10].detach().clone())
- backup_lookahead_reward = result[:, u + index_lookahead_reward]
# Re-generate the lookahead_reward
- ar_mask = (t == u + index_lookahead_reward).long()
+ ar_mask = (t % it_len == index_lookahead_reward).long() * (
+ t <= u + index_lookahead_reward
+ ).long()
ar(result, ar_mask, logit_biases=optimistic_bias)
- snapshots.append(result[:10].detach().clone())
# Generate the action and reward
ar_mask = (t >= u + index_action).long() * (t <= u + index_reward).long()
ar(result, ar_mask)
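# Each it_len-token iteration appears to be laid out as [lookahead_reward,
# state (state_len tokens), action, reward], inferred from the index_* offsets
# above. One pass of this loop therefore (1) re-samples every lookahead-reward
# slot up to the current iteration, (2) samples the new state, (3) re-samples
# the lookahead-reward slots under the optimistic bias, and (4) samples the
# action and reward of the current iteration, before u advances by it_len.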
- snapshots.append(result[:10].detach().clone())
-
- result[:, u + index_lookahead_reward] = backup_lookahead_reward
filename = os.path.join(result_dir, f"test_thinking_compute_{n_epoch:04d}.txt")
with open(filename, "w") as f: