|
import torch |
|
from torch import einsum, nn |
|
import torch.nn.functional as F |
|
from einops import rearrange, repeat |
|
|
|
|
|
|
|
def exists(val):
    """Return True when *val* is anything other than ``None``."""
    if val is None:
        return False
    return True
|
|
|
def default(val, d):
    """Return *val* unless it is ``None``, in which case return the fallback *d*.

    Falsy values other than ``None`` (``0``, ``""``, ``False``) are kept as-is.
    """
    if val is None:
        return d
    return val
|
|
|
|
|
|
|
|
|
|
|
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension with a learned scale only.

    ``weight`` is a trainable parameter initialised to ones; ``bias`` is a
    buffer of zeros, so it is saved with the module but never trained.
    """

    def __init__(self, dim):
        """Create the scale parameter and the fixed zero bias, both of size *dim*."""
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.register_buffer("bias", torch.zeros(dim))

    def forward(self, x):
        """Normalise *x* over its last dimension and apply the scale/bias."""
        # The normalised shape is taken from the input, i.e. only the trailing axis.
        normalized_shape = x.shape[-1:]
        return F.layer_norm(x, normalized_shape, weight=self.weight, bias=self.bias)
|
|
|
|
|
|
|
|
|
class Residual(nn.Module):
    """Wrap a callable so that its output is added back onto its first input."""

    def __init__(self, fn):
        """Store *fn*, the callable applied inside the residual connection."""
        super().__init__()
        self.fn = fn

    def forward(self, x, *args, **kwargs):
        """Return ``fn(x, *args, **kwargs) + x``; extra arguments go to *fn* only."""
        branch = self.fn(x, *args, **kwargs)
        return branch + x
|
|
|
|
|
|
|
|
|
|
|
|
|
class RotaryEmbedding(nn.Module):
    """Produce the per-position rotation angles used for rotary embeddings."""

    def __init__(self, dim):
        """Precompute inverse frequencies ``1 / 10000^(2i / dim)`` for even indices."""
        super().__init__()
        exponents = torch.arange(0, dim, 2).float() / dim
        inv_freq = 1.0 / (10000 ** exponents)
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, max_seq_len, *, device):
        """Return a ``(max_seq_len, 2 * len(inv_freq))`` tensor of angles.

        Each position index is multiplied with every inverse frequency and the
        resulting table is repeated twice along the last dimension.
        """
        positions = torch.arange(
            max_seq_len, device=device, dtype=self.inv_freq.dtype
        )
        # Outer product: rows are positions, columns are frequencies.
        angles = einsum("i , j -> i j", positions, self.inv_freq)
        return torch.cat((angles, angles), dim=-1)
|
|
|
|
|
def rotate_half(x):
    """Swap the two halves of the last dimension, negating the second half.

    For a last dimension laid out as ``[a, b]`` (two equal halves) the result
    is ``[-b, a]``. The last dimension must be even.
    """
    half = x.shape[-1] // 2
    # View the last axis as (2, half) so the halves can be pulled apart.
    paired = x.reshape(*x.shape[:-1], 2, half)
    first, second = paired.unbind(dim=-2)
    return torch.cat((-second, first), dim=-1)
|
|
|
|
|
def apply_rotary_pos_emb(pos, t):
    """Rotate *t* by the angles in *pos*.

    Computes ``t * cos(pos) + rotate_half(t) * sin(pos)``; *pos* must be
    broadcastable against *t*.
    """
    cos_term = t * pos.cos()
    sin_term = rotate_half(t) * pos.sin()
    return cos_term + sin_term
|
|
|
|
|
|
|
|
|
|
|
|
|
class SwiGLU(nn.Module):
    """Gated activation: split the last dimension in two and gate with SiLU."""

    def forward(self, x):
        """Return ``silu(gate) * value`` where ``value, gate`` are the two halves of *x*.

        The first half of the last dimension is the value, the second half the
        gate, so the output's last dimension is half that of the input.
        """
        value, gate = x.chunk(2, dim=-1)
        return F.silu(gate) * value
|
|
|
|
|
|
|
|
|
|
|
class ParallelTransformerBlock(nn.Module):
    """Pre-norm block whose attention and feed-forward branches run in parallel.

    One fused linear layer maps the normalised input to the queries (all
    heads), a single key head, a single value head (both shared by every query
    head) and the doubled-width input of the SwiGLU feed-forward. The two
    branch outputs are summed; no residual is added here.
    """

    def __init__(self, dim, dim_head=64, heads=8, ff_mult=4):
        """Build the norm, fused projection, output projections and rotary table.

        Args:
            dim: model (input/output) feature size.
            dim_head: feature size of each attention head.
            heads: number of query heads.
            ff_mult: feed-forward hidden size as a multiple of ``dim``.
        """
        super().__init__()
        self.norm = LayerNorm(dim)

        attn_inner_dim = dim_head * heads
        ff_inner_dim = dim * ff_mult
        # Split sizes of the fused projection: q, k, v, feed-forward (value + gate).
        self.fused_dims = (attn_inner_dim, dim_head, dim_head, (ff_inner_dim * 2))

        self.heads = heads
        self.scale = dim_head**-0.5
        self.rotary_emb = RotaryEmbedding(dim_head)

        self.fused_attn_ff_proj = nn.Linear(dim, sum(self.fused_dims), bias=False)
        self.attn_out = nn.Linear(attn_inner_dim, dim, bias=False)

        self.ff_out = nn.Sequential(
            SwiGLU(),
            nn.Linear(ff_inner_dim, dim, bias=False),
        )

        # Lazily filled cache of rotary angles; excluded from the state dict.
        self.register_buffer("pos_emb", None, persistent=False)

    def get_rotary_embedding(self, n, device):
        """Return rotary angles for *n* positions, reusing the cached table if long enough."""
        cached = self.pos_emb
        if cached is None or cached.shape[-2] < n:
            # Cache miss (or too short): rebuild for exactly n positions and store it.
            fresh = self.rotary_emb(n, device=device)
            self.register_buffer("pos_emb", fresh, persistent=False)
            return fresh
        return cached[:n]

    def forward(self, x, attn_mask=None):
        """Apply parallel self-attention and feed-forward to *x*.

        einstein notation
        b - batch
        h - heads
        n, i, j - sequence length (base sequence length, source, target)
        d - feature dimension

        Args:
            x: input of shape ``(b, n, dim)``.
            attn_mask: optional boolean tensor of shape ``(b, i, j)``; positions
                where it is False are excluded from attention.

        Returns:
            Tensor of shape ``(b, n, dim)``: attention output plus feed-forward output.
        """
        seq_len = x.shape[1]
        device = x.device

        # Pre-layernorm.
        normed = self.norm(x)

        # One projection, then split into the four streams.
        fused = self.fused_attn_ff_proj(normed)
        q, k, v, ff = fused.split(self.fused_dims, dim=-1)

        # Only the queries are split into heads; k and v stay single-headed.
        q = rearrange(q, "b n (h d) -> b h n d", h=self.heads)

        # Rotary position information on queries and keys.
        positions = self.get_rotary_embedding(seq_len, device)
        q = apply_rotary_pos_emb(positions, q)
        k = apply_rotary_pos_emb(positions, k)

        q = q * self.scale

        # Similarity of every query head against the shared key head.
        sim = einsum("b h i d, b j d -> b h i j", q, k)

        if attn_mask is not None:
            attn_mask = rearrange(attn_mask, 'b i j -> b 1 i j')
            sim = sim.masked_fill(~attn_mask, -torch.finfo(sim.dtype).max)

        # Subtract the row maximum before softmax for numerical stability.
        row_max = sim.amax(dim=-1, keepdim=True).detach()
        sim = sim - row_max
        attn = sim.softmax(dim=-1)

        # Weighted sum of the shared value head.
        out = einsum("b h i j, b j d -> b h i d", attn, v)

        # Merge heads and combine the two parallel branches.
        out = rearrange(out, "b h n d -> b n (h d)")
        return self.attn_out(out) + self.ff_out(ff)
|
|
|
|
|
|
|
class CrossAttention(nn.Module):
    """Cross-attention from a query sequence onto a context sequence.

    Queries are multi-headed; keys and values are projected to a single head
    that is shared by every query head. Optionally a SwiGLU feed-forward on the
    normalised queries is computed in parallel and added to the output. No
    residual connection is applied here.
    """

    def __init__(
        self,
        dim,
        *,
        context_dim=None,
        dim_head=64,
        heads=12,
        parallel_ff=False,
        ff_mult=4,
        norm_context=False
    ):
        """Build the projections and the optional parallel feed-forward.

        Args:
            dim: feature size of the query tokens (and of the output).
            context_dim: feature size of the context tokens; defaults to ``dim``.
            dim_head: feature size of each attention head.
            heads: number of query heads.
            parallel_ff: when True, add a SwiGLU feed-forward branch.
            ff_mult: feed-forward hidden size as a multiple of ``dim``.
            norm_context: when True, layer-normalise the context as well.
        """
        super().__init__()
        self.heads = heads
        self.scale = dim_head ** -0.5
        inner_dim = heads * dim_head
        context_dim = default(context_dim, dim)

        self.norm = LayerNorm(dim)
        self.context_norm = LayerNorm(context_dim) if norm_context else nn.Identity()

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        # Keys and values share one head each, hence dim_head * 2 outputs.
        self.to_kv = nn.Linear(context_dim, dim_head * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

        # Optional parallel feed-forward.
        ff_inner_dim = ff_mult * dim

        self.ff = nn.Sequential(
            nn.Linear(dim, ff_inner_dim * 2, bias=False),
            SwiGLU(),
            nn.Linear(ff_inner_dim, dim, bias=False)
        ) if parallel_ff else None

    def forward(self, x, context, mask=None):
        """Attend from *x* onto *context*.

        einstein notation
        b - batch
        h - heads
        n, i, j - sequence length (base sequence length, source, target)
        d - feature dimension

        Args:
            x: query tokens of shape ``(b, n, dim)``.
            context: context tokens of shape ``(b, j, context_dim)``.
            mask: optional tensor of shape ``(b, i, j)`` that is *added* to the
                attention logits for every head. Presumably a float mask with
                0 for visible and a large negative value for hidden positions
                (confirm with the caller). ``None`` applies no mask.

        Returns:
            Tensor of shape ``(b, n, dim)``.
        """
        # Pre-layernorm for queries and (optionally) the context.
        x = self.norm(x)
        context = self.context_norm(context)

        # Queries, split into heads and scaled.
        q = self.to_q(x)
        q = rearrange(q, 'b n (h d) -> b h n d', h=self.heads)
        q = q * self.scale

        # Single-headed keys and values.
        k, v = self.to_kv(context).chunk(2, dim=-1)

        # Query/key similarity.
        sim = einsum('b h i d, b j d -> b h i j', q, k)

        if exists(mask):
            # Broadcasting over the head axis gives the same result as
            # repeating the mask per head, without materialising the copies.
            sim = sim + mask.unsqueeze(1)

        # Subtract the row maximum for numerical stability. It is detached
        # (as in ParallelTransformerBlock) because softmax is shift-invariant,
        # so no gradient needs to flow through the maximum.
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        # Aggregate the shared values.
        out = einsum('b h i j, b j d -> b h i d', attn, v)

        # Merge heads and project back to the model dimension.
        out = rearrange(out, 'b h n d -> b n (h d)')
        out = self.to_out(out)

        # Add the parallel feed-forward branch when configured.
        if exists(self.ff):
            out = out + self.ff(x)

        return out
|
|
|
|
|
class Cross_model(nn.Module):
    """Stack of layers, each a residual cross-attention followed by a residual
    parallel self-attention/feed-forward block."""

    def __init__(
        self,
        dim=512,
        layer_num=4,
        dim_head=64,
        heads=8,
        ff_mult=4
    ):
        """Create *layer_num* (cross-attention, self-attention) pairs.

        Args:
            dim: feature size of the tokens.
            layer_num: number of stacked layer pairs.
            dim_head: feature size of each attention head.
            heads: number of attention heads in both sub-blocks.
            ff_mult: feed-forward hidden size as a multiple of ``dim``.
        """
        super().__init__()

        self.layers = nn.ModuleList([
            nn.ModuleList([
                Residual(
                    CrossAttention(
                        dim=dim,
                        dim_head=dim_head,
                        heads=heads,
                        parallel_ff=True,
                        ff_mult=ff_mult,
                    )
                ),
                Residual(
                    ParallelTransformerBlock(
                        dim=dim,
                        dim_head=dim_head,
                        heads=heads,
                        ff_mult=ff_mult,
                    )
                ),
            ])
            for _ in range(layer_num)
        ])

    def forward(
        self,
        query_tokens,
        context_tokens,
        mask
    ):
        """Run the query tokens through every layer and return the result.

        *context_tokens* and *mask* are passed unchanged to each
        cross-attention; the self-attention block receives no mask.
        """
        hidden = query_tokens
        for cross_attn, self_attn_ff in self.layers:
            attended = cross_attn(hidden, context_tokens, mask)
            hidden = self_attn_ff(attended)
        return hidden
|
|