"""
Speaker verification models
Authors:
* Haibin Wu 2022
"""
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from .pooling import (
AttentiveStatisticsPooling,
SelfAttentivePooling,
TemporalAveragePooling,
TemporalStatisticsPooling,
)
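# Frames dropped along the time axis by each backbone: a TDNN layer with
# context size c and dilation d shortens the sequence by (c - 1) * d frames,
# so the x-vector stack below loses (5-1)*1 + (3-1)*2 + (3-1)*3 = 14 frames,
# while ECAPA-TDNN pads every convolution and keeps the length unchanged.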
XVECTOR_TDNNS_LENGTH_REDUCTION = 14
ECAPA_TDNNS_LENGTH_REDUCTION = 0
__all__ = [
"TDNN",
"XVectorBackbone",
"ECAPA_TDNN",
"SpeakerEmbeddingExtractor",
"SuperbXvector",
]
class TDNN(nn.Module):
"""
TDNN as defined by https://www.danielpovey.com/files/2015_interspeech_multisplice.pdf.
Context size and dilation determine the frames selected
(although context size is not really defined in the traditional sense).
For example:
context size 5 and dilation 1 is equivalent to [-2,-1,0,1,2]
context size 3 and dilation 2 is equivalent to [-2, 0, 2]
context size 1 and dilation 1 is equivalent to [0]
Args:
input_size (int): The input feature size
output_size (int): The output feature size
context_size (int): See example
dilation (int): See example
dropout_p (float): (default, 0.0) The dropout rate
        batch_norm (bool): (default, True) Use batch norm for TDNN layers
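
    Example (a minimal sketch; the sizes below are illustrative, not defaults):
        >>> tdnn = TDNN(input_size=24, output_size=32, context_size=3, dilation=2)
        >>> tdnn(torch.randn(4, 100, 24)).shape
        torch.Size([4, 96, 32])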
"""
def __init__(
self,
input_size: int,
output_size: int,
context_size: int,
dilation: int,
dropout_p: float = 0.0,
batch_norm: bool = True,
):
super().__init__()
self._indim = input_size
self._outdim = output_size
self.context_size = context_size
self.dilation = dilation
self.dropout_p = dropout_p
self.batch_norm = batch_norm
self.kernel = nn.Linear(input_size * context_size, output_size)
self.nonlinearity = nn.ReLU()
if batch_norm:
self.bn = nn.BatchNorm1d(output_size)
if dropout_p:
self.drop = nn.Dropout(p=dropout_p)
@property
def input_size(self) -> int:
return self._indim
@property
def output_size(self) -> int:
return self._outdim
def forward(self, x: torch.Tensor):
"""
Args:
x (torch.FloatTensor): (batch, seq_len, input_size)
Returns:
            torch.FloatTensor: (batch, seq_len - (context_size - 1) * dilation, output_size)
"""
_, _, d = x.shape
assert (
d == self.input_size
), "Input size was wrong. Expected ({}), got ({})".format(self.input_size, d)
x = x.unsqueeze(1)
# Unfold input into smaller temporal contexts
x = F.unfold(
x,
(self.context_size, self.input_size),
stride=(1, self.input_size),
dilation=(self.dilation, 1),
)
x = x.transpose(1, 2)
x = self.kernel(x)
x = self.nonlinearity(x)
if self.dropout_p:
x = self.drop(x)
if self.batch_norm:
x = x.transpose(1, 2)
x = self.bn(x)
x = x.transpose(1, 2)
return x
class XVectorBackbone(nn.Module):
"""
    The stack of TDNN layers as in https://danielpovey.com/files/2018_odyssey_xvector_lid.pdf.
Args:
input_size (int): The input feature size, usually is the output size of upstream models
        output_size (int): (default, 1500) The frame-level output size, before pooling
dropout_p (float): (default, 0.0) The dropout rate
        batch_norm (bool): (default, True) Use batch norm for TDNN layers
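
    Example (a minimal sketch; the 24-dim input is illustrative):
        >>> backbone = XVectorBackbone(input_size=24)
        >>> backbone(torch.randn(4, 100, 24)).shape
        torch.Size([4, 86, 1500])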
"""
def __init__(
self,
input_size: int,
output_size: int = 1500,
dropout_p: float = 0.0,
        batch_norm: bool = True,
):
super().__init__()
self._indim = input_size
self._outdim = output_size
self.module = nn.Sequential(
TDNN(
input_size=input_size,
output_size=512,
context_size=5,
dilation=1,
dropout_p=dropout_p,
batch_norm=batch_norm,
),
TDNN(
input_size=512,
output_size=512,
context_size=3,
dilation=2,
dropout_p=dropout_p,
batch_norm=batch_norm,
),
TDNN(
input_size=512,
output_size=512,
context_size=3,
dilation=3,
dropout_p=dropout_p,
batch_norm=batch_norm,
),
TDNN(
input_size=512,
output_size=512,
context_size=1,
dilation=1,
dropout_p=dropout_p,
batch_norm=batch_norm,
),
TDNN(
input_size=512,
output_size=output_size,
context_size=1,
dilation=1,
dropout_p=dropout_p,
batch_norm=batch_norm,
),
)
@property
def input_size(self) -> int:
return self._indim
@property
def output_size(self) -> int:
return self._outdim
def forward(self, x: torch.Tensor):
"""
Args:
x (torch.FloatTensor): (batch, seq_len, input_size)
        Returns:
            torch.FloatTensor: (batch, seq_len - XVECTOR_TDNNS_LENGTH_REDUCTION, output_size)
"""
x = self.module(x)
return x
"""
ECAPA-TDNN
"""
class _SEModule(nn.Module):
def __init__(self, channels, bottleneck=128):
super().__init__()
self.se = nn.Sequential(
nn.AdaptiveAvgPool1d(1),
nn.Conv1d(channels, bottleneck, kernel_size=1, padding=0),
nn.ReLU(),
nn.Conv1d(bottleneck, channels, kernel_size=1, padding=0),
nn.Sigmoid(),
)
def forward(self, input):
x = self.se(input)
return input * x
class _Bottle2neck(nn.Module):
def __init__(self, inplanes, planes, kernel_size=None, dilation=None, scale=8):
super().__init__()
width = int(math.floor(planes / scale))
self.conv1 = nn.Conv1d(inplanes, width * scale, kernel_size=1)
self.bn1 = nn.BatchNorm1d(width * scale)
self.nums = scale - 1
convs = []
bns = []
num_pad = math.floor(kernel_size / 2) * dilation
for i in range(self.nums):
convs.append(
nn.Conv1d(
width,
width,
kernel_size=kernel_size,
dilation=dilation,
padding=num_pad,
)
)
bns.append(nn.BatchNorm1d(width))
self.convs = nn.ModuleList(convs)
self.bns = nn.ModuleList(bns)
self.conv3 = nn.Conv1d(width * scale, planes, kernel_size=1)
self.bn3 = nn.BatchNorm1d(planes)
self.relu = nn.ReLU()
self.width = width
self.se = _SEModule(planes)
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.relu(out)
out = self.bn1(out)
spx = torch.split(out, self.width, 1)
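        # Res2Net-style hierarchical convolutions: each split is summed with
        # the previous branch's output before its own conv, so later branches
        # see larger receptive fields; the last split passes through untouched.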
for i in range(self.nums):
if i == 0:
sp = spx[i]
else:
sp = sp + spx[i]
sp = self.convs[i](sp)
sp = self.relu(sp)
sp = self.bns[i](sp)
if i == 0:
out = sp
else:
out = torch.cat((out, sp), 1)
out = torch.cat((out, spx[self.nums]), 1)
out = self.conv3(out)
out = self.relu(out)
out = self.bn3(out)
out = self.se(out)
out += residual
return out
class ECAPA_TDNN(nn.Module):
"""
ECAPA-TDNN model as in https://arxiv.org/abs/2005.07143.
Reference code: https://github.com/TaoRuijie/ECAPA-TDNN.
Args:
input_size (int): The input feature size, usually is the output size of upstream models
        output_size (int): (default, 1536) The frame-level output size, before pooling
C (int): (default, 1024) The channel dimension
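
    Example (a minimal sketch with deliberately small, illustrative sizes):
        >>> model = ECAPA_TDNN(input_size=24, output_size=128, C=64)
        >>> model(torch.randn(4, 100, 24)).shape
        torch.Size([4, 100, 128])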
"""
def __init__(
self, input_size: int = 80, output_size: int = 1536, C: int = 1024, **kwargs
):
super().__init__()
self._indim = input_size
self._outdim = output_size
self.conv1 = nn.Conv1d(input_size, C, kernel_size=5, stride=1, padding=2)
self.relu = nn.ReLU()
self.bn1 = nn.BatchNorm1d(C)
self.layer1 = _Bottle2neck(C, C, kernel_size=3, dilation=2, scale=8)
self.layer2 = _Bottle2neck(C, C, kernel_size=3, dilation=3, scale=8)
self.layer3 = _Bottle2neck(C, C, kernel_size=3, dilation=4, scale=8)
self.layer4 = nn.Conv1d(3 * C, output_size, kernel_size=1)
@property
def input_size(self):
return self._indim
@property
def output_size(self):
return self._outdim
def forward(self, x: torch.FloatTensor):
"""
Args:
x (torch.FloatTensor): size (batch, seq_len, input_size)
Returns:
x (torch.FloatTensor): size (batch, seq_len, output_size)
"""
x = self.conv1(x.transpose(1, 2).contiguous())
x = self.relu(x)
x = self.bn1(x)
x1 = self.layer1(x)
x2 = self.layer2(x + x1)
x3 = self.layer3(x + x1 + x2)
x = self.layer4(torch.cat((x1, x2, x3), dim=1))
x = self.relu(x)
x = x.transpose(1, 2).contiguous()
return x
class SpeakerEmbeddingExtractor(nn.Module):
"""
The speaker embedding extractor module.
Args:
input_size (int): The input feature size, usually is the output size of upstream models
output_size (int): (default, 1500) The size of the speaker embedding
        backbone (str): (default, "XVector") Which backbone model to use: "XVector" or "ECAPA-TDNN"
        pooling_type (str): (default, "TemporalAveragePooling") Which pooling method to use: TAP, TSP, SAP, or ASP
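
    Example (a minimal sketch, assuming the default TAP pooling keeps the
    feature dimension unchanged):
        >>> extractor = SpeakerEmbeddingExtractor(input_size=24, output_size=256)
        >>> extractor(torch.randn(4, 100, 24)).shape
        torch.Size([4, 256])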
"""
def __init__(
self,
input_size: int,
output_size: int = 1500,
backbone: str = "XVector",
pooling_type: str = "TemporalAveragePooling",
):
super().__init__()
self._indim = input_size
self._outdim = output_size
if backbone == "XVector":
self.backbone = XVectorBackbone(
input_size=input_size, output_size=output_size
)
self.offset = XVECTOR_TDNNS_LENGTH_REDUCTION
elif backbone == "ECAPA-TDNN":
self.backbone = ECAPA_TDNN(input_size=input_size, output_size=output_size)
self.offset = ECAPA_TDNNS_LENGTH_REDUCTION
else:
raise ValueError("{} backbone type is not defined".format(backbone))
if pooling_type == "TemporalAveragePooling" or pooling_type == "TAP":
self.pooling = TemporalAveragePooling(self.backbone.output_size)
elif pooling_type == "TemporalStatisticsPooling" or pooling_type == "TSP":
self.pooling = TemporalStatisticsPooling(self.backbone.output_size)
elif pooling_type == "SelfAttentivePooling" or pooling_type == "SAP":
self.pooling = SelfAttentivePooling(self.backbone.output_size)
elif pooling_type == "AttentiveStatisticsPooling" or pooling_type == "ASP":
self.pooling = AttentiveStatisticsPooling(self.backbone.output_size)
else:
raise ValueError("{} pooling type is not defined".format(pooling_type))
self._outdim = self.pooling.output_size
@property
def input_size(self) -> int:
return self._indim
@property
def output_size(self) -> int:
return self._outdim
def forward(self, x: torch.Tensor, xlen: torch.LongTensor = None):
"""
Args:
x (torch.Tensor): size (batch, seq_len, input_size)
xlen (torch.LongTensor): size (batch, )
Returns:
x (torch.Tensor): size (batch, output_size)
"""
x = self.backbone(x)
if xlen is not None:
xlen = torch.LongTensor([max(item - self.offset, 0) for item in xlen])
else:
xlen = torch.LongTensor([x.shape[1]] * x.shape[0])
x = self.pooling(x, xlen)
return x
class _UtteranceExtractor(nn.Module):
def __init__(self, input_size, output_size):
super().__init__()
self._indim = input_size
self._outdim = output_size
self.linear1 = nn.Linear(input_size, output_size)
self.linear2 = nn.Linear(output_size, output_size)
self.act_fn = nn.ReLU()
@property
def input_size(self):
return self._indim
@property
def output_size(self):
return self._outdim
def forward(self, x_BxH):
hid_BxH = self.linear1(x_BxH)
hid_BxH = self.act_fn(hid_BxH)
if self.training:
hid_BxH = self.linear2(hid_BxH)
hid_BxH = self.act_fn(hid_BxH)
return hid_BxH
class SuperbXvector(nn.Module):
"""
    The X-vector model used in the SUPERB benchmark, with the exact default arguments.
Args:
input_size (int): The input feature size, usually is the output size of upstream models
output_size (int): (default, 512) The size of the speaker embedding
hidden_size (int): (default, 512) The major hidden size in the network
        aggregation_size (int): (default, 1500) The TDNN output size before statistics pooling, which is usually large
dropout_p (float): (default, 0.0) The dropout rate
batch_norm (bool): (default, False) Use batch norm for TDNN layers
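
    Example (a minimal sketch; the 24-dim features and length 100 are illustrative):
        >>> model = SuperbXvector(input_size=24)
        >>> x_len = torch.full((4,), 100, dtype=torch.long)
        >>> model(torch.randn(4, 100, 24), x_len).shape
        torch.Size([4, 512])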
"""
def __init__(
self,
input_size: int,
output_size: int = 512,
hidden_size: int = 512,
aggregation_size: int = 1500,
dropout_p: float = 0.0,
batch_norm: bool = False,
):
super().__init__()
self._input_size = input_size
self._output_size = output_size
self.projector = nn.Linear(input_size, hidden_size)
self.tdnns = XVectorBackbone(
hidden_size, aggregation_size, dropout_p=dropout_p, batch_norm=batch_norm
)
latest_size = self.tdnns.output_size
self.pooling = TemporalStatisticsPooling(latest_size)
latest_size = self.pooling.output_size
self.affine = _UtteranceExtractor(latest_size, output_size)
@property
def input_size(self) -> int:
return self._input_size
@property
def output_size(self) -> int:
return self._output_size
def forward(self, x, x_len):
"""
Args:
x (torch.FloatTensor): (batch_size, seq_len, input_size)
x_len (torch.LongTensor): (batch_size, )
Returns:
torch.FloatTensor: (batch_size, output_size)
"""
x = self.projector(x)
x = self.tdnns(x)
x_len = x_len - XVECTOR_TDNNS_LENGTH_REDUCTION
assert (
x_len <= 0
).sum() == 0, "The input sequence is too short for the X-vector model"
x = self.pooling(x, x_len)
x = self.affine(x)
return x
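

# A minimal smoke test (a sketch: the feature dimension, lengths, and the
# "ASP" pooling choice are illustrative, not fixed by this file):
if __name__ == "__main__":
    feats = torch.randn(4, 200, 24)
    lengths = torch.full((4,), 200, dtype=torch.long)
    extractor = SpeakerEmbeddingExtractor(
        input_size=24, output_size=192, backbone="ECAPA-TDNN", pooling_type="ASP"
    )
    print(extractor(feats, lengths).shape)  # (batch, pooling.output_size)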