-def random_scene():
- scene = []
- colors = [
- ((Box.nb_rgb_levels - 1), 0, 0),
- (0, (Box.nb_rgb_levels - 1), 0),
- (0, 0, (Box.nb_rgb_levels - 1)),
- ((Box.nb_rgb_levels - 1), (Box.nb_rgb_levels - 1), 0),
- (
- (Box.nb_rgb_levels * 2) // 3,
- (Box.nb_rgb_levels * 2) // 3,
- (Box.nb_rgb_levels * 2) // 3,
- ),
- ]
-
- for k in range(10):
- wh = torch.rand(2) * 0.2 + 0.2
- xy = torch.rand(2) * (1 - wh)
- c = colors[torch.randint(len(colors), (1,))]
- b = Box(
- xy[0].item(), xy[1].item(), wh[0].item(), wh[1].item(), c[0], c[1], c[2]
- )
- if not b.collision(scene):
- scene.append(b)
-
- return scene
-
-
-def generate_episode(steps, size=64):
- delta = 0.1
- effects = [
- (False, 0, 0),
- (False, delta, 0),
- (False, 0, delta),
- (False, -delta, 0),
- (False, 0, -delta),
- (True, delta, 0),
- (True, 0, delta),
- (True, -delta, 0),
- (True, 0, -delta),
- ]
-
- while True:
- frames = []
-
- scene = random_scene()
- xh, yh = tuple(x.item() for x in torch.rand(2))
-
- actions = torch.randint(len(effects), (len(steps),))
- change = False
-
- for s, a in zip(steps, actions):
- if s:
- frames.append(scene2tensor(xh, yh, scene, size=size))
-
- g, dx, dy = effects[a]
- if g:
- for b in scene:
- if b.x <= xh and b.x + b.w >= xh and b.y <= yh and b.y + b.h >= yh:
- x, y = b.x, b.y
- b.x += dx
- b.y += dy
- if (
- b.x < 0
- or b.y < 0
- or b.x + b.w > 1
- or b.y + b.h > 1
- or b.collision(scene)
- ):
- b.x, b.y = x, y
- else:
- xh += dx
- yh += dy
- change = True
- else:
- x, y = xh, yh
- xh += dx
- yh += dy
- if xh < 0 or xh > 1 or yh < 0 or yh > 1:
- xh, yh = x, y
-
- if change:
- break
-
- return frames, actions
-
-
-######################################################################
-
-
-def generate_episodes(nb, steps):
- all_frames = []
- for n in tqdm.tqdm(range(nb), dynamic_ncols=True, desc="world-data"):
- frames, actions = generate_episode(steps)
- all_frames += frames
- return torch.cat(all_frames, 0).contiguous()
-
-
-def create_data_and_processors(nb_train_samples, nb_test_samples, nb_epochs=10):
- steps = [True] + [False] * 30 + [True]
- train_input = generate_episodes(nb_train_samples, steps)
- test_input = generate_episodes(nb_test_samples, steps)
-
- print(f"{train_input.size()=} {test_input.size()=}")
-
- encoder, quantizer, decoder = train_encoder(
- train_input, test_input, nb_epochs=nb_epochs
- )
- encoder.train(False)
- quantizer.train(False)
- decoder.train(False)
-
- z = encoder(train_input[:1])
- pow2 = (2 ** torch.arange(z.size(1), device=z.device))[None, None, :]
- z_h, z_w = z.size(2), z.size(3)
-
- def frame2seq(x):
- z = encoder(x)
- ze_bool = (quantizer(z) >= 0).long()
- seq = (
- ze_bool.permute(0, 2, 3, 1).reshape(ze_bool.size(0), -1, ze_bool.size(1))
- * pow2
- ).sum(-1)
- return seq
-
- def seq2frame(seq, T=1e-2):
- zd_bool = (seq[:, :, None] // pow2) % 2
- zd_bool = zd_bool.reshape(zd_bool.size(0), z_h, z_w, -1).permute(0, 3, 1, 2)
- logits = decoder(zd_bool * 2.0 - 1.0)
- logits = logits.reshape(
- logits.size(0), -1, 3, logits.size(2), logits.size(3)
- ).permute(0, 2, 3, 4, 1)
- results = torch.distributions.categorical.Categorical(
- logits=logits / T
- ).sample()
- return results
-
- return train_input, test_input, frame2seq, seq2frame