progress_bar_desc="autoregression",
device=torch.device("cpu"),
):
+ assert input.size() == ar_mask.size()
+
batches = zip(input.split(batch_size), ar_mask.split(batch_size))
if progress_bar_desc is not None:
batches,
dynamic_ncols=True,
desc=progress_bar_desc,
- total=input.size(0) // batch_size,
+ # total=input.size(0) // batch_size,
)
- for input, ar_mask in batches:
- model.masked_inplace_autoregression(
- input, ar_mask, forbidden_tokens, deterministic_synthesis
- )
+ with torch.autograd.no_grad():
+ t = model.training
+ model.eval()
+
+ for input, ar_mask in batches:
+ model.masked_inplace_autoregression(
+ input, ar_mask, forbidden_tokens, deterministic_synthesis
+ )
+
+ model.train(t)
+
+
+######################################################################
class Task:
pass
+######################################################################
+
+
+class Problem:
+ def generate(nb):
+ pass
+
+ def perf(seq, logger):
+ pass
+
+
+class ProblemByheart(Problem):
+ def __init__(self):
+ nb_seq, len_prompt, len_result = 100, 5, 5
+ self.seq = torch.randint(10, (nb_seq, len_prompt + 1 + len_result))
+ self.seq[:, len_prompt] = -1
+
+ def generate_sequences(self, nb):
+ return self.seq[torch.randint(self.seq.size(0), (nb,))]
+
+
+class SandBox(Task):
+ def __init__(
+ self,
+ nb_train_samples,
+ nb_test_samples,
+ batch_size,
+ logger=None,
+ device=torch.device("cpu"),
+ ):
+ super().__init__()
+
+ self.batch_size = batch_size
+
+ problems = [ProblemByheart()]
+ nb_common_codes = 100
+
+ def generate_sequences(nb_samples):
+ problem_indexes = torch.randint(len(problems), (nb_samples,))
+ nb_samples_per_problem = torch.one_hot(problem_indexes).sum(0)
+ print(f"{nb_samples_per_problem}")
+ all_seq = []
+ for nb, p in zip(nb_samples_per_problem, problems):
+ all_seq.append(p.generate_sequences(nb_samples_per_problem[nb]))
+ return all_seq
+
+ train_seq = generate_sequences(nb_train_samples)
+ test_seq = generate_sequences(nb_test_samples)
+
+ for strain, stest in zip(train_seq, test_seq):
+ s = torch.cat((strain, stest), 0)
+
+ self.nb_codes = max(self.train_input.max(), self.test_input.max()) + 1
+
+ def batches(self, split="train", nb_to_use=-1, desc=None):
+ assert split in {"train", "test"}
+ input = self.train_input if split == "train" else self.test_input
+ if nb_to_use > 0:
+ input = input[:nb_to_use]
+ if desc is None:
+ desc = f"epoch-{split}"
+ for batch in tqdm.tqdm(
+ input.split(self.batch_size), dynamic_ncols=True, desc=desc
+ ):
+ yield batch
+
+ def vocabulary_size(self):
+ return self.nb_codes
+
+ def produce_results(
+ self, n_epoch, model, result_dir, logger, deterministic_synthesis
+ ):
+ # logger(
+ # f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
+ # )
+ pass
+
+
######################################################################
import picoclvr
pruner_train=None,
pruner_eval=None,
):
+ super().__init__()
+
def generate_descr(nb, cache_suffix, pruner):
return picoclvr.generate(
nb,
def __init__(
self, nb_train_samples, nb_test_samples, batch_size, device=torch.device("cpu")
):
+ super().__init__()
+
self.nb_train_samples = (nb_train_samples,)
self.nb_test_samples = (nb_test_samples,)
self.batch_size = batch_size
nb_walls,
device=torch.device("cpu"),
):
+ super().__init__()
+
self.batch_size = batch_size
self.height = height
self.width = width
def produce_results(
self, n_epoch, model, result_dir, logger, deterministic_synthesis
):
- with torch.autograd.no_grad():
- t = model.training
- model.eval()
-
- train_nb_total, train_nb_correct, count = self.compute_error(
- model,
- "train",
- nb_to_use=1000,
- deterministic_synthesis=deterministic_synthesis,
- )
- logger(
- f"accuracy_train {n_epoch} nb_total {train_nb_total} nb_correct {train_nb_correct} accuracy {(100.0*train_nb_correct)/train_nb_total:.02f}%"
- )
-
- test_nb_total, test_nb_correct, count = self.compute_error(
- model,
- "test",
- nb_to_use=1000,
- deterministic_synthesis=deterministic_synthesis,
- )
- logger(
- f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
- )
+ train_nb_total, train_nb_correct, count = self.compute_error(
+ model,
+ "train",
+ nb_to_use=1000,
+ deterministic_synthesis=deterministic_synthesis,
+ )
+ logger(
+ f"accuracy_train {n_epoch} nb_total {train_nb_total} nb_correct {train_nb_correct} accuracy {(100.0*train_nb_correct)/train_nb_total:.02f}%"
+ )
- if count is not None:
- proportion_optimal = count.diagonal().sum().float() / count.sum()
- logger(f"proportion_optimal_test {proportion_optimal*100:.02f}%")
- with open(
- os.path.join(result_dir, f"maze_result_{n_epoch:04d}.txt"), "w"
- ) as f:
- for i in range(count.size(0)):
- for j in range(count.size(1)):
- eol = " " if j < count.size(1) - 1 else "\n"
- f.write(f"{count[i,j]}{eol}")
-
- input = self.test_input[:48]
- result = input.clone()
- ar_mask = result.new_zeros(result.size())
- ar_mask[:, self.height * self.width :] = 1
- result *= 1 - ar_mask
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
+ test_nb_total, test_nb_correct, count = self.compute_error(
+ model,
+ "test",
+ nb_to_use=1000,
+ deterministic_synthesis=deterministic_synthesis,
+ )
+ logger(
+ f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
+ )
- mazes, paths = self.seq2map(input)
- _, predicted_paths = self.seq2map(result)
-
- filename = os.path.join(result_dir, f"maze_result_{n_epoch:04d}.png")
- maze.save_image(
- filename,
- mazes=mazes,
- target_paths=paths,
- predicted_paths=predicted_paths,
- path_correct=maze.path_correctness(mazes, predicted_paths),
- path_optimal=maze.path_optimality(paths, predicted_paths),
- )
- logger(f"wrote {filename}")
+ if count is not None:
+ proportion_optimal = count.diagonal().sum().float() / count.sum()
+ logger(f"proportion_optimal_test {proportion_optimal*100:.02f}%")
+ with open(
+ os.path.join(result_dir, f"maze_result_{n_epoch:04d}.txt"), "w"
+ ) as f:
+ for i in range(count.size(0)):
+ for j in range(count.size(1)):
+ eol = " " if j < count.size(1) - 1 else "\n"
+ f.write(f"{count[i,j]}{eol}")
+
+ input = self.test_input[:48]
+ result = input.clone()
+ ar_mask = result.new_zeros(result.size())
+ ar_mask[:, self.height * self.width :] = 1
+ result *= 1 - ar_mask
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
- model.train(t)
+ mazes, paths = self.seq2map(input)
+ _, predicted_paths = self.seq2map(result)
+
+ filename = os.path.join(result_dir, f"maze_result_{n_epoch:04d}.png")
+ maze.save_image(
+ filename,
+ mazes=mazes,
+ target_paths=paths,
+ predicted_paths=predicted_paths,
+ path_correct=maze.path_correctness(mazes, predicted_paths),
+ path_optimal=maze.path_optimality(paths, predicted_paths),
+ )
+ logger(f"wrote {filename}")
######################################################################
prompt_length,
device=torch.device("cpu"),
):
+ super().__init__()
+
self.batch_size = batch_size
self.height = height
self.width = width
def produce_results(
self, n_epoch, model, result_dir, logger, deterministic_synthesis
):
- with torch.autograd.no_grad():
- t = model.training
- model.eval()
-
- def compute_nb_correct(input, prior_visits):
- result = input.clone()
- i = torch.arange(result.size(1), device=result.device)[None, :]
- ar_mask = (
- torch.logical_and(i >= self.prompt_length * 2, i % 2 == 0)
- .long()
- .expand_as(result)
- )
- result *= 1 - ar_mask
-
- # snake.solver(result,ar_mask)
-
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
-
- nb_total = ((prior_visits > 0) * ar_mask).sum()
-
- nb_correct = (
- (result == input).long() * (prior_visits > 0) * ar_mask
- ).sum()
+ def compute_nb_correct(input, prior_visits):
+ result = input.clone()
+ i = torch.arange(result.size(1), device=result.device)[None, :]
+ ar_mask = (
+ torch.logical_and(i >= self.prompt_length * 2, i % 2 == 0)
+ .long()
+ .expand_as(result)
+ )
+ result *= 1 - ar_mask
- # nb_total = result.size(0)
- # nb_correct = ((result - input).abs().sum(1) == 0).sum()
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
- return nb_total, nb_correct
+ nb_total = ((prior_visits > 0) * ar_mask).sum()
- # train_nb_total, train_nb_correct = compute_nb_correct(
- # self.train_input, self.train_prior_visits
- # )
+ nb_correct = ((result == input).long() * (prior_visits > 0) * ar_mask).sum()
- # logger(
- # f"accuracy_train nb_total {train_nb_total} nb_correct {train_nb_correct} accuracy {(100.0*train_nb_correct)/train_nb_total:.02f}%"
- # )
+ return nb_total, nb_correct
- test_nb_total, test_nb_correct = compute_nb_correct(
- self.test_input[:1000], self.test_prior_visits[:1000]
- )
-
- logger(
- f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
- )
+ test_nb_total, test_nb_correct = compute_nb_correct(
+ self.test_input[:1000], self.test_prior_visits[:1000]
+ )
- model.train(t)
+ logger(
+ f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
+ )
######################################################################
fraction_values_for_train=None,
device=torch.device("cpu"),
):
+ super().__init__()
+
self.batch_size = batch_size
self.nb_steps = nb_steps
self.nb_stacks = nb_stacks
def produce_results(
self, n_epoch, model, result_dir, logger, deterministic_synthesis
):
- with torch.autograd.no_grad():
- t = model.training
- model.eval()
-
- def compute_nb_correct(input):
- result = input.clone()
- stack.remove_popped_values(result, self.nb_stacks, self.nb_digits)
- ar_mask = (result != input).long()
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
-
- errors = ((result != input).long() * ar_mask).reshape(
- -1, 1 + self.nb_digits
- )
- ar_mask = ar_mask.reshape(-1, 1 + self.nb_digits)
-
- nb_total = ar_mask.max(1).values.sum()
- nb_correct = nb_total - errors.max(1).values.sum()
-
- return nb_total, nb_correct
-
- test_nb_total, test_nb_correct = compute_nb_correct(self.test_input[:1000])
-
- logger(
- f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
- )
-
- ##############################################################
- # Log a few generated sequences
- input = self.test_input[:10, : 12 * (1 + self.nb_digits)]
+ def compute_nb_correct(input):
result = input.clone()
stack.remove_popped_values(result, self.nb_stacks, self.nb_digits)
ar_mask = (result != input).long()
-
- # for n in range(result.size(0)):
- # logger(
- # f"test_before {stack.seq_to_str(result[n],nb_stacks=self.nb_stacks,nb_digits=self.nb_digits)}"
- # )
-
masked_inplace_autoregression(
model,
self.batch_size,
device=self.device,
)
- for n in range(result.size(0)):
- logger(
- f"test_after {stack.seq_to_str(result[n],nb_stacks=self.nb_stacks,nb_digits=self.nb_digits)}"
- )
- ##############################################################
+ errors = ((result != input).long() * ar_mask).reshape(
+ -1, 1 + self.nb_digits
+ )
+ ar_mask = ar_mask.reshape(-1, 1 + self.nb_digits)
+
+ nb_total = ar_mask.max(1).values.sum()
+ nb_correct = nb_total - errors.max(1).values.sum()
+
+ return nb_total, nb_correct
+
+ test_nb_total, test_nb_correct = compute_nb_correct(self.test_input[:1000])
+
+ logger(
+ f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
+ )
+
+ ##############################################################
+ # Log a few generated sequences
+ input = self.test_input[:10, : 12 * (1 + self.nb_digits)]
+ result = input.clone()
+ stack.remove_popped_values(result, self.nb_stacks, self.nb_digits)
+ ar_mask = (result != input).long()
+
+ # for n in range(result.size(0)):
+ # logger(
+ # f"test_before {stack.seq_to_str(result[n],nb_stacks=self.nb_stacks,nb_digits=self.nb_digits)}"
+ # )
+
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
- model.train(t)
+ for n in range(result.size(0)):
+ logger(
+ f"test_after {stack.seq_to_str(result[n],nb_stacks=self.nb_stacks,nb_digits=self.nb_digits)}"
+ )
+ ##############################################################
######################################################################
nb_test_samples,
nb_variables,
sequence_length,
+ operand_max,
+ result_max,
batch_size,
device=torch.device("cpu"),
):
+ super().__init__()
+
self.batch_size = batch_size
self.device = device
nb_train_samples,
nb_variables=nb_variables,
length=sequence_length,
+ operand_max=operand_max,
+ result_max=result_max,
)
test_sequences = expr.generate_sequences(
nb_test_samples,
nb_variables=nb_variables,
length=sequence_length,
+ operand_max=operand_max,
+ result_max=result_max,
)
symbols = list(set("#" + "".join(train_sequences + test_sequences)))
deterministic_synthesis,
input_file=None,
):
- with torch.autograd.no_grad():
- t = model.training
- model.eval()
-
- def compute_nb_correct(input):
- result = input.clone()
- s = (result == self.space).long()
- ar_mask = (s.cumsum(dim=1) - s).clamp(min=0, max=1)
- result = (1 - ar_mask) * result + ar_mask * self.filler
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
+ def compute_nb_correct(input):
+ result = input.clone()
+ s = (result == self.space).long()
+ ar_mask = (s.cumsum(dim=1) - s).clamp(min=0, max=1)
+ result = (1 - ar_mask) * result + ar_mask * self.filler
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
+
+ nb_total = input.size(0)
+ nb_correct = (input == result).long().min(1).values.sum()
- nb_total = input.size(0)
- nb_correct = (input == result).long().min(1).values.sum()
+ #######################################################################
+ # Comput predicted vs. true variable values
- #######################################################################
- # Comput predicted vs. true variable values
+ nb_delta = torch.zeros(5, dtype=torch.int64)
+ nb_missed = 0
- nb_delta = torch.zeros(5, dtype=torch.int64)
- nb_missed = 0
+ values_input = expr.extract_results([self.seq2str(s) for s in input])
+ values_result = expr.extract_results([self.seq2str(s) for s in result])
- values_input = expr.extract_results([self.seq2str(s) for s in input])
- values_result = expr.extract_results([self.seq2str(s) for s in result])
+ filename = os.path.join(result_dir, f"expr_result_{n_epoch:04d}.txt")
+ with open(filename, "w") as f:
for i, r in zip(values_input, values_result):
for n, vi in i.items():
vr = r.get(n)
+ f.write(f"{vi} {-1 if vr is None else vr}\n")
+
if vr is None or vr < 0:
nb_missed += 1
else:
else:
nb_delta[d] += 1
- ######################################################################
+ ######################################################################
- return nb_total, nb_correct, nb_delta, nb_missed
+ return nb_total, nb_correct, nb_delta, nb_missed
- (
- test_nb_total,
- test_nb_correct,
- test_nb_delta,
- test_nb_missed,
- ) = compute_nb_correct(self.test_input[:10000])
+ (
+ test_nb_total,
+ test_nb_correct,
+ test_nb_delta,
+ test_nb_missed,
+ ) = compute_nb_correct(self.test_input[:10000])
- logger(
- f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
- )
+ logger(
+ f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
+ )
- nb_total = test_nb_delta.sum() + test_nb_missed
- for d in range(test_nb_delta.size(0)):
- logger(
- f"error_value {n_epoch} delta {d} {test_nb_delta[d]} {test_nb_delta[d]*100/nb_total:.02f}%"
- )
+ nb_total = test_nb_delta.sum() + test_nb_missed
+ for d in range(test_nb_delta.size(0)):
logger(
- f"error_value {n_epoch} missed {test_nb_missed} {test_nb_missed*100/nb_total:.02f}%"
+ f"error_value {n_epoch} delta {d} {test_nb_delta[d]} {test_nb_delta[d]*100/nb_total:.02f}%"
)
+ logger(
+ f"error_value {n_epoch} missed {test_nb_missed} {test_nb_missed*100/nb_total:.02f}%"
+ )
- ##############################################################
- # Log a few generated sequences
- if input_file is None:
- input = self.test_input[:10]
- else:
- with open(input_file, "r") as f:
- sequences = [e.strip() for e in f.readlines()]
- sequences = [s + " " + "#" * 50 for s in sequences]
- input = self.tensorize(sequences)
+ ##############################################################
+ # Log a few generated sequences
+ if input_file is None:
+ input = self.test_input[:10]
+ else:
+ with open(input_file, "r") as f:
+ sequences = [e.strip() for e in f.readlines()]
+ sequences = [s + " " + "#" * 50 for s in sequences]
+ input = self.tensorize(sequences)
- result = input.clone()
- s = (result == self.space).long()
- ar_mask = (s.cumsum(dim=1) - s).clamp(min=0, max=1)
- result = (1 - ar_mask) * result + ar_mask * self.filler
+ result = input.clone()
+ s = (result == self.space).long()
+ ar_mask = (s.cumsum(dim=1) - s).clamp(min=0, max=1)
+ result = (1 - ar_mask) * result + ar_mask * self.filler
- for n in range(result.size(0)):
- logger(f"test_before {self.seq2str(result[n])}")
+ for n in range(result.size(0)):
+ logger(f"test_before {self.seq2str(result[n])}")
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
+
+ correct = (1 - ar_mask) * self.space + ar_mask * input
+ for n in range(result.size(0)):
+ comment = "GOOD" if (result[n] - input[n]).abs().max() == 0 else ""
+ logger(f"test_after {self.seq2str(result[n])} {comment}")
+ logger(f"truth {self.seq2str(correct[n])}")
+ ##############################################################
+
+
+######################################################################
+
+import world
+
+
+class World(Task):
+ def __init__(
+ self,
+ nb_train_samples,
+ nb_test_samples,
+ batch_size,
+ vqae_nb_epochs,
+ logger=None,
+ device=torch.device("cpu"),
+ device_storage=torch.device("cpu"),
+ ):
+ super().__init__()
+
+ self.batch_size = batch_size
+ self.device = device
+
+ (
+ train_frames,
+ train_action_seq,
+ test_frames,
+ test_action_seq,
+ self.frame2seq,
+ self.seq2frame,
+ ) = world.create_data_and_processors(
+ nb_train_samples,
+ nb_test_samples,
+ mode="first_last",
+ nb_steps=30,
+ nb_epochs=vqae_nb_epochs,
+ logger=logger,
+ device=device,
+ device_storage=device_storage,
+ )
+
+ print(f"{train_action_seq.size()=}")
+
+ train_frame_seq = self.frame2seq(train_frames).to(device_storage)
+ test_frame_seq = self.frame2seq(test_frames).to(device_storage)
+
+ nb_frame_codes = max(train_frame_seq.max(), test_frame_seq.max()) + 1
+ nb_action_codes = max(train_action_seq.max(), test_action_seq.max()) + 1
- correct = (1 - ar_mask) * self.space + ar_mask * input
- for n in range(result.size(0)):
- comment = "GOOD" if (result[n] - input[n]).abs().max() == 0 else ""
- logger(f"test_after {self.seq2str(result[n])} {comment}")
- logger(f"truth {self.seq2str(correct[n])}")
- ##############################################################
+ self.len_frame_seq = train_frame_seq.size(1)
+ self.len_action_seq = train_action_seq.size(1)
+ self.nb_codes = nb_frame_codes + nb_action_codes
- model.train(t)
+ train_frame_seq = train_frame_seq.reshape(train_frame_seq.size(0) // 2, 2, -1)
+ print(f"{train_action_seq.device=} {nb_frame_codes.device=}")
+ train_action_seq += nb_frame_codes
+ self.train_input = torch.cat(
+ (train_frame_seq[:, 0, :], train_action_seq, train_frame_seq[:, 1, :]), 1
+ )
+
+ test_frame_seq = test_frame_seq.reshape(test_frame_seq.size(0) // 2, 2, -1)
+ test_action_seq += nb_frame_codes
+ self.test_input = torch.cat(
+ (test_frame_seq[:, 0, :], test_action_seq, test_frame_seq[:, 1, :]), 1
+ )
+
+ def batches(self, split="train", nb_to_use=-1, desc=None):
+ assert split in {"train", "test"}
+ input = self.train_input if split == "train" else self.test_input
+ if nb_to_use > 0:
+ input = input[:nb_to_use]
+ if desc is None:
+ desc = f"epoch-{split}"
+ for batch in tqdm.tqdm(
+ input.split(self.batch_size), dynamic_ncols=True, desc=desc
+ ):
+ yield batch.to(self.device)
+
+ def vocabulary_size(self):
+ return self.nb_codes
+
+ def produce_results(
+ self, n_epoch, model, result_dir, logger, deterministic_synthesis
+ ):
+ k = torch.arange(
+ 2 * self.len_frame_seq + self.len_action_seq, device=self.device
+ )[None, :]
+
+ input = self.test_input[:64].to(self.device)
+ result = input.clone()
+
+ ar_mask = (
+ (k >= self.len_frame_seq + self.len_action_seq).long().expand_as(result)
+ )
+ result *= 1 - ar_mask
+
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
+
+ seq_start = input[:, : self.len_frame_seq]
+ seq_end = input[:, self.len_frame_seq + self.len_action_seq :]
+ seq_predicted = result[:, self.len_frame_seq + self.len_action_seq :]
+
+ result = torch.cat(
+ (seq_start[:, None, :], seq_end[:, None, :], seq_predicted[:, None, :]), 1
+ )
+ result = result.reshape(-1, result.size(-1))
+ print(f"{result.size()=}")
+
+ frames = self.seq2frame(result)
+ image_name = os.path.join(result_dir, f"world_result_{n_epoch:04d}.png")
+ torchvision.utils.save_image(
+ frames.float() / (world.Box.nb_rgb_levels - 1),
+ image_name,
+ nrow=12,
+ padding=1,
+ pad_value=0.0,
+ )
+ logger(f"wrote {image_name}")
######################################################################