3 # Any copyright is dedicated to the Public Domain.
4 # https://creativecommons.org/publicdomain/zero/1.0/
6 # Written by Francois Fleuret <francois@fleuret.org>
13 from torch.nn import functional as F
15 ######################################################################
17 # A BracketedSequence is a BxTx... tensor with a first and a nb time
20 # Modules able to process it expect that they will have to process a
21 # first bracket starting at t=0, followed by a succession of brackets
22 # that move forward in time, do not overlap, and cover the axis T with
25 # Although it is more general, for a classical prompt-conditioned
26 # auto-regressive process it will be a first bracket starting at 0 and
27 # of arbitrary length for the "prompt", followed by brackets of length
28 # 1 for the successive tokens.
30 # Modules able to process brackets may implement a cache that is
31 # reset when the input bracket starts at t=0
# Pairs a tensor x with a bracket [first, first + nb) taken along
# dimension 1.
34 class BracketedSequence:
35 def __init__(self, x, first=None, nb=None):
# Defaults describe the whole of dimension 1: the bracket starts at 0
# and has length x.size(1).
37 self.first = 0 if first is None else first
38 self.nb = x.size(1) if nb is None else nb
# The bracketed part of x: nb positions along dimension 1 starting at
# first; every other dimension is kept whole.
41 return self.x[:, self.first : self.first + self.nb]
44 ######################################################################
# Residual connection around one or several modules: the wrapped
# module's output tensor is added to its input tensor.
47 class WithResidual(nn.Module):
48 def __init__(self, *f):
# A single module is used as is; several are chained with nn.Sequential.
50 self.f = f[0] if len(f) == 1 else nn.Sequential(*f)
52 def forward(self, bs):
# self.f is called on bs itself and must return an object exposing the
# result in .x; bs.x is then overwritten with input + result.
53 bs.x = bs.x + self.f(bs).x
57 ######################################################################
# Lets module(s) that work on plain tensors process a bracketed input:
# f is applied to the current bracket only, and its result is written
# into self.cache_y at the bracket's positions along dimension 1.
60 class CacheWrapper(nn.Module):
61 def __init__(self, *f):
# A single module is used as is; several are chained with nn.Sequential.
63 self.f = f[0] if len(f) == 1 else nn.Sequential(*f)
65 def forward(self, bs):
# Path that (re)creates the cache. NOTE(review): presumably taken when
# bs.first == 0 -- confirm against the full file.
67 y = self.f(bs.slice())
# The cache takes its batch size and trailing dimensions from y, but
# its dimension-1 length from the full bs.x, so later brackets fit.
68 self.cache_y = y.new(*((y.size(0), bs.x.size(1)) + y.size()[2:]))
69 self.cache_y[:, bs.first : bs.first + bs.nb] = y
# Path that reuses the existing cache and only fills the new bracket.
71 self.cache_y[:, bs.first : bs.first + bs.nb] = self.f(bs.slice())
78 ##############################
# Adds a sinusoidal positional encoding to the bracketed input. The
# encoding is the concatenation, along the channel dimension, of one
# table lookup indexed by order_input and one indexed by order_output.
81 class AddPositionalEncoding(nn.Module):
82 def __init__(self, len_max):
# len_max is the base L of the term L^{2i/D} in the formula below.
84 self.len_max = len_max
86 # [Vaswani et al 2017] PE_{t,2i} = sin(t/(L^{2i/D})), PE_{t,2i+1} = cos(t/(L^{2i/D}))
88 def forward(self, bs, order): # bs.x is NxTxD, order is NxT
# The table has T + 1 time entries (0..T): the lookups below use
# order + 1, and F.pad introduces the extra index 0.
91 torch.arange(bs.x.size(1) + 1, dtype=bs.x.dtype, device=bs.x.device)[
# Each of the two concatenated encodings has D // 2 channels.
96 j = torch.arange(bs.x.size(2) // 2, dtype=bs.x.dtype, device=bs.x.device)[
# The pi/2 * k phase shift turns the sine into a cosine wherever
# k == 1 (presumably the odd channels, as in the formula -- confirm).
102 t / (self.len_max ** ((j - k) / bs.x.size(2))) + math.pi / 2 * k
# Same table for every sample of the batch.
105 .expand(bs.x.size(0), -1, -1)
# Indices are shifted by one so that table entry 0 stays free;
# order_input is order_output moved one step to the right along the
# last dimension (F.pad inserts a leading 0 and drops the last entry).
108 order_output = order + 1
109 order_input = F.pad(order + 1, (1, -1))
# Pick, for every sample and time step, the table row given by the
# index, across all channels.
111 pe_input = pe.gather(
112 1, order_input.unsqueeze(-1).expand(-1, -1, pe.size(-1))
114 pe_output = pe.gather(
115 1, order_output.unsqueeze(-1).expand(-1, -1, pe.size(-1))
# Concatenate both encodings along the channel dimension and allocate
# an output cache with the same size as bs.x.
118 self.pe = torch.cat((pe_input, pe_output), 2)
119 self.cache_y = bs.x.new(bs.x.size())
# Only the current bracket is updated: input plus the matching slice of
# the encoding.
121 self.cache_y[:, bs.first : bs.first + bs.nb] = (
122 bs.slice() + self.pe[:, bs.first : bs.first + bs.nb]
130 ##############################
# Multi-head attention over a bracketed sequence. Keys, values and the
# output are kept in caches covering the whole time axis; each call
# computes only the time steps of the current bracket.
133 class QKVAttention(nn.Module):
# Dropout probability applied to the attention weights.
141 attention_dropout=0.0,
# Parameter with standard normal entries scaled by 1/sqrt(last dimension).
147 return nn.Parameter(torch.randn(*d) / math.sqrt(d[-1]))
# Default mask generator: for a length d it returns a 1x1xdxd boolean
# tensor that is True where the row index is strictly smaller than the
# column index. NOTE(review): presumably the True entries are the
# attention positions that get suppressed -- confirm where
# cache_attzero is applied.
149 if amm_generator is None:
150 self.amm_generator = (
151 lambda d: torch.arange(d)[None, None, :, None]
152 < torch.arange(d)[None, None, None, :]
155 self.amm_generator = amm_generator
158 self.attention_dropout = attention_dropout
# Per-head projections: w_q and w_k are nb_heads x dim_qk x dim_in,
# w_v is nb_heads x dim_v x dim_in; w_o is (dim_v * nb_heads) x dim_in.
160 self.w_q = randw(nb_heads, dim_qk, dim_in)
161 self.w_k = randw(nb_heads, dim_qk, dim_in)
162 self.w_v = randw(nb_heads, dim_v, dim_in)
163 self.w_o = randw(dim_v * nb_heads, dim_in)
165 def forward(self, bs_q):
# Zero-filled caches spanning the full time length of x_q (presumably
# bs_q.x -- confirm): keys and values are batch x heads x time x head
# dimension, the output is batch x time x dim_in.
169 self.cache_k = x_q.new_zeros(
170 x_q.size(0), self.w_k.size(0), x_q.size(1), self.w_k.size(1)
172 self.cache_v = x_q.new_zeros(
173 x_q.size(0), self.w_v.size(0), x_q.size(1), self.w_v.size(1)
175 self.cache_y = x_q.new_zeros(x_q.size(0), x_q.size(1), self.w_o.size(1))
# Project only the bracket's time steps. Einsum indices: n batch,
# t time, c input channel, h head, d per-head dimension.
178 "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_q
# Keys and values of the bracket are written into their caches.
180 self.cache_k[:, :, bs_q.first : bs_q.first + bs_q.nb] = torch.einsum(
181 "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_k
183 self.cache_v[:, :, bs_q.first : bs_q.first + bs_q.nb] = torch.einsum(
184 "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_v
# Dot products between the bracket's queries and every cached key up to
# the end of the bracket, scaled by 1/sqrt(dim_qk).
188 "nhtd,nhsd->nhts", q, self.cache_k[:, :, : bs_q.first + bs_q.nb]
189 ) / math.sqrt(self.w_q.size(1))
# The mask is generated for the full time length, moved to q's device,
# and restricted to the bracket's rows and the columns seen so far.
193 self.cache_attzero = self.amm_generator(x_q.size(1)).to(q.device)
196 :, :, bs_q.first : bs_q.first + bs_q.nb, : bs_q.first + bs_q.nb
# Dropout on the attention weights, active only in training mode.
202 a = F.dropout(a, self.attention_dropout, self.training)
# Attention-weighted sum of the cached values, laid out as
# batch x time x head x per-head dimension.
205 "nhts,nhsd->nthd", a, self.cache_v[:, :, : bs_q.first + bs_q.nb]
# Output projection, stored at the bracket's positions.
208 self.cache_y[:, bs_q.first : bs_q.first + bs_q.nb] = y @ self.w_o
# The full-length output cache becomes the tensor of the sequence.
210 bs_q.x = self.cache_y
215 ##############################
# Model assembled from an embedding, a positional encoding, a trunk of
# attention and feed-forward blocks, and a linear readout over the
# vocabulary. It consumes and produces BracketedSequence objects.
218 class MyGPT(nn.Module):
# dim_model must split evenly across heads: each head gets
# dim_model // nb_heads value channels.
234 assert dim_model % nb_heads == 0
# Token embedding followed by dropout, applied bracket by bracket.
236 self.embedding = CacheWrapper(
237 nn.Embedding(vocabulary_size, dim_model), nn.Dropout(dropout)
239 self.pe = AddPositionalEncoding(len_max)
# One group of trunk layers per block: layer norm, attention, then a
# two-layer perceptron going through dim_hidden.
243 for b in range(nb_blocks):
246 CacheWrapper(nn.LayerNorm((dim_model,))),
250 dim_v=dim_model // nb_heads,
253 attention_dropout=dropout,
254 amm_generator=amm_generator,
259 nn.LayerNorm((dim_model,)),
260 nn.Linear(in_features=dim_model, out_features=dim_hidden),
262 nn.Linear(in_features=dim_hidden, out_features=dim_model),
268 self.trunk = nn.Sequential(*trunk_blocks)
# Maps each position's features to one score per vocabulary entry.
270 self.readout = CacheWrapper(
271 nn.Linear(in_features=dim_model, out_features=vocabulary_size)
# Parameter initialization, done without tracking gradients: embedding
# weights are drawn from a normal distribution with std 0.02.
274 with torch.no_grad():
275 for m in self.modules():
276 if isinstance(m, nn.Embedding):
277 m.weight.normal_(mean=0, std=2e-2)
278 elif isinstance(m, nn.LayerNorm):
282 def forward(self, bs, mode="standard", order=None):
# Shift the tokens one step to the right along the last dimension: a 0
# is inserted in front and the last token is dropped. The bracket
# (first, nb) is kept unchanged.
283 bs = BracketedSequence(F.pad(bs.x, (1, -1)), bs.first, bs.nb)
# Identity ordering 0..T-1, broadcast over the batch. NOTE(review):
# presumably used only when no order is given -- confirm.
285 order = torch.arange(bs.x.size(1), device=bs.x.device)[None, :].expand_as(
288 bs = self.embedding(bs)
289 bs = self.pe(bs, order)
291 if mode == "standard":
293 bs = self.readout(bs)
# Result built by concatenating the tensors in r along the last
# dimension.
301 bs = BracketedSequence(torch.cat(r, -1))
# Any unrecognized mode is rejected.
303 raise ValueError(f"{mode=}")
307 ######################################################################
# Self-check run when the file is executed as a script: the output of
# the model on a whole sequence is compared with the output obtained by
# feeding the same sequence one time step at a time.
309 if __name__ == "__main__":
310 print("Basic check.")
# Random integer tokens in [0, vocabulary_size), shape 9 x 7.
313 x = torch.randint(vocabulary_size, (9, 7))
316 vocabulary_size=vocabulary_size,
# y1: the whole sequence processed as a single bracket.
327 y1 = model(BracketedSequence(x)).x
# y2 starts as random values with y1's shape. NOTE(review): presumably
# overwritten step by step from z in the loop below -- confirm.
329 y2 = torch.randn_like(y1)
# Brackets of length 1 starting at s = 0, 1, ..., T - 1.
330 for s in range(x.size(1)):
331 z = model(BracketedSequence(x, s, 1))
334 # print(y1.max(dim = 2).values)
335 # print(y2.max(dim = 2).values)
# Relative difference between the two outputs.
336 print(f"error={((y1 - y2).norm() / (y1.norm() + y2.norm())).item()}")
338 ######################################################################