X-Git-Url: https://fleuret.org/cgi-bin/gitweb/gitweb.cgi?a=blobdiff_plain;f=reasoning.py;h=9e26d64e34d1d796187d4f2fb9ad0f055562472c;hb=5a0c2432316b0a413f1769ab429d33433a94e6e1;hp=003806a8070d50a272200e5e3bb8d9a59b13dbfb;hpb=0be6757c554ab40b08b4acfd90787a86f4c4cc5b;p=culture.git diff --git a/reasoning.py b/reasoning.py index 003806a..9e26d64 100755 --- a/reasoning.py +++ b/reasoning.py @@ -27,22 +27,46 @@ class Reasoning(problem.Problem): ("cyan", [0, 255, 255]), ("violet", [255, 0, 255]), ("lightgreen", [192, 255, 192]), - ("pink", [255, 192, 192]), + ("brown", [165, 42, 42]), ("lightblue", [192, 192, 255]), - ("gray", [192, 192, 192]), + ("gray", [128, 128, 128]), ] - def __init__( - self, - ): + def __init__(self, device=torch.device("cpu")): self.colors = torch.tensor([c for _, c in self.named_colors]) self.name2color = dict([(p[0], i) for i, p in enumerate(self.named_colors)]) self.height = 10 self.width = 10 + self.device = device ###################################################################### def frame2img(self, x, scale=15): + x = x.reshape(x.size(0), self.height, -1) + m = torch.logical_and(x >= 0, x < self.nb_token_values()).long() + x = self.colors[x * m].permute(0, 3, 1, 2) + s = x.shape + x = x[:, :, :, None, :, None].expand(-1, -1, -1, scale, -1, scale) + x = x.reshape(s[0], s[1], s[2] * scale, s[3] * scale) + + x[:, :, :, torch.arange(0, x.size(3), scale)] = 0 + x[:, :, torch.arange(0, x.size(2), scale), :] = 0 + x = x[:, :, 1:, 1:] + + for n in range(m.size(0)): + for i in range(m.size(1)): + for j in range(m.size(2)): + if m[n, i, j] == 0: + for k in range(2, scale - 2): + for l in [0, 1]: + x[n, :, i * scale + k, j * scale + k - l] = 0 + x[ + n, :, i * scale + scale - 1 - k, j * scale + k - l + ] = 0 + + return x + + def frame2img_(self, x, scale=15): x = x.reshape(x.size(0), self.height, -1) x = self.colors[x].permute(0, 3, 1, 2) s = x.shape @@ -170,7 +194,72 @@ class Reasoning(problem.Problem): def nb_token_values(self): return len(self.colors) - def rec_coo(self, x, n, min_height=3, min_width=3): + # That's quite a tensorial spaghetti mess to sample + # non-overlapping rectangles quickly, but made the generation of + # 100k samples go from 1h50 with a lame pure python code to 3min30s + # with this one. 
+ def rec_coo(self, nb_rec, min_height=3, min_width=3): + nb_trials = 200 + + while True: + v = ( + ( + torch.rand(nb_trials * nb_rec, self.height + 1, device=self.device) + .sort(dim=-1) + .indices + < 2 + ) + .long() + .cumsum(dim=1) + == 1 + ).long() + + h = ( + ( + torch.rand(nb_trials * nb_rec, self.width + 1, device=self.device) + .sort(dim=-1) + .indices + < 2 + ) + .long() + .cumsum(dim=1) + == 1 + ).long() + + i = torch.logical_and( + v.sum(dim=-1) >= min_height, h.sum(dim=-1) >= min_width + ) + + v, h = v[i], h[i] + v = v[: v.size(0) - v.size(0) % nb_rec] + h = h[: h.size(0) - h.size(0) % nb_rec] + v = v.reshape(v.size(0) // nb_rec, nb_rec, -1) + h = h.reshape(h.size(0) // nb_rec, nb_rec, -1) + + r = v[:, :, :, None] * h[:, :, None, :] + + valid = r.sum(dim=1).flatten(1).max(dim=-1).values == 1 + + v = v[valid] + h = h[valid] + + if v.size(0) > 0: + break + + av = torch.arange(v.size(2), device=self.device)[None, :] + ah = torch.arange(h.size(2), device=self.device)[None, :] + + return [ + (i1.item(), j1.item(), i2.item() + 1, j2.item() + 1) + for i1, j1, i2, j2 in zip( + v.size(2) - (v[0] * (v.size(2) - av)).max(dim=-1).values, + h.size(2) - (h[0] * (h.size(2) - ah)).max(dim=-1).values, + (v[0] * av).max(dim=-1).values, + (h[0] * ah).max(dim=-1).values, + ) + ] + + def rec_coo_(self, x, n, min_height=3, min_width=3): collision = x.new(x.size()) while True: collision[...] = 0 @@ -195,23 +284,23 @@ class Reasoning(problem.Problem): ###################################################################### def task_replace_color(self, A, f_A, B, f_B): - N = 3 - c = torch.randperm(len(self.colors) - 1)[: N + 1] + 1 + nb_rec = 3 + c = torch.randperm(len(self.colors) - 1)[: nb_rec + 1] + 1 for X, f_X in [(A, f_A), (B, f_B)]: - r = self.rec_coo(X, N) - for n in range(N): + r = self.rec_coo(nb_rec) + for n in range(nb_rec): i1, j1, i2, j2 = r[n] X[i1:i2, j1:j2] = c[n] f_X[i1:i2, j1:j2] = c[n if n > 0 else -1] - def task_move(self, A, f_A, B, f_B): - di, dj = torch.randint(2, (2,)) * 2 - 1 - N = 3 - c = torch.randperm(len(self.colors) - 1)[:N] + 1 + def task_translate(self, A, f_A, B, f_B): + di, dj = torch.randint(3, (2,)) - 1 + nb_rec = 3 + c = torch.randperm(len(self.colors) - 1)[:nb_rec] + 1 for X, f_X in [(A, f_A), (B, f_B)]: while True: - r = self.rec_coo(X, N) - i1, j1, i2, j2 = r[N - 1] + r = self.rec_coo(nb_rec) + i1, j1, i2, j2 = r[nb_rec - 1] if ( i1 + di >= 0 and i2 + di < X.size(0) @@ -220,29 +309,29 @@ class Reasoning(problem.Problem): ): break - for n in range(N): + for n in range(nb_rec): i1, j1, i2, j2 = r[n] X[i1:i2, j1:j2] = c[n] - if n == N - 1: + if n == nb_rec - 1: f_X[i1 + di : i2 + di, j1 + dj : j2 + dj] = c[n] else: f_X[i1:i2, j1:j2] = c[n] def task_grow(self, A, f_A, B, f_B): di, dj = torch.randint(2, (2,)) * 2 - 1 - N = 3 - c = torch.randperm(len(self.colors) - 1)[:N] + 1 + nb_rec = 3 + c = torch.randperm(len(self.colors) - 1)[:nb_rec] + 1 direction = torch.randint(2, (1,)) for X, f_X in [(A, f_A), (B, f_B)]: while True: - r = self.rec_coo(X, N) - i1, j1, i2, j2 = r[N - 1] + r = self.rec_coo(nb_rec) + i1, j1, i2, j2 = r[nb_rec - 1] if i1 + 3 < i2 and j1 + 3 < j2: break - for n in range(N): + for n in range(nb_rec): i1, j1, i2, j2 = r[n] - if n == N - 1: + if n == nb_rec - 1: if direction == 0: X[i1 + 1 : i2 - 1, j1 + 1 : j2 - 1] = c[n] f_X[i1:i2, j1:j2] = c[n] @@ -255,54 +344,152 @@ class Reasoning(problem.Problem): def task_color_grow(self, A, f_A, B, f_B): di, dj = torch.randint(2, (2,)) * 2 - 1 - N = 3 - c = torch.randperm(len(self.colors) - 1)[: 2 * N] + 1 - 
direction = torch.randint(2, (1,)) + nb_rec = 3 + c = torch.randperm(len(self.colors) - 1)[: 2 * nb_rec] + 1 + direction = torch.randint(4, (1,)) for X, f_X in [(A, f_A), (B, f_B)]: - r = self.rec_coo(X, N) - for n in range(N): + r = self.rec_coo(nb_rec) + for n in range(nb_rec): i1, j1, i2, j2 = r[n] - i = (i1 + i2) // 2 X[i1:i2, j1:j2] = c[2 * n] - X[i : i + 1, j1:j2] = c[2 * n + 1] f_X[i1:i2, j1:j2] = c[2 * n] - if n == N - 1: - f_X[i:i2, j1:j2] = c[2 * n + 1] - else: - f_X[i : i + 1, j1:j2] = c[2 * n + 1] + # Not my proudest moment + if direction == 0: + i = (i1 + i2) // 2 + X[i : i + 1, j1:j2] = c[2 * n + 1] + if n == nb_rec - 1: + f_X[i:i2, j1:j2] = c[2 * n + 1] + else: + f_X[i : i + 1, j1:j2] = c[2 * n + 1] + elif direction == 1: + i = (i1 + i2 - 1) // 2 + X[i : i + 1, j1:j2] = c[2 * n + 1] + if n == nb_rec - 1: + f_X[i1 : i + 1, j1:j2] = c[2 * n + 1] + else: + f_X[i : i + 1, j1:j2] = c[2 * n + 1] + elif direction == 2: + j = (j1 + j2) // 2 + X[i1:i2, j : j + 1] = c[2 * n + 1] + if n == nb_rec - 1: + f_X[i1:i2, j:j2] = c[2 * n + 1] + else: + f_X[i1:i2, j : j + 1] = c[2 * n + 1] + elif direction == 3: + j = (j1 + j2 - 1) // 2 + X[i1:i2, j : j + 1] = c[2 * n + 1] + if n == nb_rec - 1: + f_X[i1:i2, j1 : j + 1] = c[2 * n + 1] + else: + f_X[i1:i2, j : j + 1] = c[2 * n + 1] def task_frame(self, A, f_A, B, f_B): - N = 3 - c = torch.randperm(len(self.colors) - 1)[: N + 1] + 1 + nb_rec = 3 + c = torch.randperm(len(self.colors) - 1)[: nb_rec + 1] + 1 for X, f_X in [(A, f_A), (B, f_B)]: - r = self.rec_coo(X, N) - for n in range(N): + r = self.rec_coo(nb_rec) + for n in range(nb_rec): i1, j1, i2, j2 = r[n] X[i1:i2, j1:j2] = c[n] f_X[i1:i2, j1:j2] = c[n] - if n == N - 1: + if n == nb_rec - 1: f_X[i1 + 1 : i2 - 1, j1 + 1 : j2 - 1] = 0 def task_detect(self, A, f_A, B, f_B): - N = 3 - c = torch.randperm(len(self.colors) - 1)[: N + 1] + 1 + nb_rec = 3 + c = torch.randperm(len(self.colors) - 1)[: nb_rec + 1] + 1 for X, f_X in [(A, f_A), (B, f_B)]: - r = self.rec_coo(X, N) - for n in range(N): + r = self.rec_coo(nb_rec) + for n in range(nb_rec): i1, j1, i2, j2 = r[n] X[i1:i2, j1:j2] = c[n] - f_X[i1, j1] = c[-1] + if n < nb_rec - 1: + f_X[i1, j1] = c[-1] + + def task_count(self, A, f_A, B, f_B): + N = torch.randint(4, (1,)) + 2 + c = torch.randperm(len(self.colors) - 1)[:N] + 1 + + for X, f_X in [(A, f_A), (B, f_B)]: + + def contact(i, j, q): + nq, nq_diag = 0, 0 + no = 0 + + for ii, jj in [ + (i - 1, j - 1), + (i - 1, j), + (i - 1, j + 1), + (i, j - 1), + (i, j + 1), + (i + 1, j - 1), + (i + 1, j), + (i + 1, j + 1), + ]: + if ii >= 0 and ii < self.height and jj >= 0 and jj < self.width: + if X[ii, jj] != 0 and X[ii, jj] != q: + no += 1 + + for ii, jj in [ + (i - 1, j - 1), + (i - 1, j + 1), + (i + 1, j - 1), + (i + 1, j + 1), + ]: + if ii >= 0 and ii < self.height and jj >= 0 and jj < self.width: + if X[ii, jj] == q and X[i, jj] != q and X[ii, j] != q: + nq_diag += 1 + + for ii, jj in [(i - 1, j), (i, j - 1), (i, j + 1), (i + 1, j)]: + if ii >= 0 and ii < self.height and jj >= 0 and jj < self.width: + if X[ii, jj] == q: + nq += 1 + + return no, nq, nq_diag + + nb = torch.zeros(N, dtype=torch.int64) + q = torch.randint(N, (self.height * self.width,)) + k = torch.randperm(self.height * self.width) + for p in range(self.height * self.width): + i, j = k[p] % self.height, k[p] // self.height + no, nq, nq_diag = contact(i, j, c[q[p]]) + if no == 0 and nq_diag == 0: + if nq == 0: + if nb[q[p]] < self.width: + X[i, j] = c[q[p]] + nb[q[p]] += 1 + if nq == 1: + X[i, j] = c[q[p]] + + for n in range(N): + for j 
in range(nb[n]): + f_X[n, j] = c[n] + + def task_count_(self, A, f_A, B, f_B): + N = torch.randint(3, (1,)) + 1 + c = torch.randperm(len(self.colors) - 1)[:N] + 1 + for X, f_X in [(A, f_A), (B, f_B)]: + nb = torch.randint(self.width, (3,)) + 1 + k = torch.randperm(self.height * self.width)[: nb.sum()] + p = 0 + for n in range(N): + for m in range(nb[n]): + i, j = k[p] % self.height, k[p] // self.height + X[i, j] = c[n] + f_X[n, m] = c[n] + p += 1 ###################################################################### - def generate_prompts_and_answers(self, nb): + def generate_prompts_and_answers(self, nb, device="cpu"): tasks = [ self.task_replace_color, - self.task_move, + self.task_translate, self.task_grow, self.task_color_grow, self.task_frame, self.task_detect, + self.task_count, ] prompts = torch.zeros(nb, self.height, self.width * 3, dtype=torch.int64) answers = torch.zeros(nb, self.height, self.width, dtype=torch.int64) @@ -320,6 +507,7 @@ class Reasoning(problem.Problem): f_B = answer task = tasks[torch.randint(len(tasks), (1,))] task(A, f_A, B, f_B) + return prompts.flatten(1), answers.flatten(1) def save_quizzes( @@ -353,14 +541,15 @@ if __name__ == "__main__": delay = time.perf_counter() - start_time print(f"{prompts.size(0)/delay:02f} seq/s") - # predicted_prompts = torch.rand(prompts.size(0)) < 0.5 - # predicted_answers = torch.logical_not(predicted_prompts) + predicted_prompts = torch.rand(prompts.size(0)) < 0.5 + predicted_answers = torch.logical_not(predicted_prompts) reasoning.save_quizzes( "/tmp", "test", - prompts[:36], - answers[:36], + prompts[:64], + answers[:64], # You can add a bool to put a frame around the predicted parts - # predicted_prompts, predicted_answers + # predicted_prompts[:64], + # predicted_answers[:64], )
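
The comment added just above rec_coo in this patch describes a vectorized way to sample non-overlapping rectangles. The idea: sorting uniform noise gives a random permutation, so marking where indices 0 and 1 land picks two distinct cut positions per axis; a cumsum turns the two cuts into a 0/1 interval indicator, the outer product of a row indicator and a column indicator is a rectangle mask, and a batch of nb_rec masks is accepted only if no cell is covered twice. The following is a minimal, self-contained sketch of that trick, not the repository's code; the name sample_rectangles and the nb_trials default are illustrative assumptions, and the box coordinates are recovered with nonzero() rather than the arange/max trick used in the patch (both give the same bounds).

import torch

def sample_rectangles(nb_rec, height, width, min_height=3, min_width=3, nb_trials=200):
    while True:
        # Sorting uniform noise yields a random permutation; "< 2" marks the two
        # slots where indices 0 and 1 landed, i.e. two distinct random cut
        # positions per draw. cumsum(...) == 1 turns them into a 0/1 indicator
        # of the interval between the two cuts.
        v = (
            (torch.rand(nb_trials * nb_rec, height + 1).sort(dim=-1).indices < 2)
            .long()
            .cumsum(dim=1)
            == 1
        ).long()
        h = (
            (torch.rand(nb_trials * nb_rec, width + 1).sort(dim=-1).indices < 2)
            .long()
            .cumsum(dim=1)
            == 1
        ).long()

        # Keep only draws that satisfy the minimum rectangle size.
        keep = torch.logical_and(
            v.sum(dim=-1) >= min_height, h.sum(dim=-1) >= min_width
        )
        v, h = v[keep], h[keep]

        # Group the surviving draws nb_rec at a time.
        v = v[: v.size(0) - v.size(0) % nb_rec].reshape(-1, nb_rec, v.size(-1))
        h = h[: h.size(0) - h.size(0) % nb_rec].reshape(-1, nb_rec, h.size(-1))

        # Outer product of the row and column indicators gives one 0/1 mask per
        # rectangle; a group is valid when no cell is covered more than once.
        r = v[:, :, :, None] * h[:, :, None, :]
        valid = r.sum(dim=1).flatten(1).max(dim=-1).values == 1

        if valid.any():
            v, h = v[valid][0], h[valid][0]
            break

    # First and last nonzero index of each indicator give (i1, i2) and (j1, j2),
    # with exclusive upper bounds, matching what rec_coo returns.
    boxes = []
    for k in range(nb_rec):
        i = v[k].nonzero().flatten()
        j = h[k].nonzero().flatten()
        boxes.append(
            (i.min().item(), j.min().item(), i.max().item() + 1, j.max().item() + 1)
        )
    return boxes

# Quick check that the rectangles are disjoint and at least 3x3.
boxes = sample_rectangles(3, 10, 10)
grid = torch.zeros(10, 10)
for i1, j1, i2, j2 in boxes:
    assert i2 - i1 >= 3 and j2 - j1 >= 3
    grid[i1:i2, j1:j2] += 1
assert grid.max() == 1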
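
The new frame2img added by this patch renders a token grid as an RGB image by repeating each cell `scale` times along both spatial axes and then zeroing one row and one column per cell to draw grid lines. The sketch below isolates that tensor idiom under assumed shapes; the function name upscale_with_grid is mine, not the repository's API.

import torch

def upscale_with_grid(img, scale=15):
    # img: (N, 3, H, W) colour grid, one pixel per cell.
    n, c, h, w = img.shape
    x = img[:, :, :, None, :, None].expand(-1, -1, -1, scale, -1, scale)
    # reshape must copy the expanded view here, so the in-place writes
    # below do not touch img.
    x = x.reshape(n, c, h * scale, w * scale)
    x[:, :, :, torch.arange(0, x.size(3), scale)] = 0  # vertical grid lines
    x[:, :, torch.arange(0, x.size(2), scale), :] = 0  # horizontal grid lines
    return x[:, :, 1:, 1:]  # drop the outermost top/left grid line

img = torch.full((1, 3, 10, 10), 255.0)
print(upscale_with_grid(img).shape)  # torch.Size([1, 3, 149, 149])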
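
A usage sketch for the updated interface, assuming reasoning.py and its `problem` dependency are importable as in the repository. generate_prompts_and_answers returns flattened tensors: each prompt is a height x (3 * width) grid holding three width-wide panels side by side, and each answer is a height x width grid; the left-to-right panel order (A, f_A, B) is an assumption inferred from the prompt shape and the quiz rendering, since the slice assignments are outside the hunks shown above. The save_quizzes call mirrors the __main__ block of the patch.

import reasoning

r = reasoning.Reasoning()
prompts, answers = r.generate_prompts_and_answers(4)

H, W = r.height, r.width
panels = prompts.reshape(-1, H, 3 * W)
A, f_A, B = panels[:, :, :W], panels[:, :, W : 2 * W], panels[:, :, 2 * W :]
f_B = answers.reshape(-1, H, W)
print(A.shape, f_A.shape, B.shape, f_B.shape)  # four (4, 10, 10) grids

r.save_quizzes("/tmp", "example", prompts, answers)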