+ sum_nb_total, sum_nb_errors = 0, 0
+ for one_input, one_result in zip(input, result):
+ seq = [self.id2token[i.item()] for i in one_result]
+ nb_total, nb_errors, prog, stacks = rpl.compute_nb_errors(seq)
+ sum_nb_total += 1
+ sum_nb_errors += 0 if nb_errors == 0 else 1
+ if nb_to_log > 0:
+ gt_seq = [self.id2token[i.item()] for i in one_input]
+ _, _, gt_prog, _ = rpl.compute_nb_errors(gt_seq)
+ gt_prog = " ".join([str(x) for x in gt_prog])
+ prog = " ".join([str(x) for x in prog])
+ comment = "*" if nb_errors == 0 else "-"
+ logger(f"{comment} PROG [{gt_prog}] PREDICTED [{prog}]")
+ for start_stack, target_stack, result_stack, correct in stacks:
+ comment = "*" if correct else "-"
+ start_stack = " ".join([str(x) for x in start_stack])
+ target_stack = " ".join([str(x) for x in target_stack])
+ result_stack = " ".join([str(x) for x in result_stack])
+ logger(
+ f" {comment} [{start_stack}] -> [{target_stack}] PREDICTED [{result_stack}]"
+ )
+ nb_to_log -= 1
+
+ return sum_nb_total, sum_nb_errors
+
+ # --------------------------------------------------------------------
+ def compute_nb_errors_output(input, nb_to_log=0):
+ result = input.clone()
+ k = torch.arange(result.size(1), device=result.device)[None, :]
+ last_output_idx = (
+ ((result == self.t_output) * k).max(dim=1, keepdim=True).values
+ )
+ first_prog_idx = (
+ ((result == self.t_prog) * k).max(dim=1, keepdim=True).values
+ )
+ ar_mask = (k > last_output_idx).long() * (k < first_prog_idx).long()
+ result = (1 - ar_mask) * result + ar_mask * self.t_nul
+
+ masked_inplace_autoregression(
+ model,
+ self.batch_size,
+ result,
+ ar_mask,
+ deterministic_synthesis,
+ device=self.device,
+ )