# Written by Francois Fleuret <francois@fleuret.org>
+# This is an implementation from scratch of a "GPT", that is, a model
+# composed of several causal self-attention blocks. It is equipped
+# with a caching mechanism for keys and values to avoid an O(N^3)
+# cost for auto-regression.
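+#
+# Without the cache, generating a sequence auto-regressively would
+# recompute the attention over the whole prefix at every step: step t
+# costs O(t^2), which sums to O(N^3) over N steps. With the cache,
+# each step computes queries, keys, and values only for the new time
+# steps and re-uses the stored K/V, for an O(N^2) total.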
+
import math

import torch

from torch import nn
from torch.nn import functional as F
######################################################################
-
-class WithResidual(nn.Module):
-    def __init__(self, *f):
-        super().__init__()
-        self.f = f[0] if len(f) == 1 else nn.Sequential(*f)
-
-    def forward(self, bs):
-        bs.x = bs.x + self.f(bs).x
-        return bs
-
-
-######################################################################
-
# A BracketedSequence is a BxTx... tensor together with the index
# `first` of the first time step to compute and the number `nb` of
# time steps to compute.


class BracketedSequence:
    def __init__(self, x, first=None, nb=None):
        self.x = x
        self.first = 0 if first is None else first
        self.nb = x.size(1) if nb is None else nb


##############################
+class WithResidual(nn.Module):
+    def __init__(self, *f):
+        super().__init__()
+        self.f = f[0] if len(f) == 1 else nn.Sequential(*f)
+
+    def forward(self, bs):
+        bs.x = bs.x + self.f(bs).x
+        return bs
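+
+# A minimal usage sketch, with placeholder argument values:
+# WithResidual wraps a sub-module (or a sequence of sub-modules)
+# operating on a BracketedSequence so that its output is added back
+# to its input, as in a standard transformer residual branch,
+# assuming the QKVAttention module defined below:
+#
+#   block = WithResidual(
+#       QKVAttention(dim_in=64, dim_qk=16, dim_v=16, nb_heads=4, causal=True)
+#   )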
+
+
+##############################
+
+
class AddPositionalEncoding(nn.Module):
def __init__(self, len_max):
super().__init__()
        self.len_max = len_max


##############################


class QKVAttention(nn.Module):
    def __init__(
        self, dim_in, dim_qk, dim_v, nb_heads=1, causal=False, attention_dropout=0.0
    ):
        super().__init__()

        def randw(*d):
            return nn.Parameter(torch.randn(*d) / math.sqrt(d[-1]))

        self.causal = causal
        self.attention_dropout = attention_dropout

        self.w_q = randw(nb_heads, dim_qk, dim_in)
        self.w_k = randw(nb_heads, dim_qk, dim_in)
        self.w_v = randw(nb_heads, dim_v, dim_in)
        self.w_o = randw(dim_v * nb_heads, dim_in)

-    def forward(self, bs_q, x_kv=None):
+    def forward(self, bs_q):
        x_q = bs_q.x
-        if x_kv is None:
-            x_kv = x_q

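        # At the start of a new sequence (first == 0), allocate caches
        # big enough for the full sequence length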
if bs_q.first == 0:
self.cache_k = x_q.new_zeros(
-                x_q.size(0), self.w_k.size(0), x_kv.size(1), self.w_k.size(1)
+                x_q.size(0), self.w_k.size(0), x_q.size(1), self.w_k.size(1)
)
self.cache_v = x_q.new_zeros(
-                x_q.size(0), self.w_v.size(0), x_kv.size(1), self.w_v.size(1)
+                x_q.size(0), self.w_v.size(0), x_q.size(1), self.w_v.size(1)
)
            self.cache_y = x_q.new_zeros(x_q.size(0), x_q.size(1), self.w_o.size(1))

        q = torch.einsum(
            "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_q
        )
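        # The new keys and values are written into the caches at their
        # time positions; the queries are not cached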
self.cache_k[:, :, bs_q.first : bs_q.first + bs_q.nb] = torch.einsum(
-            "ntc,hdc->nhtd", x_kv[:, bs_q.first : bs_q.first + bs_q.nb], self.w_k
+            "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_k
)
self.cache_v[:, :, bs_q.first : bs_q.first + bs_q.nb] = torch.einsum(
-            "ntc,hdc->nhtd", x_kv[:, bs_q.first : bs_q.first + bs_q.nb], self.w_v
+            "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_v
)
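
        # Attention scores between the queries of the new time steps
        # and all the keys cached so far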
        a = torch.einsum(
            "nhtd,nhsd->nhts", q, self.cache_k[:, :, : bs_q.first + bs_q.nb]
        ) / math.sqrt(self.w_q.size(1))

        if self.causal:
if bs_q.first == 0:
self.cache_attzero = (
torch.arange(x_q.size(1), device=q.device)[None, None, :, None]
-                    < torch.arange(x_kv.size(1), device=q.device)[None, None, None, :]
+                    < torch.arange(x_q.size(1), device=q.device)[None, None, None, :]
)
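
            # cache_attzero is True where a query position would attend
            # to a key position strictly in its future; those scores are
            # set to -inf before the softmax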
            a = a.masked_fill(
                self.cache_attzero[
                    :, :, bs_q.first : bs_q.first + bs_q.nb, : bs_q.first + bs_q.nb
                ],
                float("-inf"),
            )

        a = a.softmax(dim=3)
        a = F.dropout(a, self.attention_dropout, self.training)

        y = torch.einsum(
            "nhts,nhsd->nhtd", a, self.cache_v[:, :, : bs_q.first + bs_q.nb]
        ).flatten(2)

        self.cache_y[:, bs_q.first : bs_q.first + bs_q.nb] = y @ self.w_o

        return BracketedSequence(self.cache_y, bs_q.first, bs_q.nb)


##############################


class MyGPT(nn.Module):
    def __init__(
        self,
        vocabulary_size,
        dim_model,
        dim_keys,
        dim_hidden,
        nb_heads,
        nb_blocks,
        causal=False,
dropout=0.0,
len_max=1e5,
):
-
super().__init__()
assert dim_model % nb_heads == 0


######################################################################

if __name__ == "__main__":
-
    print("Basic check.")

    vocabulary_size = 10
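
    # A sketch of how the check can proceed, with placeholder
    # parameter values: it assumes that MyGPT.forward maps a
    # BracketedSequence of token indices to a BracketedSequence of
    # logits, consistent with the blocks above. The model is run once
    # over the full sequence, then once token by token through the
    # key/value caches, and the two outputs are compared.

    model = MyGPT(
        vocabulary_size=vocabulary_size,
        dim_model=18,
        dim_keys=50,
        dim_hidden=100,
        nb_heads=2,
        nb_blocks=3,
        causal=True,
        dropout=0.1,
    )

    model.eval()  # disable dropout so that the two passes match

    x = torch.randint(vocabulary_size, (1, 20))

    # One bracket covering the whole sequence
    y1 = model(BracketedSequence(x)).x

    # One time step at a time, re-using the cached keys and values
    y2 = torch.full(y1.size(), float("nan"))
    for s in range(x.size(1)):
        z = model(BracketedSequence(x, s, 1))
        y2[:, s] = z.x[:, s]

    print(f"error={((y1 - y2).norm() / (y1.norm() + y2.norm())).item()}")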