+ # --------------------------------------------------------------------
+ def compute_nb_errors_output(input, nb_to_log=0):
+ result = input.clone()
+ k = torch.arange(result.size(1), device=result.device)[None, :]
+ last_output_idx = ((result == self.t_output) * k).max(dim=1, keep_dim=True)
+ first_prog_idx = ((result == self.t_prog) * k).min(dim=1, keep_dim=True)
+ ar_mask = (k > last_output_idx).long() * (k < first_prog_idx)
+ result = (1 - ar_mask) * result + ar_mask * self.t_nul
+
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )
+
+ sum_nb_total, sum_nb_errors = 0, 0
+ for x, y in zip(input, result):
+ seq = [self.id2token[i.item()] for i in y]
+ sum_nb_total += 1
+ sum_nb_errors += 0 if (x - y).abs().max() == 0 else 1
+ if nb_to_log > 0:
+ gt_seq = [self.id2token[i.item()] for i in x]
+ _, _, gt_prog, _ = rpl.compute_nb_errors(gt_seq)
+ gt_prog = " ".join([str(x) for x in gt_prog])
+ prog = " ".join([str(x) for x in prog])
+ comment = "*" if nb_errors == 0 else "-"
+ logger(f"{comment} PROG [{gt_prog}] PREDICTED [{prog}]")
+ for start_stack, target_stack, result_stack, correct in stacks:
+ comment = "*" if correct else "-"
+ start_stack = " ".join([str(x) for x in start_stack])
+ target_stack = " ".join([str(x) for x in target_stack])
+ result_stack = " ".join([str(x) for x in result_stack])
+ logger(
+ f" {comment} [{start_stack}] -> [{target_stack}] PREDICTED [{result_stack}]"
+ )
+ nb_to_log -= 1
+
+ return sum_nb_total, sum_nb_errors
+
+ # --------------------------------------------------------------------
+
+ test_nb_total, test_nb_errors = compute_nb_errors_prog(
+ self.test_input[:1000].to(self.device), nb_to_log=10