- sum_nb_total, sum_nb_errors = 0, 0
- for one_input, one_result in zip(input, result):
- seq = [self.id2token[i.item()] for i in one_result]
- nb_total, nb_errors, prog, stacks = rpl.compute_nb_errors(seq)
- sum_nb_total += 1
- sum_nb_errors += 0 if nb_errors == 0 else 1
- if nb_to_log > 0:
- gt_seq = [self.id2token[i.item()] for i in one_input]
- _, _, gt_prog, _ = rpl.compute_nb_errors(gt_seq)
- gt_prog = " ".join([str(x) for x in gt_prog])
- prog = " ".join([str(x) for x in prog])
- comment = "*" if nb_errors == 0 else "-"
- logger(f"{comment} PROG [{gt_prog}] PREDICTED [{prog}]")
- for start_stack, target_stack, result_stack, correct in stacks:
- comment = "*" if correct else "-"
- start_stack = " ".join([str(x) for x in start_stack])
- target_stack = " ".join([str(x) for x in target_stack])
- result_stack = " ".join([str(x) for x in result_stack])
- logger(
- f" {comment} [{start_stack}] -> [{target_stack}] PREDICTED [{result_stack}]"
- )
- nb_to_log -= 1
-
- return sum_nb_total, sum_nb_errors
-
- # --------------------------------------------------------------------
- def compute_nb_errors_output(input, nb_to_log=0):
- result = input.clone()
- k = torch.arange(result.size(1), device=result.device)[None, :]
- last_output_idx = (
- ((result == self.t_output) * k).max(dim=1, keepdim=True).values
- )
- first_prog_idx = (
- ((result == self.t_prog) * k).max(dim=1, keepdim=True).values
- )
- ar_mask = (k > last_output_idx).long() * (k < first_prog_idx).long()
- result = (1 - ar_mask) * result + ar_mask * self.t_nul
-
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
-
- sum_nb_total, sum_nb_errors = 0, 0
- for one_input, one_result, i, j in zip(
- input, result, last_output_idx, first_prog_idx
- ):
- seq = [self.id2token[i.item()] for i in one_result]
- sum_nb_total += 1
- correct = (one_input - one_result).abs().max() == 0
- sum_nb_errors += 0 if correct else 1
- if nb_to_log > 0:
- result_stack = [
- self.id2token[i.item()] for i in one_result[i : j + 1]
- ]
- target_stack = [
- self.id2token[i.item()] for i in one_input[i : j + 1]
- ]
- comment = "*" if correct else "-"
- result_stack = " ".join([str(x) for x in result_stack])
- target_stack = " ".join([str(x) for x in target_stack])
- logger(
- f"output_test {comment} [{target_stack}] PREDICTED [{result_stack}]"
- )
- nb_to_log -= 1
-
- return sum_nb_total, sum_nb_errors
-
- # --------------------------------------------------------------------
-
- if not self.no_prog:
- test_nb_total, test_nb_errors = compute_nb_errors_prog(
- self.test_input[:1000].to(self.device), nb_to_log=10
- )
-
- logger(
- f"accuracy_prog_test {n_epoch} nb_total {test_nb_total} nb_errors {test_nb_errors} accuracy {100.0*(1-test_nb_errors/test_nb_total):.02f}%"
- )
-
- test_nb_total, test_nb_errors = compute_nb_errors_output(
- self.test_input[:1000].to(self.device), nb_to_log=10
- )
-
- logger(
- f"accuracy_output_test {n_epoch} nb_total {test_nb_total} nb_errors {test_nb_errors} accuracy {100.0*(1-test_nb_errors/test_nb_total):.02f}%"
- )
-
- if save_attention_image is not None:
- ns = torch.randint(self.test_input.size(0), (1,)).item()
- input = self.test_input[ns : ns + 1].clone()
- last = (input != self.t_nul).max(0).values.nonzero().max() + 3
- input = input[:, :last].to(self.device)
-
- with torch.autograd.no_grad():
- t = model.training
- model.eval()
- model.record_attention(True)
- model(BracketedSequence(input))
- model.train(t)
- ram = model.retrieve_attention()
- model.record_attention(False)
-
- tokens_output = [self.id2token[i.item()] for i in input[0]]
- tokens_input = ["n/a"] + tokens_output[:-1]
- for n_head in range(ram[0].size(1)):
- filename = os.path.join(
- result_dir, f"rpl_attention_{n_epoch}_h{n_head}.pdf"
- )
- attention_matrices = [m[0, n_head] for m in ram]
- save_attention_image(
- filename,
- tokens_input,
- tokens_output,
- attention_matrices,
- k_top=10,
- # min_total_attention=0.9,
- token_gap=12,
- layer_gap=50,
- )
- logger(f"wrote {filename}")
-
-
-######################################################################
-
-
-import expr
-
-
-class Expr(Task):
- def tensorize(self, sequences):
- len_max = max([len(x) for x in sequences])
- return torch.cat(
- [
- torch.tensor(
- [
- [self.char2id[c] for c in s + "#" * (len_max - len(s))]
- for s in sequences
- ]
- )
- ],
- 0,
- ).to(self.device)
-
- def __init__(
- self,
- nb_train_samples,
- nb_test_samples,
- nb_variables,
- sequence_length,
- operand_max,
- result_max,
- batch_size,
- device=torch.device("cpu"),
- ):
- super().__init__()
-
- self.batch_size = batch_size
- self.device = device
-
- train_sequences = expr.generate_sequences(
- nb_train_samples,
- nb_variables=nb_variables,
- length=sequence_length,
- operand_max=operand_max,
- result_max=result_max,
- )
-
- test_sequences = expr.generate_sequences(
- nb_test_samples,
- nb_variables=nb_variables,
- length=sequence_length,
- operand_max=operand_max,
- result_max=result_max,
- )
-
- symbols = list(set("#" + "".join(train_sequences + test_sequences)))
- symbols.sort()
-
- self.char2id = dict([(c, n) for n, c in enumerate(symbols)])
- self.id2char = dict([(n, c) for c, n in self.char2id.items()])
-
- self.filler, self.space = self.char2id["#"], self.char2id[" "]
-
- self.train_input = self.tensorize(train_sequences)
- self.test_input = self.tensorize(test_sequences)
-
- self.nb_codes = max(self.train_input.max(), self.test_input.max()) + 1
-
- def batches(self, split="train", nb_to_use=-1, desc=None):
- assert split in {"train", "test"}
- input = self.train_input if split == "train" else self.test_input
- if nb_to_use > 0:
- input = input[:nb_to_use]
- if desc is None:
- desc = f"epoch-{split}"
- for batch in tqdm.tqdm(
- input.split(self.batch_size), dynamic_ncols=True, desc=desc
- ):
- last = (batch != self.filler).max(0).values.nonzero().max() + 3
- batch = batch[:, :last]
- yield batch
-
- def vocabulary_size(self):
- return self.nb_codes