# Any copyright is dedicated to the Public Domain.
# https://creativecommons.org/publicdomain/zero/1.0/

# Written by Francois Fleuret <francois@fleuret.org>

# This is an implementation from scratch of a "GPT", that is a model
# composed of several causal self-attention blocks. It is equipped
# with a caching mechanism for keys and values to avoid an O(N^3) cost
# for auto-regression.
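#
# Without the cache, generating a sequence of N tokens takes N forward
# passes, each with an O(N^2) attention cost, hence O(N^3) overall;
# with cached keys and values, step t computes queries, keys and
# values only for the new tokens and attends to the t stored keys,
# for an O(N^2) total.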

import math

import torch

from torch import nn
from torch.nn import functional as F

######################################################################

# A BracketedSequence is a BxTx... tensor, together with the index of
# a first time step and a number nb of time steps to compute.

# Modules able to process it expect that they will have to process a
# first bracket starting at t=0, followed by a succession of brackets
# that move forward in time, do not overlap, and cover the axis T with
# no holes.
#
# Although it is more general, for a classical prompt-conditioned
# auto-regressive process it will be a first bracket starting at 0 and
# of arbitrary length for the "prompt", followed by brackets of length
# 1 for the successive tokens.
#
# Modules able to process brackets may implement a cache that is
# reset when the input bracket starts at t=0.


class BracketedSequence:
    def __init__(self, x, first=None, nb=None):
        self.x = x
        self.first = 0 if first is None else first
        self.nb = x.size(1) if nb is None else nb

    # The sub-tensor restricted to the current bracket
    def slice(self):
        return self.x[:, self.first : self.first + self.nb]

    # True iff the bracket covers the whole time axis
    def complete(self):
        return self.first == 0 and self.nb == self.x.size(1)
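

# A usage sketch (illustrative, not executed): a prompt bracket of
# length 3 followed by brackets of length 1, as in auto-regressive
# decoding.
#
#   x = torch.randn(2, 5, 8)
#   bs = BracketedSequence(x, 0, 3)  # the "prompt" bracket
#   bs.slice()                       # -> x[:, 0:3]
#   bs = BracketedSequence(x, 3, 1)  # next bracket, one time step
#   bs.slice()                       # -> x[:, 3:4]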


######################################################################


class CacheWrapper(nn.Module):
    def __init__(self, *f):
        super().__init__()
        self.f = f[0] if len(f) == 1 else nn.Sequential(*f)

    def forward(self, bs):
        # A bracket starting at t=0 resets the cache
        if bs.first == 0:
            y = self.f(bs.slice())
            self.cache_y = y.new(*((y.size(0), bs.x.size(1)) + y.size()[2:]))
            self.cache_y[:, bs.first : bs.first + bs.nb] = y
        else:
            self.cache_y[:, bs.first : bs.first + bs.nb] = self.f(bs.slice())

        return BracketedSequence(self.cache_y, bs.first, bs.nb)
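

# CacheWrapper makes a time-wise stateless module (e.g. an nn.Linear)
# bracket-aware: it applies the module to the current bracket only and
# accumulates the results in a cache, e.g. (illustrative):
#
#   lin = CacheWrapper(nn.Linear(8, 8))
#   lin(BracketedSequence(x, 0, 3))  # processes x[:, 0:3], resets cache
#   lin(BracketedSequence(x, 3, 1))  # processes only x[:, 3:4]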


##############################


class WithResidual(nn.Module):
    def __init__(self, *f):
        super().__init__()
        self.f = f[0] if len(f) == 1 else nn.Sequential(*f)

    def forward(self, bs):
        return BracketedSequence(bs.x + self.f(bs).x, bs.first, bs.nb)


##############################


class AddPositionalEncoding(nn.Module):
    def __init__(self, len_max):
        super().__init__()
        self.len_max = len_max

    # [Vaswani et al 2017] PE_{t,2i} = sin(t/(L^{2i/D})), PE_{t,2i+1} = cos(t/(L^{2i/D}))
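    #
    # Both cases are computed below with a single sin, since
    # cos(x) = sin(x + pi/2): k = j % 2 selects the phase shift, and
    # j - k maps an odd dimension 2i+1 onto the frequency of
    # dimension 2i.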

    def forward(self, bs):
        if bs.first == 0:
            t = torch.arange(bs.x.size(1), dtype=bs.x.dtype, device=bs.x.device)[
                :, None
            ]
            j = torch.arange(bs.x.size(2), dtype=bs.x.dtype, device=bs.x.device)[
                None, :
            ]
            k = j % 2
            self.pe = torch.sin(
                t / (self.len_max ** ((j - k) / bs.x.size(2))) + math.pi / 2 * k
            )
            self.cache_y = bs.x.new(bs.x.size())

        self.cache_y[:, bs.first : bs.first + bs.nb] = (
            bs.slice() + self.pe[bs.first : bs.first + bs.nb]
        )

        return BracketedSequence(self.cache_y, bs.first, bs.nb)


##############################


class QKVAttention(nn.Module):
    def __init__(
        self,
        dim_in,
        dim_qk,
        dim_v,
        nb_heads=1,
        causal=False,
        attention_dropout=0.0,
    ):
        super().__init__()

        def randw(*d):
            return nn.Parameter(torch.randn(*d) / math.sqrt(d[-1]))

        self.causal = causal
        self.attention_dropout = attention_dropout
        self.record_attention = False

        self.w_q = randw(nb_heads, dim_qk, dim_in)
        self.w_k = randw(nb_heads, dim_qk, dim_in)
        self.w_v = randw(nb_heads, dim_v, dim_in)
        self.w_o = randw(dim_v * nb_heads, dim_in)

    def forward(self, bs_q):
        x_q = bs_q.x

        assert (
            self.causal or bs_q.complete()
        ), "Partial evaluation is only possible for causal models"

        # A bracket starting at t=0 resets the key/value caches
        if bs_q.first == 0:
            self.cache_k = x_q.new_zeros(
                x_q.size(0), self.w_k.size(0), x_q.size(1), self.w_k.size(1)
            )
            self.cache_v = x_q.new_zeros(
                x_q.size(0), self.w_v.size(0), x_q.size(1), self.w_v.size(1)
            )
            self.cache_y = x_q.new_zeros(x_q.size(0), x_q.size(1), self.w_o.size(1))

        q = torch.einsum(
            "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_q
        )

        self.cache_k[:, :, bs_q.first : bs_q.first + bs_q.nb] = torch.einsum(
            "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_k
        )
        self.cache_v[:, :, bs_q.first : bs_q.first + bs_q.nb] = torch.einsum(
            "ntc,hdc->nhtd", x_q[:, bs_q.first : bs_q.first + bs_q.nb], self.w_v
        )

        a = torch.einsum(
            "nhtd,nhsd->nhts", q, self.cache_k[:, :, : bs_q.first + bs_q.nb]
        ) / math.sqrt(self.w_q.size(1))

        if self.causal:
            if bs_q.first == 0:
                self.cache_attzero = (
                    torch.arange(x_q.size(1), device=q.device)[None, None, :, None]
                    < torch.arange(x_q.size(1), device=q.device)[None, None, None, :]
                )
            a = a.masked_fill(
                self.cache_attzero[
                    :, :, bs_q.first : bs_q.first + bs_q.nb, : bs_q.first + bs_q.nb
                ],
                float("-inf"),
            )

        a = a.softmax(dim=3)

        if self.record_attention:
            self.a = a

        a = F.dropout(a, self.attention_dropout, self.training)

        y = torch.einsum(
            "nhts,nhsd->nthd", a, self.cache_v[:, :, : bs_q.first + bs_q.nb]
        ).flatten(2)

        self.cache_y[:, bs_q.first : bs_q.first + bs_q.nb] = y @ self.w_o

        return BracketedSequence(self.cache_y, bs_q.first, bs_q.nb)
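

# Shapes in the einsums above, for a bracket of nb time steps
# (illustrative): x_q is (N, T, C), w_q and w_k are (H, D_qk, C), and
# w_v is (H, D_v, C); q is (N, H, nb, D_qk), the attention a is
# (N, H, nb, first + nb), and y is (N, nb, H * D_v) after flatten(2).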


##############################


class MyGPT(nn.Module):
    def __init__(
        self,
        vocabulary_size,
        dim_model,
        dim_keys,
        dim_hidden,
        nb_heads,
        nb_blocks,
        causal=False,
        dropout=0.0,
        len_max=1e5,
    ):
        super().__init__()

        assert dim_model % nb_heads == 0

        self.embedding = nn.Sequential(
            CacheWrapper(nn.Embedding(vocabulary_size, dim_model), nn.Dropout(dropout)),
            AddPositionalEncoding(len_max),
        )

        trunk_blocks = []

        for b in range(nb_blocks):
            trunk_blocks += [
                WithResidual(
                    CacheWrapper(nn.LayerNorm((dim_model,))),
                    QKVAttention(
                        dim_in=dim_model,
                        dim_qk=dim_keys,
                        dim_v=dim_model // nb_heads,
                        nb_heads=nb_heads,
                        causal=causal,
                        attention_dropout=dropout,
                    ),
                ),
                WithResidual(
                    CacheWrapper(
                        nn.LayerNorm((dim_model,)),
                        nn.Linear(in_features=dim_model, out_features=dim_hidden),
                        nn.ReLU(),
                        nn.Linear(in_features=dim_hidden, out_features=dim_model),
                        nn.Dropout(dropout),
                    )
                ),
            ]

        self.trunk = nn.Sequential(*trunk_blocks)

        self.readout = CacheWrapper(
            nn.Linear(in_features=dim_model, out_features=vocabulary_size)
        )

        with torch.no_grad():
            for m in self.modules():
                if isinstance(m, nn.Embedding):
                    m.weight.normal_(mean=0, std=2e-2)
                elif isinstance(m, nn.LayerNorm):
                    m.bias.zero_()
                    m.weight.fill_(1.0)

    def forward(self, bs):
        # print(f"GENERATE {bs.first} {bs.first+bs.nb}")
        # Shift the input right by one position: the prediction at t
        # is conditioned on the tokens strictly before t
        bs = BracketedSequence(F.pad(bs.x, (1, -1)), bs.first, bs.nb)
        bs = self.embedding(bs)
        bs = self.trunk(bs)
        bs = self.readout(bs)
        return bs
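
    # E.g. (illustrative) F.pad(x, (1, -1)) maps [[3, 1, 4, 1]] to
    # [[0, 3, 1, 4]]: a zero is prepended and the last token dropped,
    # so output[:, s] is the prediction for input[:, s].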

    # ar_mask is a tensor with 0s and 1s, of same shape as input, with
    # 1s where tokens should be generated. The others are kept
    # unchanged.

    def masked_inplace_autoregression(
        self,
        input,
        ar_mask,
        summed_logits,
        temperature=1.0,
        deterministic_synthesis=False,
        forbidden_tokens=None,
        forced_biases=None,
    ):
        to_generate = (ar_mask.sum(0) > 0).nonzero()
        if to_generate.min() > 0:
            self(
                BracketedSequence(input, 0, to_generate.min())
            )  # Needed to initialize the model's cache
        for s in range(to_generate.min(), to_generate.max() + 1):
            output = self(BracketedSequence(input, s, 1)).x
            logits = output[:, s]

            logits = (logits / temperature).log_softmax(dim=-1)

            if forbidden_tokens is not None:
                logits = logits.masked_fill(forbidden_tokens, float("-inf"))

            if forced_biases is not None:
                logits = logits + forced_biases[None, :]

            if deterministic_synthesis:
                t_next = logits.argmax(-1)
            else:
                dist = torch.distributions.categorical.Categorical(logits=logits)
                t_next = dist.sample()
                if summed_logits is not None:
                    summed_logits += logits[torch.arange(t_next.size(0)), t_next].sum(
                        dim=-1
                    )

            input[:, s] = ar_mask[:, s] * t_next + (1 - ar_mask[:, s]) * input[:, s]
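
    # A usage sketch (illustrative): generate the last 6 tokens of
    # length-10 sequences given 4-token prompts, in place.
    #
    #   input = ...  # (N, 10) tensor of token indices
    #   ar_mask = torch.cat(
    #       [torch.zeros(N, 4, dtype=torch.int64),
    #        torch.ones(N, 6, dtype=torch.int64)], dim=1)
    #   model.masked_inplace_autoregression(input, ar_mask, None)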

    def record_attention(self, v=True):
        for m in self.modules():
            if isinstance(m, QKVAttention):
                m.record_attention = v

    def retrieve_attention(self):
        a = []
        for m in self.modules():
            if isinstance(m, QKVAttention):
                a.append(m.a)
        return a
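
    # E.g. (illustrative): after model.record_attention() and a
    # forward pass over a complete bracket, model.retrieve_attention()
    # returns one (N, H, T, T) attention tensor per block, and
    # model.record_attention(False) stops the recording.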


######################################################################

if __name__ == "__main__":
    print("Basic check.")

    vocabulary_size = 3

    x = torch.randint(vocabulary_size, (1, 5))

    model = MyGPT(
        vocabulary_size=vocabulary_size,
        dim_model=4,
        dim_keys=2,
        dim_hidden=2,
        nb_heads=2,
        nb_blocks=2,
        dropout=0.1,
        causal=True,
    )

    model.eval()

    # The full-sequence evaluation and the bracket-by-bracket one,
    # which exercises the caches, must agree up to numerical noise
    y1 = model(BracketedSequence(x)).x
    y2 = torch.randn_like(y1)
    for s in range(x.size(1)):
        z = model(BracketedSequence(x, s, 1))
        y2[:, s : s + 1] = z.slice()

    print(f"error={((y1 - y2).norm() / (y1.norm() + y2.norm())).item()}")

######################################################################