##############################
+class NoiseInjector(nn.Module):
+    # Adds zero-mean Gaussian noise with standard deviation noise_std to its
+    # input. With noise_std == 0 (the default) it is a no-op.
+    def __init__(self):
+        super().__init__()
+        self.noise_std = 0.0
+
+    def forward(self, x):
+        if self.noise_std > 0:
+            x = x + torch.randn(x.size(), device=x.device) * self.noise_std
+        return x
+
+
+def set_noise_injection(model, noise_std):
+    # Sets the noise level of every NoiseInjector module in the model.
+    for m in model.modules():
+        if isinstance(m, NoiseInjector):
+            m.noise_std = noise_std
+
+
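+# Example usage (illustrative sketch only; `model`, `nb_epochs` and
+# `train_one_epoch` are placeholders for the caller's own training loop):
+#
+#   for n_epoch in range(nb_epochs):
+#       # e.g. anneal the injected noise linearly down to zero over training
+#       set_noise_injection(model, 0.1 * max(0.0, 1.0 - n_epoch / nb_epochs))
+#       train_one_epoch(model)
+#
+#   set_noise_injection(model, 0.0)  # make sure noise is off for evaluation
+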
+##############################
+
+
class MyGPT(nn.Module):
    def __init__(
        self,
        for b in range(nb_blocks):
            trunk_blocks += [
                WithResidual(
-                    CacheWrapper(nn.LayerNorm((dim_model,))),
+                    CacheWrapper(
+                        nn.LayerNorm((dim_model,)),
+                        NoiseInjector(),
+                    ),
                    QKVAttention(
                        dim_in=dim_model,
                        dim_qk=dim_keys,
                WithResidual(
                    CacheWrapper(
                        nn.LayerNorm((dim_model,)),
+                        NoiseInjector(),
                        nn.Linear(in_features=dim_model, out_features=dim_hidden),
                        nn.ReLU(),
                        nn.Linear(in_features=dim_hidden, out_features=dim_model),
        bs = self.readout(bs)
        return bs
-    # ar_mask is a tensor with 0s and 1s, of same shape as input, with
-    # 1s where tokens should be generated. The others are kept
-    # unchanged.
-
-    def masked_inplace_autoregression(
-        self,
-        input,
-        ar_mask,
-        summed_logits,
-        temperature=1.0,
-        deterministic_synthesis=False,
-        forbidden_tokens=None,
-        forced_biases=None,
-    ):
-        to_generate = (ar_mask.sum(0) > 0).nonzero()
-
-        if to_generate.min() > 0:
-            self(
-                BracketedSequence(input, 0, to_generate.min())
-            )  # Needed to initialize the model's cache
-        for s in range(to_generate.min(), to_generate.max() + 1):
-            output = self(BracketedSequence(input, s, 1)).x
-
-            logits = output[:, s]
-
-            logits = (logits / temperature).log_softmax(dim=-1)
-
-            if forbidden_tokens is not None:
-                logits = logits.masked_fill(forbidden_tokens, float("-inf"))
-
-            if forced_biases is not None:
-                logits = logits + forced_biases[None, :]
-
-            if deterministic_synthesis:
-                t_next = logits.argmax(-1)
-            else:
-                dist = torch.distributions.categorical.Categorical(logits=logits)
-                t_next = dist.sample()
-                if summed_logits is not None:
-                    summed_logits += logits[torch.arange(t_next.size(0)), t_next].sum(
-                        dim=-1
-                    )
-
-            input[:, s] = ar_mask[:, s] * t_next + (1 - ar_mask[:, s]) * input[:, s]
-
    def record_attention(self, v=True):
        for m in self.modules():
            if isinstance(m, QKVAttention):