- errors = ((result != input).long() * ar_mask).reshape(
- -1, 1 + self.nb_digits
- )
- ar_mask = ar_mask.reshape(-1, 1 + self.nb_digits)
-
- nb_total = ar_mask.max(1).values.sum()
- nb_correct = nb_total - errors.max(1).values.sum()
-
- return nb_total, nb_correct
-
- test_nb_total, test_nb_correct = compute_nb_correct(self.test_input[:1000])
-
- logger(
- f"accuracy_test {n_epoch} nb_total {test_nb_total} nb_correct {test_nb_correct} accuracy {(100.0*test_nb_correct)/test_nb_total:.02f}%"
- )
-
- ##############################################################
- # Log a few generated sequences
- input = self.test_input[:10, : 12 * (1 + self.nb_digits)]
- result = input.clone()
- stack.remove_popped_values(result, self.nb_stacks, self.nb_digits)
- ar_mask = (result != input).long()
-
- # for n in range(result.size(0)):
- # logger(
- # f"test_before {stack.seq_to_str(result[n],nb_stacks=self.nb_stacks,nb_digits=self.nb_digits)}"
- # )
-
- masked_inplace_autoregression(
- model,
- self.batch_size,
- result,
- ar_mask,
- deterministic_synthesis,
- device=self.device,
- )
-
- for n in range(result.size(0)):
- logger(
- f"test_after {stack.seq_to_str(result[n],nb_stacks=self.nb_stacks,nb_digits=self.nb_digits)}"
- )
- ##############################################################
-
-
-######################################################################
-
-
-import expr
-
-
-class Expr(Task):
- def tensorize(self, sequences):
- len_max = max([len(x) for x in sequences])
- return torch.cat(
- [
- torch.tensor(
- [
- [self.char2id[c] for c in s + "#" * (len_max - len(s))]
- for s in sequences
- ]
- )
- ],
- 0,
- ).to(self.device)
-
- def __init__(
- self,
- nb_train_samples,
- nb_test_samples,
- nb_variables,
- sequence_length,
- operand_max,
- result_max,
- batch_size,
- device=torch.device("cpu"),
- ):
- super().__init__()
-
- self.batch_size = batch_size
- self.device = device
-
- train_sequences = expr.generate_sequences(
- nb_train_samples,
- nb_variables=nb_variables,
- length=sequence_length,
- operand_max=operand_max,
- result_max=result_max,
- )
-
- test_sequences = expr.generate_sequences(
- nb_test_samples,
- nb_variables=nb_variables,
- length=sequence_length,
- operand_max=operand_max,
- result_max=result_max,
- )
-
- symbols = list(set("#" + "".join(train_sequences + test_sequences)))
- symbols.sort()
-
- self.char2id = dict([(c, n) for n, c in enumerate(symbols)])
- self.id2char = dict([(n, c) for c, n in self.char2id.items()])
-
- self.filler, self.space = self.char2id["#"], self.char2id[" "]
-
- self.train_input = self.tensorize(train_sequences)
- self.test_input = self.tensorize(test_sequences)
-
- self.nb_codes = max(self.train_input.max(), self.test_input.max()) + 1
-
- def batches(self, split="train", nb_to_use=-1, desc=None):
- assert split in {"train", "test"}
- input = self.train_input if split == "train" else self.test_input
- if nb_to_use > 0:
- input = input[:nb_to_use]
- if desc is None:
- desc = f"epoch-{split}"
- for batch in tqdm.tqdm(
- input.split(self.batch_size), dynamic_ncols=True, desc=desc
- ):
- last = (batch != self.filler).max(0).values.nonzero().max() + 3
- batch = batch[:, :last]
- yield batch
-
- def vocabulary_size(self):
- return self.nb_codes