chore(sync): merge dev into main (#1379)

* Optimize latency (#1259)

* add attribute:   configs/config.py
	Optimize latency:   tools/rvc_for_realtime.py

* new file:   assets/Synthesizer_inputs.pth

* fix:   configs/config.py
	fix:   tools/rvc_for_realtime.py

* fix bug:   infer/lib/infer_pack/models.py

* new file:   assets/hubert_inputs.pth
	new file:   assets/rmvpe_inputs.pth
	modified:   configs/config.py
	new features:   infer/lib/rmvpe.py
	new features:   tools/jit_export/__init__.py
	new features:   tools/jit_export/get_hubert.py
	new features:   tools/jit_export/get_rmvpe.py
	new features:   tools/jit_export/get_synthesizer.py
	optimize:   tools/rvc_for_realtime.py

* optimize:   tools/jit_export/get_synthesizer.py
	fix bug:   tools/jit_export/__init__.py

* Fixed a bug caused by using half on the CPU:   infer/lib/rmvpe.py
	Fixed a bug caused by using half on the CPU:   tools/jit_export/__init__.py
	Fixed CIRCULAR IMPORT:   tools/jit_export/get_rmvpe.py
	Fixed CIRCULAR IMPORT:   tools/jit_export/get_synthesizer.py
	Fixed a bug caused by using half on the CPU:   tools/rvc_for_realtime.py

* Remove useless code:   infer/lib/rmvpe.py

* Delete gui_v1 copy.py

* Delete .vscode/launch.json

* Delete jit_export_test.py

* Delete tools/rvc_for_realtime copy.py

* Delete configs/config.json

* Delete .gitignore

* Fix exceptions caused by switching inference devices:   infer/lib/rmvpe.py
	Fix exceptions caused by switching inference devices:   tools/jit_export/__init__.py
	Fix exceptions caused by switching inference devices:   tools/rvc_for_realtime.py

* restore

* replace(you can undo this commit)

* remove debug_print

---------

Co-authored-by: Ftps <ftpsflandre@gmail.com>

* Fixed some bugs when exporting ONNX model (#1254)

* fix import (#1280)

* fix import

* lint

* 🎨 同步 locale (#1242)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Fix jit load and import issue (#1282)

* fix jit model loading :   infer/lib/rmvpe.py

* modified:   assets/hubert/.gitignore
	move file:    assets/hubert_inputs.pth -> assets/hubert/hubert_inputs.pth
	modified:   assets/rmvpe/.gitignore
	move file:    assets/rmvpe_inputs.pth -> assets/rmvpe/rmvpe_inputs.pth
	fix import:   gui_v1.py

* feat(workflow): trigger on dev

* feat(workflow): add close-pr on non-dev branch

* Add input wav and delay time monitor for real-time gui (#1293)

* feat(workflow): trigger on dev

* feat(workflow): add close-pr on non-dev branch

* 🎨 同步 locale (#1289)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* feat: edit PR template

* add input wav and delay time monitor

---------

Co-authored-by: 源文雨 <41315874+fumiama@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: RVC-Boss <129054828+RVC-Boss@users.noreply.github.com>

* Optimize latency using scripted jit (#1291)

* feat(workflow): trigger on dev

* feat(workflow): add close-pr on non-dev branch

* 🎨 同步 locale (#1289)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* feat: edit PR template

* Optimize-latency-using-scripted:   configs/config.py
	Optimize-latency-using-scripted:   infer/lib/infer_pack/attentions.py
	Optimize-latency-using-scripted:   infer/lib/infer_pack/commons.py
	Optimize-latency-using-scripted:   infer/lib/infer_pack/models.py
	Optimize-latency-using-scripted:   infer/lib/infer_pack/modules.py
	Optimize-latency-using-scripted:   infer/lib/jit/__init__.py
	Optimize-latency-using-scripted:   infer/lib/jit/get_hubert.py
	Optimize-latency-using-scripted:   infer/lib/jit/get_rmvpe.py
	Optimize-latency-using-scripted:   infer/lib/jit/get_synthesizer.py
	Optimize-latency-using-scripted:   infer/lib/rmvpe.py
	Optimize-latency-using-scripted:   tools/rvc_for_realtime.py

* modified:   infer/lib/infer_pack/models.py

* fix some bug:   configs/config.py
	fix some bug:   infer/lib/infer_pack/models.py
	fix some bug:   infer/lib/rmvpe.py

* Fixed abnormal reference of logger in multiprocessing:   infer/modules/train/train.py

---------

Co-authored-by: 源文雨 <41315874+fumiama@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Format code (#1298)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* 🎨 同步 locale (#1299)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* feat: optimize actions

* feat(workflow): add sync dev

* feat: optimize actions

* feat: optimize actions

* feat: optimize actions

* feat: optimize actions

* feat: add jit options (#1303)

Delete useless code:   infer/lib/jit/get_synthesizer.py
	Optimized code:   tools/rvc_for_realtime.py

* Code refactor + re-design inference ui (#1304)

* Code refacor + re-design inference ui

* Fix tabname

* i18n jp

---------

Co-authored-by: Ftps <ftpsflandre@gmail.com>

* feat: optimize actions

* feat: optimize actions

* Update README & en_US locale file (#1309)

* critical: some bug fixes (#1322)

* JIT acceleration switch does not support hot update

* fix padding bug of rmvpe in torch-directml

* fix padding bug of rmvpe in torch-directml

* Fix STFT under torch_directml (#1330)

* chore(format): run black on dev (#1318)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* chore(i18n): sync locale on dev (#1317)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* feat: allow for tta to be passed to uvr (#1361)

* chore(format): run black on dev (#1373)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Added script for automatically download all needed models at install (#1366)

* Delete modules.py

* Add files via upload

* Add files via upload

* Add files via upload

* Add files via upload

* chore(i18n): sync locale on dev (#1377)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* chore(format): run black on dev (#1376)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Update IPEX library (#1362)

* Update IPEX library

* Update ipex index

* chore(format): run black on dev (#1378)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

---------

Co-authored-by: Chengjia Jiang <46401978+ChasonJiang@users.noreply.github.com>
Co-authored-by: Ftps <ftpsflandre@gmail.com>
Co-authored-by: shizuku_nia <102004222+ShizukuNia@users.noreply.github.com>
Co-authored-by: Ftps <63702646+Tps-F@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: 源文雨 <41315874+fumiama@users.noreply.github.com>
Co-authored-by: yxlllc <33565655+yxlllc@users.noreply.github.com>
Co-authored-by: RVC-Boss <129054828+RVC-Boss@users.noreply.github.com>
Co-authored-by: Blaise <133521603+blaise-tk@users.noreply.github.com>
Co-authored-by: Rice Cake <gak141808@gmail.com>
Co-authored-by: AWAS666 <33494149+AWAS666@users.noreply.github.com>
Co-authored-by: Dmitry <nda2911@yandex.ru>
Co-authored-by: Disty0 <47277141+Disty0@users.noreply.github.com>
This commit is contained in:
github-actions[bot]
2023-10-06 17:14:33 +08:00
committed by GitHub
parent fe166e7f3d
commit e9dd11bddb
42 changed files with 2014 additions and 1120 deletions

View File

@@ -1,5 +1,6 @@
import copy
import math
from typing import Optional
import numpy as np
import torch
@@ -22,11 +23,11 @@ class Encoder(nn.Module):
window_size=10,
**kwargs
):
super().__init__()
super(Encoder, self).__init__()
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.n_layers = int(n_layers)
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.window_size = window_size
@@ -61,14 +62,17 @@ class Encoder(nn.Module):
def forward(self, x, x_mask):
attn_mask = x_mask.unsqueeze(2) * x_mask.unsqueeze(-1)
x = x * x_mask
for i in range(self.n_layers):
y = self.attn_layers[i](x, x, attn_mask)
zippep = zip(
self.attn_layers, self.norm_layers_1, self.ffn_layers, self.norm_layers_2
)
for attn_layers, norm_layers_1, ffn_layers, norm_layers_2 in zippep:
y = attn_layers(x, x, attn_mask)
y = self.drop(y)
x = self.norm_layers_1[i](x + y)
x = norm_layers_1(x + y)
y = self.ffn_layers[i](x, x_mask)
y = ffn_layers(x, x_mask)
y = self.drop(y)
x = self.norm_layers_2[i](x + y)
x = norm_layers_2(x + y)
x = x * x_mask
return x
@@ -86,7 +90,7 @@ class Decoder(nn.Module):
proximal_init=True,
**kwargs
):
super().__init__()
super(Decoder, self).__init__()
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
@@ -172,7 +176,7 @@ class MultiHeadAttention(nn.Module):
proximal_bias=False,
proximal_init=False,
):
super().__init__()
super(MultiHeadAttention, self).__init__()
assert channels % n_heads == 0
self.channels = channels
@@ -213,19 +217,28 @@ class MultiHeadAttention(nn.Module):
self.conv_k.weight.copy_(self.conv_q.weight)
self.conv_k.bias.copy_(self.conv_q.bias)
def forward(self, x, c, attn_mask=None):
def forward(
self, x: torch.Tensor, c: torch.Tensor, attn_mask: Optional[torch.Tensor] = None
):
q = self.conv_q(x)
k = self.conv_k(c)
v = self.conv_v(c)
x, self.attn = self.attention(q, k, v, mask=attn_mask)
x, _ = self.attention(q, k, v, mask=attn_mask)
x = self.conv_o(x)
return x
def attention(self, query, key, value, mask=None):
def attention(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
mask: Optional[torch.Tensor] = None,
):
# reshape [b, d, t] -> [b, n_h, t, d_k]
b, d, t_s, t_t = (*key.size(), query.size(2))
b, d, t_s = key.size()
t_t = query.size(2)
query = query.view(b, self.n_heads, self.k_channels, t_t).transpose(2, 3)
key = key.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
value = value.view(b, self.n_heads, self.k_channels, t_s).transpose(2, 3)
@@ -292,16 +305,17 @@ class MultiHeadAttention(nn.Module):
ret = torch.matmul(x, y.unsqueeze(0).transpose(-2, -1))
return ret
def _get_relative_embeddings(self, relative_embeddings, length):
def _get_relative_embeddings(self, relative_embeddings, length: int):
max_relative_position = 2 * self.window_size + 1
# Pad first before slice to avoid using cond ops.
pad_length = max(length - (self.window_size + 1), 0)
pad_length: int = max(length - (self.window_size + 1), 0)
slice_start_position = max((self.window_size + 1) - length, 0)
slice_end_position = slice_start_position + 2 * length - 1
if pad_length > 0:
padded_relative_embeddings = F.pad(
relative_embeddings,
commons.convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),
# commons.convert_pad_shape([[0, 0], [pad_length, pad_length], [0, 0]]),
[0, 0, pad_length, pad_length, 0, 0],
)
else:
padded_relative_embeddings = relative_embeddings
@@ -317,12 +331,18 @@ class MultiHeadAttention(nn.Module):
"""
batch, heads, length, _ = x.size()
# Concat columns of pad to shift from relative to absolute indexing.
x = F.pad(x, commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]]))
x = F.pad(
x,
# commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, 1]])
[0, 1, 0, 0, 0, 0, 0, 0],
)
# Concat extra elements so to add up to shape (len+1, 2*len-1).
x_flat = x.view([batch, heads, length * 2 * length])
x_flat = F.pad(
x_flat, commons.convert_pad_shape([[0, 0], [0, 0], [0, length - 1]])
x_flat,
# commons.convert_pad_shape([[0, 0], [0, 0], [0, int(length) - 1]])
[0, int(length) - 1, 0, 0, 0, 0],
)
# Reshape and slice out the padded elements.
@@ -339,15 +359,21 @@ class MultiHeadAttention(nn.Module):
batch, heads, length, _ = x.size()
# padd along column
x = F.pad(
x, commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, length - 1]])
x,
# commons.convert_pad_shape([[0, 0], [0, 0], [0, 0], [0, int(length) - 1]])
[0, int(length) - 1, 0, 0, 0, 0, 0, 0],
)
x_flat = x.view([batch, heads, length**2 + length * (length - 1)])
x_flat = x.view([batch, heads, int(length**2) + int(length * (length - 1))])
# add 0's in the beginning that will skew the elements after reshape
x_flat = F.pad(x_flat, commons.convert_pad_shape([[0, 0], [0, 0], [length, 0]]))
x_flat = F.pad(
x_flat,
# commons.convert_pad_shape([[0, 0], [0, 0], [int(length), 0]])
[length, 0, 0, 0, 0, 0],
)
x_final = x_flat.view([batch, heads, length, 2 * length])[:, :, :, 1:]
return x_final
def _attention_bias_proximal(self, length):
def _attention_bias_proximal(self, length: int):
"""Bias for self-attention to encourage attention to close positions.
Args:
length: an integer scalar.
@@ -367,10 +393,10 @@ class FFN(nn.Module):
filter_channels,
kernel_size,
p_dropout=0.0,
activation=None,
activation: str = None,
causal=False,
):
super().__init__()
super(FFN, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.filter_channels = filter_channels
@@ -378,40 +404,56 @@ class FFN(nn.Module):
self.p_dropout = p_dropout
self.activation = activation
self.causal = causal
if causal:
self.padding = self._causal_padding
else:
self.padding = self._same_padding
self.is_activation = True if activation == "gelu" else False
# if causal:
# self.padding = self._causal_padding
# else:
# self.padding = self._same_padding
self.conv_1 = nn.Conv1d(in_channels, filter_channels, kernel_size)
self.conv_2 = nn.Conv1d(filter_channels, out_channels, kernel_size)
self.drop = nn.Dropout(p_dropout)
def forward(self, x, x_mask):
x = self.conv_1(self.padding(x * x_mask))
if self.activation == "gelu":
def padding(self, x: torch.Tensor, x_mask: torch.Tensor) -> torch.Tensor:
if self.causal:
padding = self._causal_padding(x * x_mask)
else:
padding = self._same_padding(x * x_mask)
return padding
def forward(self, x: torch.Tensor, x_mask: torch.Tensor):
x = self.conv_1(self.padding(x, x_mask))
if self.is_activation:
x = x * torch.sigmoid(1.702 * x)
else:
x = torch.relu(x)
x = self.drop(x)
x = self.conv_2(self.padding(x * x_mask))
x = self.conv_2(self.padding(x, x_mask))
return x * x_mask
def _causal_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = self.kernel_size - 1
pad_r = 0
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, commons.convert_pad_shape(padding))
pad_l: int = self.kernel_size - 1
pad_r: int = 0
# padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(
x,
# commons.convert_pad_shape(padding)
[pad_l, pad_r, 0, 0, 0, 0],
)
return x
def _same_padding(self, x):
if self.kernel_size == 1:
return x
pad_l = (self.kernel_size - 1) // 2
pad_r = self.kernel_size // 2
padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(x, commons.convert_pad_shape(padding))
pad_l: int = (self.kernel_size - 1) // 2
pad_r: int = self.kernel_size // 2
# padding = [[0, 0], [0, 0], [pad_l, pad_r]]
x = F.pad(
x,
# commons.convert_pad_shape(padding)
[pad_l, pad_r, 0, 0, 0, 0],
)
return x

View File

@@ -1,3 +1,4 @@
from typing import List, Optional
import math
import numpy as np
@@ -16,10 +17,10 @@ def get_padding(kernel_size, dilation=1):
return int((kernel_size * dilation - dilation) / 2)
def convert_pad_shape(pad_shape):
l = pad_shape[::-1]
pad_shape = [item for sublist in l for item in sublist]
return pad_shape
# def convert_pad_shape(pad_shape):
# l = pad_shape[::-1]
# pad_shape = [item for sublist in l for item in sublist]
# return pad_shape
def kl_divergence(m_p, logs_p, m_q, logs_q):
@@ -113,10 +114,14 @@ def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
return acts
def convert_pad_shape(pad_shape):
l = pad_shape[::-1]
pad_shape = [item for sublist in l for item in sublist]
return pad_shape
# def convert_pad_shape(pad_shape):
# l = pad_shape[::-1]
# pad_shape = [item for sublist in l for item in sublist]
# return pad_shape
def convert_pad_shape(pad_shape: List[List[int]]) -> List[int]:
return torch.tensor(pad_shape).flip(0).reshape(-1).int().tolist()
def shift_1d(x):
@@ -124,7 +129,7 @@ def shift_1d(x):
return x
def sequence_mask(length, max_length=None):
def sequence_mask(length: torch.Tensor, max_length: Optional[int] = None):
if max_length is None:
max_length = length.max()
x = torch.arange(max_length, dtype=length.dtype, device=length.device)

View File

@@ -1,5 +1,6 @@
import math
import logging
from typing import Optional
logger = logging.getLogger(__name__)
@@ -28,25 +29,32 @@ class TextEncoder256(nn.Module):
p_dropout,
f0=True,
):
super().__init__()
super(TextEncoder256, self).__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.emb_phone = nn.Linear(256, hidden_channels)
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0 == True:
self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256
self.encoder = attentions.Encoder(
hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
float(p_dropout),
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(self, phone, pitch, lengths):
if pitch == None:
def forward(
self, phone: torch.Tensor, pitch: Optional[torch.Tensor], lengths: torch.Tensor
):
if pitch is None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
@@ -75,25 +83,30 @@ class TextEncoder768(nn.Module):
p_dropout,
f0=True,
):
super().__init__()
super(TextEncoder768, self).__init__()
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.filter_channels = filter_channels
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.emb_phone = nn.Linear(768, hidden_channels)
self.lrelu = nn.LeakyReLU(0.1, inplace=True)
if f0 == True:
self.emb_pitch = nn.Embedding(256, hidden_channels) # pitch 256
self.encoder = attentions.Encoder(
hidden_channels, filter_channels, n_heads, n_layers, kernel_size, p_dropout
hidden_channels,
filter_channels,
n_heads,
n_layers,
kernel_size,
float(p_dropout),
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(self, phone, pitch, lengths):
if pitch == None:
def forward(self, phone: torch.Tensor, pitch: torch.Tensor, lengths: torch.Tensor):
if pitch is None:
x = self.emb_phone(phone)
else:
x = self.emb_phone(phone) + self.emb_pitch(pitch)
@@ -121,7 +134,7 @@ class ResidualCouplingBlock(nn.Module):
n_flows=4,
gin_channels=0,
):
super().__init__()
super(ResidualCouplingBlock, self).__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
@@ -145,19 +158,36 @@ class ResidualCouplingBlock(nn.Module):
)
self.flows.append(modules.Flip())
def forward(self, x, x_mask, g=None, reverse=False):
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse: bool = False,
):
if not reverse:
for flow in self.flows:
x, _ = flow(x, x_mask, g=g, reverse=reverse)
else:
for flow in reversed(self.flows):
x = flow(x, x_mask, g=g, reverse=reverse)
for flow in self.flows[::-1]:
x, _ = flow.forward(x, x_mask, g=g, reverse=reverse)
return x
def remove_weight_norm(self):
for i in range(self.n_flows):
self.flows[i * 2].remove_weight_norm()
def __prepare_scriptable__(self):
for i in range(self.n_flows):
for hook in self.flows[i * 2]._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.flows[i * 2])
return self
class PosteriorEncoder(nn.Module):
def __init__(
@@ -170,7 +200,7 @@ class PosteriorEncoder(nn.Module):
n_layers,
gin_channels=0,
):
super().__init__()
super(PosteriorEncoder, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.hidden_channels = hidden_channels
@@ -189,7 +219,9 @@ class PosteriorEncoder(nn.Module):
)
self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
def forward(self, x, x_lengths, g=None):
def forward(
self, x: torch.Tensor, x_lengths: torch.Tensor, g: Optional[torch.Tensor] = None
):
x_mask = torch.unsqueeze(commons.sequence_mask(x_lengths, x.size(2)), 1).to(
x.dtype
)
@@ -203,6 +235,15 @@ class PosteriorEncoder(nn.Module):
def remove_weight_norm(self):
self.enc.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.enc._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.enc)
return self
class Generator(torch.nn.Module):
def __init__(
@@ -252,7 +293,7 @@ class Generator(torch.nn.Module):
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
def forward(self, x, g=None):
def forward(self, x: torch.Tensor, g: Optional[torch.Tensor] = None):
x = self.conv_pre(x)
if g is not None:
x = x + self.cond(g)
@@ -273,6 +314,28 @@ class Generator(torch.nn.Module):
return x
def __prepare_scriptable__(self):
for l in self.ups:
for hook in l._forward_pre_hooks.values():
# The hook we want to remove is an instance of WeightNorm class, so
# normally we would do `if isinstance(...)` but this class is not accessible
# because of shadowing, so we check the module name directly.
# https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
for l in self.resblocks:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
return self
def remove_weight_norm(self):
for l in self.ups:
remove_weight_norm(l)
@@ -293,7 +356,7 @@ class SineGen(torch.nn.Module):
voiced_thoreshold: F0 threshold for U/V classification (default 0)
flag_for_pulse: this SinGen is used inside PulseGen (default False)
Note: when flag_for_pulse is True, the first time step of a voiced
segment is always sin(np.pi) or cos(0)
segment is always sin(torch.pi) or cos(0)
"""
def __init__(
@@ -321,7 +384,7 @@ class SineGen(torch.nn.Module):
uv = uv.float()
return uv
def forward(self, f0, upp):
def forward(self, f0: torch.Tensor, upp: int):
"""sine_tensor, uv = forward(f0)
input F0: tensor(batchsize=1, length, dim=1)
f0 for unvoiced steps should be 0
@@ -333,7 +396,7 @@ class SineGen(torch.nn.Module):
f0_buf = torch.zeros(f0.shape[0], f0.shape[1], self.dim, device=f0.device)
# fundamental component
f0_buf[:, :, 0] = f0[:, :, 0]
for idx in np.arange(self.harmonic_num):
for idx in range(self.harmonic_num):
f0_buf[:, :, idx + 1] = f0_buf[:, :, 0] * (
idx + 2
) # idx + 2: the (idx+1)-th overtone, (idx+2)-th harmonic
@@ -347,12 +410,12 @@ class SineGen(torch.nn.Module):
tmp_over_one *= upp
tmp_over_one = F.interpolate(
tmp_over_one.transpose(2, 1),
scale_factor=upp,
scale_factor=float(upp),
mode="linear",
align_corners=True,
).transpose(2, 1)
rad_values = F.interpolate(
rad_values.transpose(2, 1), scale_factor=upp, mode="nearest"
rad_values.transpose(2, 1), scale_factor=float(upp), mode="nearest"
).transpose(
2, 1
) #######
@@ -361,12 +424,12 @@ class SineGen(torch.nn.Module):
cumsum_shift = torch.zeros_like(rad_values)
cumsum_shift[:, 1:, :] = tmp_over_one_idx * -1.0
sine_waves = torch.sin(
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * np.pi
torch.cumsum(rad_values + cumsum_shift, dim=1) * 2 * torch.pi
)
sine_waves = sine_waves * self.sine_amp
uv = self._f02uv(f0)
uv = F.interpolate(
uv.transpose(2, 1), scale_factor=upp, mode="nearest"
uv.transpose(2, 1), scale_factor=float(upp), mode="nearest"
).transpose(2, 1)
noise_amp = uv * self.noise_std + (1 - uv) * self.sine_amp / 3
noise = noise_amp * torch.randn_like(sine_waves)
@@ -414,18 +477,19 @@ class SourceModuleHnNSF(torch.nn.Module):
# to merge source harmonics into a single excitation
self.l_linear = torch.nn.Linear(harmonic_num + 1, 1)
self.l_tanh = torch.nn.Tanh()
# self.ddtype:int = -1
def forward(self, x, upp=None):
if hasattr(self, "ddtype") == False:
self.ddtype = self.l_linear.weight.dtype
def forward(self, x: torch.Tensor, upp: int = 1):
# if self.ddtype ==-1:
# self.ddtype = self.l_linear.weight.dtype
sine_wavs, uv, _ = self.l_sin_gen(x, upp)
# print(x.dtype,sine_wavs.dtype,self.l_linear.weight.dtype)
# if self.is_half:
# sine_wavs = sine_wavs.half()
# sine_merge = self.l_tanh(self.l_linear(sine_wavs.to(x)))
# print(sine_wavs.dtype,self.ddtype)
if sine_wavs.dtype != self.ddtype:
sine_wavs = sine_wavs.to(self.ddtype)
# if sine_wavs.dtype != self.l_linear.weight.dtype:
sine_wavs = sine_wavs.to(dtype=self.l_linear.weight.dtype)
sine_merge = self.l_tanh(self.l_linear(sine_wavs))
return sine_merge, None, None # noise, uv
@@ -448,7 +512,7 @@ class GeneratorNSF(torch.nn.Module):
self.num_kernels = len(resblock_kernel_sizes)
self.num_upsamples = len(upsample_rates)
self.f0_upsamp = torch.nn.Upsample(scale_factor=np.prod(upsample_rates))
self.f0_upsamp = torch.nn.Upsample(scale_factor=math.prod(upsample_rates))
self.m_source = SourceModuleHnNSF(
sampling_rate=sr, harmonic_num=0, is_half=is_half
)
@@ -473,7 +537,7 @@ class GeneratorNSF(torch.nn.Module):
)
)
if i + 1 < len(upsample_rates):
stride_f0 = np.prod(upsample_rates[i + 1 :])
stride_f0 = math.prod(upsample_rates[i + 1 :])
self.noise_convs.append(
Conv1d(
1,
@@ -500,27 +564,36 @@ class GeneratorNSF(torch.nn.Module):
if gin_channels != 0:
self.cond = nn.Conv1d(gin_channels, upsample_initial_channel, 1)
self.upp = np.prod(upsample_rates)
self.upp = math.prod(upsample_rates)
def forward(self, x, f0, g=None):
self.lrelu_slope = modules.LRELU_SLOPE
def forward(self, x, f0, g: Optional[torch.Tensor] = None):
har_source, noi_source, uv = self.m_source(f0, self.upp)
har_source = har_source.transpose(1, 2)
x = self.conv_pre(x)
if g is not None:
x = x + self.cond(g)
for i in range(self.num_upsamples):
x = F.leaky_relu(x, modules.LRELU_SLOPE)
x = self.ups[i](x)
x_source = self.noise_convs[i](har_source)
x = x + x_source
xs = None
for j in range(self.num_kernels):
if xs is None:
xs = self.resblocks[i * self.num_kernels + j](x)
else:
xs += self.resblocks[i * self.num_kernels + j](x)
x = xs / self.num_kernels
# torch.jit.script() does not support direct indexing of torch modules
# That's why I wrote this
for i, (ups, noise_convs) in enumerate(zip(self.ups, self.noise_convs)):
if i < self.num_upsamples:
x = F.leaky_relu(x, self.lrelu_slope)
x = ups(x)
x_source = noise_convs(har_source)
x = x + x_source
xs: Optional[torch.Tensor] = None
l = [i * self.num_kernels + j for j in range(self.num_kernels)]
for j, resblock in enumerate(self.resblocks):
if j in l:
if xs is None:
xs = resblock(x)
else:
xs += resblock(x)
# This assertion cannot be ignored! \
# If ignored, it will cause torch.jit.script() compilation errors
assert isinstance(xs, torch.Tensor)
x = xs / self.num_kernels
x = F.leaky_relu(x)
x = self.conv_post(x)
x = torch.tanh(x)
@@ -532,6 +605,27 @@ class GeneratorNSF(torch.nn.Module):
for l in self.resblocks:
l.remove_weight_norm()
def __prepare_scriptable__(self):
for l in self.ups:
for hook in l._forward_pre_hooks.values():
# The hook we want to remove is an instance of WeightNorm class, so
# normally we would do `if isinstance(...)` but this class is not accessible
# because of shadowing, so we check the module name directly.
# https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
for l in self.resblocks:
for hook in self.resblocks._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
return self
sr2sr = {
"32k": 32000,
@@ -563,8 +657,8 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
sr,
**kwargs
):
super().__init__()
if type(sr) == type("strr"):
super(SynthesizerTrnMs256NSFsid, self).__init__()
if isinstance(sr, str):
sr = sr2sr[sr]
self.spec_channels = spec_channels
self.inter_channels = inter_channels
@@ -573,7 +667,7 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.resblock = resblock
self.resblock_kernel_sizes = resblock_kernel_sizes
self.resblock_dilation_sizes = resblock_dilation_sizes
@@ -591,7 +685,7 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
n_heads,
n_layers,
kernel_size,
p_dropout,
float(p_dropout),
)
self.dec = GeneratorNSF(
inter_channels,
@@ -630,8 +724,42 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.dec._forward_pre_hooks.values():
# The hook we want to remove is an instance of WeightNorm class, so
# normally we would do `if isinstance(...)` but this class is not accessible
# because of shadowing, so we check the module name directly.
# https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.dec)
for hook in self.flow._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.flow)
if hasattr(self, "enc_q"):
for hook in self.enc_q._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.enc_q)
return self
@torch.jit.ignore
def forward(
self, phone, phone_lengths, pitch, pitchf, y, y_lengths, ds
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: torch.Tensor,
pitchf: torch.Tensor,
y: torch.Tensor,
y_lengths: torch.Tensor,
ds: Optional[torch.Tensor] = None,
): # 这里ds是id[bs,1]
# print(1,pitch.shape)#[bs,t]
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
@@ -647,15 +775,25 @@ class SynthesizerTrnMs256NSFsid(nn.Module):
o = self.dec(z_slice, pitchf, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
def infer(self, phone, phone_lengths, pitch, nsff0, sid, rate=None):
@torch.jit.export
def infer(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: torch.Tensor,
nsff0: torch.Tensor,
sid: torch.Tensor,
rate: Optional[torch.Tensor] = None,
):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
if rate:
head = int(z_p.shape[2] * rate)
z_p = z_p[:, :, -head:]
x_mask = x_mask[:, :, -head:]
nsff0 = nsff0[:, -head:]
if rate is not None:
assert isinstance(rate, torch.Tensor)
head = int(z_p.shape[2] * (1 - rate.item()))
z_p = z_p[:, :, head:]
x_mask = x_mask[:, :, head:]
nsff0 = nsff0[:, head:]
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, nsff0, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
@@ -684,8 +822,8 @@ class SynthesizerTrnMs768NSFsid(nn.Module):
sr,
**kwargs
):
super().__init__()
if type(sr) == type("strr"):
super(SynthesizerTrnMs768NSFsid, self).__init__()
if isinstance(sr, str):
sr = sr2sr[sr]
self.spec_channels = spec_channels
self.inter_channels = inter_channels
@@ -694,7 +832,7 @@ class SynthesizerTrnMs768NSFsid(nn.Module):
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.resblock = resblock
self.resblock_kernel_sizes = resblock_kernel_sizes
self.resblock_dilation_sizes = resblock_dilation_sizes
@@ -712,7 +850,7 @@ class SynthesizerTrnMs768NSFsid(nn.Module):
n_heads,
n_layers,
kernel_size,
p_dropout,
float(p_dropout),
)
self.dec = GeneratorNSF(
inter_channels,
@@ -751,6 +889,33 @@ class SynthesizerTrnMs768NSFsid(nn.Module):
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.dec._forward_pre_hooks.values():
# The hook we want to remove is an instance of WeightNorm class, so
# normally we would do `if isinstance(...)` but this class is not accessible
# because of shadowing, so we check the module name directly.
# https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.dec)
for hook in self.flow._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.flow)
if hasattr(self, "enc_q"):
for hook in self.enc_q._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.enc_q)
return self
@torch.jit.ignore
def forward(
self, phone, phone_lengths, pitch, pitchf, y, y_lengths, ds
): # 这里ds是id[bs,1]
@@ -768,15 +933,24 @@ class SynthesizerTrnMs768NSFsid(nn.Module):
o = self.dec(z_slice, pitchf, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
def infer(self, phone, phone_lengths, pitch, nsff0, sid, rate=None):
@torch.jit.export
def infer(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
pitch: torch.Tensor,
nsff0: torch.Tensor,
sid: torch.Tensor,
rate: Optional[torch.Tensor] = None,
):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
if rate:
head = int(z_p.shape[2] * rate)
z_p = z_p[:, :, -head:]
x_mask = x_mask[:, :, -head:]
nsff0 = nsff0[:, -head:]
if rate is not None:
head = int(z_p.shape[2] * (1.0 - rate.item()))
z_p = z_p[:, :, head:]
x_mask = x_mask[:, :, head:]
nsff0 = nsff0[:, head:]
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, nsff0, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
@@ -805,7 +979,7 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
sr=None,
**kwargs
):
super().__init__()
super(SynthesizerTrnMs256NSFsid_nono, self).__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
self.hidden_channels = hidden_channels
@@ -813,7 +987,7 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.resblock = resblock
self.resblock_kernel_sizes = resblock_kernel_sizes
self.resblock_dilation_sizes = resblock_dilation_sizes
@@ -831,7 +1005,7 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
n_heads,
n_layers,
kernel_size,
p_dropout,
float(p_dropout),
f0=False,
)
self.dec = Generator(
@@ -869,6 +1043,33 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.dec._forward_pre_hooks.values():
# The hook we want to remove is an instance of WeightNorm class, so
# normally we would do `if isinstance(...)` but this class is not accessible
# because of shadowing, so we check the module name directly.
# https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.dec)
for hook in self.flow._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.flow)
if hasattr(self, "enc_q"):
for hook in self.enc_q._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.enc_q)
return self
@torch.jit.ignore
def forward(self, phone, phone_lengths, y, y_lengths, ds): # 这里ds是id[bs,1]
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths)
@@ -880,14 +1081,22 @@ class SynthesizerTrnMs256NSFsid_nono(nn.Module):
o = self.dec(z_slice, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
def infer(self, phone, phone_lengths, sid, rate=None):
@torch.jit.export
def infer(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
sid: torch.Tensor,
rate: Optional[torch.Tensor] = None,
):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
if rate:
head = int(z_p.shape[2] * rate)
z_p = z_p[:, :, -head:]
x_mask = x_mask[:, :, -head:]
if rate is not None:
head = int(z_p.shape[2] * (1.0 - rate.item()))
z_p = z_p[:, :, head:]
x_mask = x_mask[:, :, head:]
nsff0 = nsff0[:, head:]
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)
@@ -916,7 +1125,7 @@ class SynthesizerTrnMs768NSFsid_nono(nn.Module):
sr=None,
**kwargs
):
super().__init__()
super(self, SynthesizerTrnMs768NSFsid_nono).__init__()
self.spec_channels = spec_channels
self.inter_channels = inter_channels
self.hidden_channels = hidden_channels
@@ -924,7 +1133,7 @@ class SynthesizerTrnMs768NSFsid_nono(nn.Module):
self.n_heads = n_heads
self.n_layers = n_layers
self.kernel_size = kernel_size
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.resblock = resblock
self.resblock_kernel_sizes = resblock_kernel_sizes
self.resblock_dilation_sizes = resblock_dilation_sizes
@@ -942,7 +1151,7 @@ class SynthesizerTrnMs768NSFsid_nono(nn.Module):
n_heads,
n_layers,
kernel_size,
p_dropout,
float(p_dropout),
f0=False,
)
self.dec = Generator(
@@ -980,6 +1189,33 @@ class SynthesizerTrnMs768NSFsid_nono(nn.Module):
self.flow.remove_weight_norm()
self.enc_q.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.dec._forward_pre_hooks.values():
# The hook we want to remove is an instance of WeightNorm class, so
# normally we would do `if isinstance(...)` but this class is not accessible
# because of shadowing, so we check the module name directly.
# https://github.com/pytorch/pytorch/blob/be0ca00c5ce260eb5bcec3237357f7a30cc08983/torch/nn/utils/__init__.py#L3
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.dec)
for hook in self.flow._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.flow)
if hasattr(self, "enc_q"):
for hook in self.enc_q._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.enc_q)
return self
@torch.jit.ignore
def forward(self, phone, phone_lengths, y, y_lengths, ds): # 这里ds是id[bs,1]
g = self.emb_g(ds).unsqueeze(-1) # [b, 256, 1]##1是t广播的
m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths)
@@ -991,14 +1227,22 @@ class SynthesizerTrnMs768NSFsid_nono(nn.Module):
o = self.dec(z_slice, g=g)
return o, ids_slice, x_mask, y_mask, (z, z_p, m_p, logs_p, m_q, logs_q)
def infer(self, phone, phone_lengths, sid, rate=None):
@torch.jit.export
def infer(
self,
phone: torch.Tensor,
phone_lengths: torch.Tensor,
sid: torch.Tensor,
rate: Optional[torch.Tensor] = None,
):
g = self.emb_g(sid).unsqueeze(-1)
m_p, logs_p, x_mask = self.enc_p(phone, None, phone_lengths)
z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask
if rate:
head = int(z_p.shape[2] * rate)
z_p = z_p[:, :, -head:]
x_mask = x_mask[:, :, -head:]
if rate is not None:
head = int(z_p.shape[2] * (1.0 - rate.item()))
z_p = z_p[:, :, head:]
x_mask = x_mask[:, :, head:]
nsff0 = nsff0[:, head:]
z = self.flow(z_p, x_mask, g=g, reverse=True)
o = self.dec(z * x_mask, g=g)
return o, x_mask, (z, z_p, m_p, logs_p)

View File

@@ -551,7 +551,7 @@ class SynthesizerTrnMsNSFsidM(nn.Module):
gin_channels,
sr,
version,
**kwargs
**kwargs,
):
super().__init__()
if type(sr) == type("strr"):
@@ -621,10 +621,7 @@ class SynthesizerTrnMsNSFsidM(nn.Module):
self.emb_g = nn.Embedding(self.spk_embed_dim, gin_channels)
self.speaker_map = None
logger.debug(
"gin_channels: "
+ gin_channels
+ ", self.spk_embed_dim: "
+ self.spk_embed_dim
f"gin_channels: {gin_channels}, self.spk_embed_dim: {self.spk_embed_dim}"
)
def remove_weight_norm(self):

View File

@@ -1,5 +1,6 @@
import copy
import math
from typing import Optional, Tuple
import numpy as np
import scipy
@@ -18,7 +19,7 @@ LRELU_SLOPE = 0.1
class LayerNorm(nn.Module):
def __init__(self, channels, eps=1e-5):
super().__init__()
super(LayerNorm, self).__init__()
self.channels = channels
self.eps = eps
@@ -41,13 +42,13 @@ class ConvReluNorm(nn.Module):
n_layers,
p_dropout,
):
super().__init__()
super(ConvReluNorm, self).__init__()
self.in_channels = in_channels
self.hidden_channels = hidden_channels
self.out_channels = out_channels
self.kernel_size = kernel_size
self.n_layers = n_layers
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
assert n_layers > 1, "Number of layers should be larger than 0."
self.conv_layers = nn.ModuleList()
@@ -58,7 +59,7 @@ class ConvReluNorm(nn.Module):
)
)
self.norm_layers.append(LayerNorm(hidden_channels))
self.relu_drop = nn.Sequential(nn.ReLU(), nn.Dropout(p_dropout))
self.relu_drop = nn.Sequential(nn.ReLU(), nn.Dropout(float(p_dropout)))
for _ in range(n_layers - 1):
self.conv_layers.append(
nn.Conv1d(
@@ -89,13 +90,13 @@ class DDSConv(nn.Module):
"""
def __init__(self, channels, kernel_size, n_layers, p_dropout=0.0):
super().__init__()
super(DDSConv, self).__init__()
self.channels = channels
self.kernel_size = kernel_size
self.n_layers = n_layers
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.drop = nn.Dropout(p_dropout)
self.drop = nn.Dropout(float(p_dropout))
self.convs_sep = nn.ModuleList()
self.convs_1x1 = nn.ModuleList()
self.norms_1 = nn.ModuleList()
@@ -117,7 +118,7 @@ class DDSConv(nn.Module):
self.norms_1.append(LayerNorm(channels))
self.norms_2.append(LayerNorm(channels))
def forward(self, x, x_mask, g=None):
def forward(self, x, x_mask, g: Optional[torch.Tensor] = None):
if g is not None:
x = x + g
for i in range(self.n_layers):
@@ -149,11 +150,11 @@ class WN(torch.nn.Module):
self.dilation_rate = dilation_rate
self.n_layers = n_layers
self.gin_channels = gin_channels
self.p_dropout = p_dropout
self.p_dropout = float(p_dropout)
self.in_layers = torch.nn.ModuleList()
self.res_skip_layers = torch.nn.ModuleList()
self.drop = nn.Dropout(p_dropout)
self.drop = nn.Dropout(float(p_dropout))
if gin_channels != 0:
cond_layer = torch.nn.Conv1d(
@@ -184,15 +185,19 @@ class WN(torch.nn.Module):
res_skip_layer = torch.nn.utils.weight_norm(res_skip_layer, name="weight")
self.res_skip_layers.append(res_skip_layer)
def forward(self, x, x_mask, g=None, **kwargs):
def forward(
self, x: torch.Tensor, x_mask: torch.Tensor, g: Optional[torch.Tensor] = None
):
output = torch.zeros_like(x)
n_channels_tensor = torch.IntTensor([self.hidden_channels])
if g is not None:
g = self.cond_layer(g)
for i in range(self.n_layers):
x_in = self.in_layers[i](x)
for i, (in_layer, res_skip_layer) in enumerate(
zip(self.in_layers, self.res_skip_layers)
):
x_in = in_layer(x)
if g is not None:
cond_offset = i * 2 * self.hidden_channels
g_l = g[:, cond_offset : cond_offset + 2 * self.hidden_channels, :]
@@ -202,7 +207,7 @@ class WN(torch.nn.Module):
acts = commons.fused_add_tanh_sigmoid_multiply(x_in, g_l, n_channels_tensor)
acts = self.drop(acts)
res_skip_acts = self.res_skip_layers[i](acts)
res_skip_acts = res_skip_layer(acts)
if i < self.n_layers - 1:
res_acts = res_skip_acts[:, : self.hidden_channels, :]
x = (x + res_acts) * x_mask
@@ -219,6 +224,30 @@ class WN(torch.nn.Module):
for l in self.res_skip_layers:
torch.nn.utils.remove_weight_norm(l)
def __prepare_scriptable__(self):
if self.gin_channels != 0:
for hook in self.cond_layer._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.cond_layer)
for l in self.in_layers:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
for l in self.res_skip_layers:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
return self
class ResBlock1(torch.nn.Module):
def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
@@ -294,14 +323,15 @@ class ResBlock1(torch.nn.Module):
]
)
self.convs2.apply(init_weights)
self.lrelu_slope = LRELU_SLOPE
def forward(self, x, x_mask=None):
def forward(self, x: torch.Tensor, x_mask: Optional[torch.Tensor] = None):
for c1, c2 in zip(self.convs1, self.convs2):
xt = F.leaky_relu(x, LRELU_SLOPE)
xt = F.leaky_relu(x, self.lrelu_slope)
if x_mask is not None:
xt = xt * x_mask
xt = c1(xt)
xt = F.leaky_relu(xt, LRELU_SLOPE)
xt = F.leaky_relu(xt, self.lrelu_slope)
if x_mask is not None:
xt = xt * x_mask
xt = c2(xt)
@@ -316,6 +346,23 @@ class ResBlock1(torch.nn.Module):
for l in self.convs2:
remove_weight_norm(l)
def __prepare_scriptable__(self):
for l in self.convs1:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
for l in self.convs2:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
return self
class ResBlock2(torch.nn.Module):
def __init__(self, channels, kernel_size=3, dilation=(1, 3)):
@@ -345,10 +392,11 @@ class ResBlock2(torch.nn.Module):
]
)
self.convs.apply(init_weights)
self.lrelu_slope = LRELU_SLOPE
def forward(self, x, x_mask=None):
def forward(self, x, x_mask: Optional[torch.Tensor] = None):
for c in self.convs:
xt = F.leaky_relu(x, LRELU_SLOPE)
xt = F.leaky_relu(x, self.lrelu_slope)
if x_mask is not None:
xt = xt * x_mask
xt = c(xt)
@@ -361,9 +409,25 @@ class ResBlock2(torch.nn.Module):
for l in self.convs:
remove_weight_norm(l)
def __prepare_scriptable__(self):
for l in self.convs:
for hook in l._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(l)
return self
class Log(nn.Module):
def forward(self, x, x_mask, reverse=False, **kwargs):
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
if not reverse:
y = torch.log(torch.clamp_min(x, 1e-5)) * x_mask
logdet = torch.sum(-y, [1, 2])
@@ -374,18 +438,27 @@ class Log(nn.Module):
class Flip(nn.Module):
def forward(self, x, *args, reverse=False, **kwargs):
# torch.jit.script() Compiled functions \
# can't take variable number of arguments or \
# use keyword-only arguments with defaults
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
x = torch.flip(x, [1])
if not reverse:
logdet = torch.zeros(x.size(0)).to(dtype=x.dtype, device=x.device)
return x, logdet
else:
return x
return x, torch.zeros([1], device=x.device)
class ElementwiseAffine(nn.Module):
def __init__(self, channels):
super().__init__()
super(ElementwiseAffine, self).__init__()
self.channels = channels
self.m = nn.Parameter(torch.zeros(channels, 1))
self.logs = nn.Parameter(torch.zeros(channels, 1))
@@ -414,7 +487,7 @@ class ResidualCouplingLayer(nn.Module):
mean_only=False,
):
assert channels % 2 == 0, "channels should be divisible by 2"
super().__init__()
super(ResidualCouplingLayer, self).__init__()
self.channels = channels
self.hidden_channels = hidden_channels
self.kernel_size = kernel_size
@@ -429,14 +502,20 @@ class ResidualCouplingLayer(nn.Module):
kernel_size,
dilation_rate,
n_layers,
p_dropout=p_dropout,
p_dropout=float(p_dropout),
gin_channels=gin_channels,
)
self.post = nn.Conv1d(hidden_channels, self.half_channels * (2 - mean_only), 1)
self.post.weight.data.zero_()
self.post.bias.data.zero_()
def forward(self, x, x_mask, g=None, reverse=False):
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse: bool = False,
):
x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
h = self.pre(x0) * x_mask
h = self.enc(h, x_mask, g=g)
@@ -455,11 +534,20 @@ class ResidualCouplingLayer(nn.Module):
else:
x1 = (x1 - m) * torch.exp(-logs) * x_mask
x = torch.cat([x0, x1], 1)
return x
return x, torch.zeros([1])
def remove_weight_norm(self):
self.enc.remove_weight_norm()
def __prepare_scriptable__(self):
for hook in self.enc._forward_pre_hooks.values():
if (
hook.__module__ == "torch.nn.utils.weight_norm"
and hook.__class__.__name__ == "WeightNorm"
):
torch.nn.utils.remove_weight_norm(self.enc)
return self
class ConvFlow(nn.Module):
def __init__(
@@ -471,7 +559,7 @@ class ConvFlow(nn.Module):
num_bins=10,
tail_bound=5.0,
):
super().__init__()
super(ConvFlow, self).__init__()
self.in_channels = in_channels
self.filter_channels = filter_channels
self.kernel_size = kernel_size
@@ -488,7 +576,13 @@ class ConvFlow(nn.Module):
self.proj.weight.data.zero_()
self.proj.bias.data.zero_()
def forward(self, x, x_mask, g=None, reverse=False):
def forward(
self,
x: torch.Tensor,
x_mask: torch.Tensor,
g: Optional[torch.Tensor] = None,
reverse=False,
):
x0, x1 = torch.split(x, [self.half_channels] * 2, 1)
h = self.pre(x0)
h = self.convs(h, x_mask, g=g)

163
infer/lib/jit/__init__.py Normal file
View File

@@ -0,0 +1,163 @@
from io import BytesIO
import pickle
import time
import torch
from tqdm import tqdm
from collections import OrderedDict
def load_inputs(path, device, is_half=False):
parm = torch.load(path, map_location=torch.device("cpu"))
for key in parm.keys():
parm[key] = parm[key].to(device)
if is_half and parm[key].dtype == torch.float32:
parm[key] = parm[key].half()
elif not is_half and parm[key].dtype == torch.float16:
parm[key] = parm[key].float()
return parm
def benchmark(
model, inputs_path, device=torch.device("cpu"), epoch=1000, is_half=False
):
parm = load_inputs(inputs_path, device, is_half)
total_ts = 0.0
bar = tqdm(range(epoch))
for i in bar:
start_time = time.perf_counter()
o = model(**parm)
total_ts += time.perf_counter() - start_time
print(f"num_epoch: {epoch} | avg time(ms): {(total_ts*1000)/epoch}")
def jit_warm_up(model, inputs_path, device=torch.device("cpu"), epoch=5, is_half=False):
benchmark(model, inputs_path, device, epoch=epoch, is_half=is_half)
def to_jit_model(
model_path,
model_type: str,
mode: str = "trace",
inputs_path: str = None,
device=torch.device("cpu"),
is_half=False,
):
model = None
if model_type.lower() == "synthesizer":
from .get_synthesizer import get_synthesizer
model, _ = get_synthesizer(model_path, device)
model.forward = model.infer
elif model_type.lower() == "rmvpe":
from .get_rmvpe import get_rmvpe
model = get_rmvpe(model_path, device)
elif model_type.lower() == "hubert":
from .get_hubert import get_hubert_model
model = get_hubert_model(model_path, device)
model.forward = model.infer
else:
raise ValueError(f"No model type named {model_type}")
model = model.eval()
model = model.half() if is_half else model.float()
if mode == "trace":
assert not inputs_path
inputs = load_inputs(inputs_path, device, is_half)
model_jit = torch.jit.trace(model, example_kwarg_inputs=inputs)
elif mode == "script":
model_jit = torch.jit.script(model)
model_jit.to(device)
model_jit = model_jit.half() if is_half else model_jit.float()
# model = model.half() if is_half else model.float()
return (model, model_jit)
def export(
model: torch.nn.Module,
mode: str = "trace",
inputs: dict = None,
device=torch.device("cpu"),
is_half: bool = False,
) -> dict:
model = model.half() if is_half else model.float()
model.eval()
if mode == "trace":
assert inputs is not None
model_jit = torch.jit.trace(model, example_kwarg_inputs=inputs)
elif mode == "script":
model_jit = torch.jit.script(model)
model_jit.to(device)
model_jit = model_jit.half() if is_half else model_jit.float()
buffer = BytesIO()
# model_jit=model_jit.cpu()
torch.jit.save(model_jit, buffer)
del model_jit
cpt = OrderedDict()
cpt["model"] = buffer.getvalue()
cpt["is_half"] = is_half
return cpt
def load(path: str):
with open(path, "rb") as f:
return pickle.load(f)
def save(ckpt: dict, save_path: str):
with open(save_path, "wb") as f:
pickle.dump(ckpt, f)
def rmvpe_jit_export(
model_path: str,
mode: str = "script",
inputs_path: str = None,
save_path: str = None,
device=torch.device("cpu"),
is_half=False,
):
if not save_path:
save_path = model_path.rstrip(".pth")
save_path += ".half.jit" if is_half else ".jit"
if "cuda" in str(device) and ":" not in str(device):
device = torch.device("cuda:0")
from .get_rmvpe import get_rmvpe
model = get_rmvpe(model_path, device)
inputs = None
if mode == "trace":
inputs = load_inputs(inputs_path, device, is_half)
ckpt = export(model, mode, inputs, device, is_half)
ckpt["device"] = str(device)
save(ckpt, save_path)
return ckpt
def synthesizer_jit_export(
model_path: str,
mode: str = "script",
inputs_path: str = None,
save_path: str = None,
device=torch.device("cpu"),
is_half=False,
):
if not save_path:
save_path = model_path.rstrip(".pth")
save_path += ".half.jit" if is_half else ".jit"
if "cuda" in str(device) and ":" not in str(device):
device = torch.device("cuda:0")
from .get_synthesizer import get_synthesizer
model, cpt = get_synthesizer(model_path, device)
assert isinstance(cpt, dict)
model.forward = model.infer
inputs = None
if mode == "trace":
inputs = load_inputs(inputs_path, device, is_half)
ckpt = export(model, mode, inputs, device, is_half)
cpt.pop("weight")
cpt["model"] = ckpt["model"]
cpt["device"] = device
save(cpt, save_path)
return cpt

342
infer/lib/jit/get_hubert.py Normal file
View File

@@ -0,0 +1,342 @@
import math
import random
from typing import Optional, Tuple
from fairseq.checkpoint_utils import load_model_ensemble_and_task
import numpy as np
import torch
import torch.nn.functional as F
# from fairseq.data.data_utils import compute_mask_indices
from fairseq.utils import index_put
# @torch.jit.script
def pad_to_multiple(x, multiple, dim=-1, value=0):
# Inspired from https://github.com/lucidrains/local-attention/blob/master/local_attention/local_attention.py#L41
if x is None:
return None, 0
tsz = x.size(dim)
m = tsz / multiple
remainder = math.ceil(m) * multiple - tsz
if int(tsz % multiple) == 0:
return x, 0
pad_offset = (0,) * (-1 - dim) * 2
return F.pad(x, (*pad_offset, 0, remainder), value=value), remainder
def extract_features(
self,
x,
padding_mask=None,
tgt_layer=None,
min_layer=0,
):
if padding_mask is not None:
x = index_put(x, padding_mask, 0)
x_conv = self.pos_conv(x.transpose(1, 2))
x_conv = x_conv.transpose(1, 2)
x = x + x_conv
if not self.layer_norm_first:
x = self.layer_norm(x)
# pad to the sequence length dimension
x, pad_length = pad_to_multiple(x, self.required_seq_len_multiple, dim=-2, value=0)
if pad_length > 0 and padding_mask is None:
padding_mask = x.new_zeros((x.size(0), x.size(1)), dtype=torch.bool)
padding_mask[:, -pad_length:] = True
else:
padding_mask, _ = pad_to_multiple(
padding_mask, self.required_seq_len_multiple, dim=-1, value=True
)
x = F.dropout(x, p=self.dropout, training=self.training)
# B x T x C -> T x B x C
x = x.transpose(0, 1)
layer_results = []
r = None
for i, layer in enumerate(self.layers):
dropout_probability = np.random.random() if self.layerdrop > 0 else 1
if not self.training or (dropout_probability > self.layerdrop):
x, (z, lr) = layer(
x, self_attn_padding_mask=padding_mask, need_weights=False
)
if i >= min_layer:
layer_results.append((x, z, lr))
if i == tgt_layer:
r = x
break
if r is not None:
x = r
# T x B x C -> B x T x C
x = x.transpose(0, 1)
# undo paddding
if pad_length > 0:
x = x[:, :-pad_length]
def undo_pad(a, b, c):
return (
a[:-pad_length],
b[:-pad_length] if b is not None else b,
c[:-pad_length],
)
layer_results = [undo_pad(*u) for u in layer_results]
return x, layer_results
def compute_mask_indices(
shape: Tuple[int, int],
padding_mask: Optional[torch.Tensor],
mask_prob: float,
mask_length: int,
mask_type: str = "static",
mask_other: float = 0.0,
min_masks: int = 0,
no_overlap: bool = False,
min_space: int = 0,
require_same_masks: bool = True,
mask_dropout: float = 0.0,
) -> torch.Tensor:
"""
Computes random mask spans for a given shape
Args:
shape: the the shape for which to compute masks.
should be of size 2 where first element is batch size and 2nd is timesteps
padding_mask: optional padding mask of the same size as shape, which will prevent masking padded elements
mask_prob: probability for each token to be chosen as start of the span to be masked. this will be multiplied by
number of timesteps divided by length of mask span to mask approximately this percentage of all elements.
however due to overlaps, the actual number will be smaller (unless no_overlap is True)
mask_type: how to compute mask lengths
static = fixed size
uniform = sample from uniform distribution [mask_other, mask_length*2]
normal = sample from normal distribution with mean mask_length and stdev mask_other. mask is min 1 element
poisson = sample from possion distribution with lambda = mask length
min_masks: minimum number of masked spans
no_overlap: if false, will switch to an alternative recursive algorithm that prevents spans from overlapping
min_space: only used if no_overlap is True, this is how many elements to keep unmasked between spans
require_same_masks: if true, will randomly drop out masks until same amount of masks remains in each sample
mask_dropout: randomly dropout this percentage of masks in each example
"""
bsz, all_sz = shape
mask = torch.full((bsz, all_sz), False)
all_num_mask = int(
# add a random number for probabilistic rounding
mask_prob * all_sz / float(mask_length)
+ torch.rand([1]).item()
)
all_num_mask = max(min_masks, all_num_mask)
mask_idcs = []
for i in range(bsz):
if padding_mask is not None:
sz = all_sz - padding_mask[i].long().sum().item()
num_mask = int(mask_prob * sz / float(mask_length) + np.random.rand())
num_mask = max(min_masks, num_mask)
else:
sz = all_sz
num_mask = all_num_mask
if mask_type == "static":
lengths = torch.full([num_mask], mask_length)
elif mask_type == "uniform":
lengths = torch.randint(mask_other, mask_length * 2 + 1, size=[num_mask])
elif mask_type == "normal":
lengths = torch.normal(mask_length, mask_other, size=[num_mask])
lengths = [max(1, int(round(x))) for x in lengths]
else:
raise Exception("unknown mask selection " + mask_type)
if sum(lengths) == 0:
lengths[0] = min(mask_length, sz - 1)
if no_overlap:
mask_idc = []
def arrange(s, e, length, keep_length):
span_start = torch.randint(low=s, high=e - length, size=[1]).item()
mask_idc.extend(span_start + i for i in range(length))
new_parts = []
if span_start - s - min_space >= keep_length:
new_parts.append((s, span_start - min_space + 1))
if e - span_start - length - min_space > keep_length:
new_parts.append((span_start + length + min_space, e))
return new_parts
parts = [(0, sz)]
min_length = min(lengths)
for length in sorted(lengths, reverse=True):
t = [e - s if e - s >= length + min_space else 0 for s, e in parts]
lens = torch.asarray(t, dtype=torch.int)
l_sum = torch.sum(lens)
if l_sum == 0:
break
probs = lens / torch.sum(lens)
c = torch.multinomial(probs.float(), len(parts)).item()
s, e = parts.pop(c)
parts.extend(arrange(s, e, length, min_length))
mask_idc = torch.asarray(mask_idc)
else:
min_len = min(lengths)
if sz - min_len <= num_mask:
min_len = sz - num_mask - 1
mask_idc = torch.asarray(
random.sample([i for i in range(sz - min_len)], num_mask)
)
mask_idc = torch.asarray(
[
mask_idc[j] + offset
for j in range(len(mask_idc))
for offset in range(lengths[j])
]
)
mask_idcs.append(torch.unique(mask_idc[mask_idc < sz]))
min_len = min([len(m) for m in mask_idcs])
for i, mask_idc in enumerate(mask_idcs):
if isinstance(mask_idc, torch.Tensor):
mask_idc = torch.asarray(mask_idc, dtype=torch.float)
if len(mask_idc) > min_len and require_same_masks:
mask_idc = torch.asarray(
random.sample([i for i in range(mask_idc)], min_len)
)
if mask_dropout > 0:
num_holes = int(round(len(mask_idc) * mask_dropout))
mask_idc = torch.asarray(
random.sample([i for i in range(mask_idc)], len(mask_idc) - num_holes)
)
mask[i, mask_idc.int()] = True
return mask
def apply_mask(self, x, padding_mask, target_list):
B, T, C = x.shape
torch.zeros_like(x)
if self.mask_prob > 0:
mask_indices = compute_mask_indices(
(B, T),
padding_mask,
self.mask_prob,
self.mask_length,
self.mask_selection,
self.mask_other,
min_masks=2,
no_overlap=self.no_mask_overlap,
min_space=self.mask_min_space,
)
mask_indices = mask_indices.to(x.device)
x[mask_indices] = self.mask_emb
else:
mask_indices = None
if self.mask_channel_prob > 0:
mask_channel_indices = compute_mask_indices(
(B, C),
None,
self.mask_channel_prob,
self.mask_channel_length,
self.mask_channel_selection,
self.mask_channel_other,
no_overlap=self.no_mask_channel_overlap,
min_space=self.mask_channel_min_space,
)
mask_channel_indices = (
mask_channel_indices.to(x.device).unsqueeze(1).expand(-1, T, -1)
)
x[mask_channel_indices] = 0
return x, mask_indices
def get_hubert_model(
model_path="assets/hubert/hubert_base.pt", device=torch.device("cpu")
):
models, _, _ = load_model_ensemble_and_task(
[model_path],
suffix="",
)
hubert_model = models[0]
hubert_model = hubert_model.to(device)
def _apply_mask(x, padding_mask, target_list):
return apply_mask(hubert_model, x, padding_mask, target_list)
hubert_model.apply_mask = _apply_mask
def _extract_features(
x,
padding_mask=None,
tgt_layer=None,
min_layer=0,
):
return extract_features(
hubert_model.encoder,
x,
padding_mask=padding_mask,
tgt_layer=tgt_layer,
min_layer=min_layer,
)
hubert_model.encoder.extract_features = _extract_features
hubert_model._forward = hubert_model.forward
def hubert_extract_features(
self,
source: torch.Tensor,
padding_mask: Optional[torch.Tensor] = None,
mask: bool = False,
ret_conv: bool = False,
output_layer: Optional[int] = None,
) -> Tuple[torch.Tensor, torch.Tensor]:
res = self._forward(
source,
padding_mask=padding_mask,
mask=mask,
features_only=True,
output_layer=output_layer,
)
feature = res["features"] if ret_conv else res["x"]
return feature, res["padding_mask"]
def _hubert_extract_features(
source: torch.Tensor,
padding_mask: Optional[torch.Tensor] = None,
mask: bool = False,
ret_conv: bool = False,
output_layer: Optional[int] = None,
) -> Tuple[torch.Tensor, torch.Tensor]:
return hubert_extract_features(
hubert_model, source, padding_mask, mask, ret_conv, output_layer
)
hubert_model.extract_features = _hubert_extract_features
def infer(source, padding_mask, output_layer: torch.Tensor):
output_layer = output_layer.item()
logits = hubert_model.extract_features(
source=source, padding_mask=padding_mask, output_layer=output_layer
)
feats = hubert_model.final_proj(logits[0]) if output_layer == 9 else logits[0]
return feats
hubert_model.infer = infer
# hubert_model.forward=infer
# hubert_model.forward
return hubert_model

View File

@@ -0,0 +1,12 @@
import torch
def get_rmvpe(model_path="assets/rmvpe/rmvpe.pt", device=torch.device("cpu")):
from infer.lib.rmvpe import E2E
model = E2E(4, 1, (2, 2))
ckpt = torch.load(model_path, map_location=device)
model.load_state_dict(ckpt)
model.eval()
model = model.to(device)
return model

View File

@@ -0,0 +1,37 @@
import torch
def get_synthesizer(pth_path, device=torch.device("cpu")):
from infer.lib.infer_pack.models import (
SynthesizerTrnMs256NSFsid,
SynthesizerTrnMs256NSFsid_nono,
SynthesizerTrnMs768NSFsid,
SynthesizerTrnMs768NSFsid_nono,
)
cpt = torch.load(pth_path, map_location=torch.device("cpu"))
# tgt_sr = cpt["config"][-1]
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
if_f0 = cpt.get("f0", 1)
version = cpt.get("version", "v1")
if version == "v1":
if if_f0 == 1:
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=False)
else:
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
elif version == "v2":
if if_f0 == 1:
net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=False)
else:
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
del net_g.enc_q
# net_g.forward = net_g.infer
# ckpt = {}
# ckpt["config"] = cpt["config"]
# ckpt["f0"] = if_f0
# ckpt["version"] = version
# ckpt["info"] = cpt.get("info", "0epoch")
net_g.load_state_dict(cpt["weight"], strict=False)
net_g = net_g.float()
net_g.eval().to(device)
return net_g, cpt

View File

@@ -1,8 +1,11 @@
import pdb, os
from io import BytesIO
import os
from typing import List, Optional, Tuple
import numpy as np
import torch
from infer.lib import jit
try:
# Fix "Torch not compiled with CUDA enabled"
import intel_extension_for_pytorch as ipex # pylint: disable=import-error, unused-import
@@ -11,7 +14,7 @@ try:
from infer.modules.ipex import ipex_init
ipex_init()
except Exception:
except Exception: # pylint: disable=broad-exception-caught
pass
import torch.nn as nn
import torch.nn.functional as F
@@ -23,58 +26,6 @@ import logging
logger = logging.getLogger(__name__)
###stft codes from https://github.com/pseeth/torch-stft/blob/master/torch_stft/util.py
def window_sumsquare(
window,
n_frames,
hop_length=200,
win_length=800,
n_fft=800,
dtype=np.float32,
norm=None,
):
"""
# from librosa 0.6
Compute the sum-square envelope of a window function at a given hop length.
This is used to estimate modulation effects induced by windowing
observations in short-time fourier transforms.
Parameters
----------
window : string, tuple, number, callable, or list-like
Window specification, as in `get_window`
n_frames : int > 0
The number of analysis frames
hop_length : int > 0
The number of samples to advance between frames
win_length : [optional]
The length of the window function. By default, this matches `n_fft`.
n_fft : int > 0
The length of each analysis frame.
dtype : np.dtype
The data type of the output
Returns
-------
wss : np.ndarray, shape=`(n_fft + hop_length * (n_frames - 1))`
The sum-squared envelope of the window function
"""
if win_length is None:
win_length = n_fft
n = n_fft + hop_length * (n_frames - 1)
x = np.zeros(n, dtype=dtype)
# Compute the squared window at the desired length
win_sq = get_window(window, win_length, fftbins=True)
win_sq = normalize(win_sq, norm=norm) ** 2
win_sq = pad_center(win_sq, n_fft)
# Fill the envelope
for i in range(n_frames):
sample = i * hop_length
x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))]
return x
class STFT(torch.nn.Module):
def __init__(
self, filter_length=1024, hop_length=512, win_length=None, window="hann"
@@ -101,17 +52,14 @@ class STFT(torch.nn.Module):
self.window = window
self.forward_transform = None
self.pad_amount = int(self.filter_length / 2)
scale = self.filter_length / self.hop_length
fourier_basis = np.fft.fft(np.eye(self.filter_length))
cutoff = int((self.filter_length / 2 + 1))
fourier_basis = np.vstack(
[np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])]
)
forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
inverse_basis = torch.FloatTensor(
np.linalg.pinv(scale * fourier_basis).T[:, None, :]
)
forward_basis = torch.FloatTensor(fourier_basis)
inverse_basis = torch.FloatTensor(np.linalg.pinv(fourier_basis))
assert filter_length >= self.win_length
# get window and zero center pad it to filter_length
@@ -121,12 +69,13 @@ class STFT(torch.nn.Module):
# window the bases
forward_basis *= fft_window
inverse_basis *= fft_window
inverse_basis = (inverse_basis.T * fft_window).T
self.register_buffer("forward_basis", forward_basis.float())
self.register_buffer("inverse_basis", inverse_basis.float())
self.register_buffer("fft_window", fft_window.float())
def transform(self, input_data):
def transform(self, input_data, return_phase=False):
"""Take input data (audio) to STFT domain.
Arguments:
@@ -138,33 +87,24 @@ class STFT(torch.nn.Module):
phase {tensor} -- Phase of STFT with shape (num_batch,
num_frequencies, num_frames)
"""
num_batches = input_data.shape[0]
num_samples = input_data.shape[-1]
self.num_samples = num_samples
# similar to librosa, reflect-pad the input
input_data = input_data.view(num_batches, 1, num_samples)
# print(1234,input_data.shape)
input_data = F.pad(
input_data.unsqueeze(1),
(self.pad_amount, self.pad_amount, 0, 0, 0, 0),
input_data,
(self.pad_amount, self.pad_amount),
mode="reflect",
).squeeze(1)
# print(2333,input_data.shape,self.forward_basis.shape,self.hop_length)
# pdb.set_trace()
forward_transform = F.conv1d(
input_data, self.forward_basis, stride=self.hop_length, padding=0
)
forward_transform = input_data.unfold(
1, self.filter_length, self.hop_length
).permute(0, 2, 1)
forward_transform = torch.matmul(self.forward_basis, forward_transform)
cutoff = int((self.filter_length / 2) + 1)
real_part = forward_transform[:, :cutoff, :]
imag_part = forward_transform[:, cutoff:, :]
magnitude = torch.sqrt(real_part**2 + imag_part**2)
# phase = torch.atan2(imag_part.data, real_part.data)
return magnitude # , phase
if return_phase:
phase = torch.atan2(imag_part.data, real_part.data)
return magnitude, phase
else:
return magnitude
def inverse(self, magnitude, phase):
"""Call the inverse STFT (iSTFT), given magnitude and phase tensors produced
@@ -180,42 +120,25 @@ class STFT(torch.nn.Module):
inverse_transform {tensor} -- Reconstructed audio given magnitude and phase. Of
shape (num_batch, num_samples)
"""
recombine_magnitude_phase = torch.cat(
cat = torch.cat(
[magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
)
inverse_transform = F.conv_transpose1d(
recombine_magnitude_phase,
self.inverse_basis,
stride=self.hop_length,
padding=0,
fold = torch.nn.Fold(
output_size=(1, (cat.size(-1) - 1) * self.hop_length + self.filter_length),
kernel_size=(1, self.filter_length),
stride=(1, self.hop_length),
)
if self.window is not None:
window_sum = window_sumsquare(
self.window,
magnitude.size(-1),
hop_length=self.hop_length,
win_length=self.win_length,
n_fft=self.filter_length,
dtype=np.float32,
)
# remove modulation effects
approx_nonzero_indices = torch.from_numpy(
np.where(window_sum > tiny(window_sum))[0]
)
window_sum = torch.from_numpy(window_sum).to(inverse_transform.device)
inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
approx_nonzero_indices
]
# scale by hop ratio
inverse_transform *= float(self.filter_length) / self.hop_length
inverse_transform = inverse_transform[..., self.pad_amount :]
inverse_transform = inverse_transform[..., : self.num_samples]
inverse_transform = inverse_transform.squeeze(1)
inverse_transform = torch.matmul(self.inverse_basis, cat)
inverse_transform = fold(inverse_transform)[
:, 0, 0, self.pad_amount : -self.pad_amount
]
window_square_sum = (
self.fft_window.pow(2).repeat(cat.size(-1), 1).T.unsqueeze(0)
)
window_square_sum = fold(window_square_sum)[
:, 0, 0, self.pad_amount : -self.pad_amount
]
inverse_transform /= window_square_sum
return inverse_transform
def forward(self, input_data):
@@ -228,7 +151,7 @@ class STFT(torch.nn.Module):
reconstruction {tensor} -- Reconstructed audio given magnitude and phase. Of
shape (num_batch, num_samples)
"""
self.magnitude, self.phase = self.transform(input_data)
self.magnitude, self.phase = self.transform(input_data, return_phase=True)
reconstruction = self.inverse(self.magnitude, self.phase)
return reconstruction
@@ -276,17 +199,15 @@ class ConvBlockRes(nn.Module):
nn.BatchNorm2d(out_channels, momentum=momentum),
nn.ReLU(),
)
# self.shortcut:Optional[nn.Module] = None
if in_channels != out_channels:
self.shortcut = nn.Conv2d(in_channels, out_channels, (1, 1))
self.is_shortcut = True
else:
self.is_shortcut = False
def forward(self, x):
if self.is_shortcut:
return self.conv(x) + self.shortcut(x)
else:
def forward(self, x: torch.Tensor):
if not hasattr(self, "shortcut"):
return self.conv(x) + x
else:
return self.conv(x) + self.shortcut(x)
class Encoder(nn.Module):
@@ -318,12 +239,12 @@ class Encoder(nn.Module):
self.out_size = in_size
self.out_channel = out_channels
def forward(self, x):
concat_tensors = []
def forward(self, x: torch.Tensor):
concat_tensors: List[torch.Tensor] = []
x = self.bn(x)
for i in range(self.n_encoders):
_, x = self.layers[i](x)
concat_tensors.append(_)
for i, layer in enumerate(self.layers):
t, x = layer(x)
concat_tensors.append(t)
return x, concat_tensors
@@ -342,8 +263,8 @@ class ResEncoderBlock(nn.Module):
self.pool = nn.AvgPool2d(kernel_size=kernel_size)
def forward(self, x):
for i in range(self.n_blocks):
x = self.conv[i](x)
for i, conv in enumerate(self.conv):
x = conv(x)
if self.kernel_size is not None:
return x, self.pool(x)
else:
@@ -364,8 +285,8 @@ class Intermediate(nn.Module): #
)
def forward(self, x):
for i in range(self.n_inters):
x = self.layers[i](x)
for i, layer in enumerate(self.layers):
x = layer(x)
return x
@@ -395,8 +316,8 @@ class ResDecoderBlock(nn.Module):
def forward(self, x, concat_tensor):
x = self.conv1(x)
x = torch.cat((x, concat_tensor), dim=1)
for i in range(self.n_blocks):
x = self.conv2[i](x)
for i, conv2 in enumerate(self.conv2):
x = conv2(x)
return x
@@ -412,9 +333,9 @@ class Decoder(nn.Module):
)
in_channels = out_channels
def forward(self, x, concat_tensors):
for i in range(self.n_decoders):
x = self.layers[i](x, concat_tensors[-1 - i])
def forward(self, x: torch.Tensor, concat_tensors: List[torch.Tensor]):
for i, layer in enumerate(self.layers):
x = layer(x, concat_tensors[-1 - i])
return x
@@ -442,7 +363,7 @@ class DeepUnet(nn.Module):
self.encoder.out_channel, en_de_layers, kernel_size, n_blocks
)
def forward(self, x):
def forward(self, x: torch.Tensor) -> torch.Tensor:
x, concat_tensors = self.encoder(x)
x = self.intermediate(x)
x = self.decoder(x, concat_tensors)
@@ -536,33 +457,28 @@ class MelSpectrogram(torch.nn.Module):
keyshift_key = str(keyshift) + "_" + str(audio.device)
if keyshift_key not in self.hann_window:
self.hann_window[keyshift_key] = torch.hann_window(win_length_new).to(
# "cpu"if(audio.device.type=="privateuseone") else audio.device
audio.device
)
# fft = torch.stft(#doesn't support pytorch_dml
# # audio.cpu() if(audio.device.type=="privateuseone")else audio,
# audio,
# n_fft=n_fft_new,
# hop_length=hop_length_new,
# win_length=win_length_new,
# window=self.hann_window[keyshift_key],
# center=center,
# return_complex=True,
# )
# magnitude = torch.sqrt(fft.real.pow(2) + fft.imag.pow(2))
# print(1111111111)
# print(222222222222222,audio.device,self.is_half)
if hasattr(self, "stft") == False:
# print(n_fft_new,hop_length_new,win_length_new,audio.shape)
self.stft = STFT(
filter_length=n_fft_new,
if "privateuseone" in str(audio.device):
if not hasattr(self, "stft"):
self.stft = STFT(
filter_length=n_fft_new,
hop_length=hop_length_new,
win_length=win_length_new,
window="hann",
).to(audio.device)
magnitude = self.stft.transform(audio)
else:
fft = torch.stft(
audio,
n_fft=n_fft_new,
hop_length=hop_length_new,
win_length=win_length_new,
window="hann",
).to(audio.device)
magnitude = self.stft.transform(audio) # phase
# if (audio.device.type == "privateuseone"):
# magnitude=magnitude.to(audio.device)
window=self.hann_window[keyshift_key],
center=center,
return_complex=True,
)
magnitude = torch.sqrt(fft.real.pow(2) + fft.imag.pow(2))
if keyshift != 0:
size = self.n_fft // 2 + 1
resize = magnitude.size(1)
@@ -573,17 +489,16 @@ class MelSpectrogram(torch.nn.Module):
if self.is_half == True:
mel_output = mel_output.half()
log_mel_spec = torch.log(torch.clamp(mel_output, min=self.clamp))
# print(log_mel_spec.device.type)
return log_mel_spec
class RMVPE:
def __init__(self, model_path, is_half, device=None):
def __init__(self, model_path: str, is_half, device=None, use_jit=False):
self.resample_kernel = {}
self.resample_kernel = {}
self.is_half = is_half
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
device = "cuda:0" if torch.cuda.is_available() else "cpu"
self.device = device
self.mel_extractor = MelSpectrogram(
is_half, 128, 16000, 1024, 160, None, 30, 8000
@@ -597,13 +512,56 @@ class RMVPE:
)
self.model = ort_session
else:
model = E2E(4, 1, (2, 2))
ckpt = torch.load(model_path, map_location="cpu")
model.load_state_dict(ckpt)
model.eval()
if is_half == True:
model = model.half()
self.model = model
if str(self.device) == "cuda":
self.device = torch.device("cuda:0")
def get_jit_model():
jit_model_path = model_path.rstrip(".pth")
jit_model_path += ".half.jit" if is_half else ".jit"
reload = False
if os.path.exists(jit_model_path):
ckpt = jit.load(jit_model_path)
model_device = ckpt["device"]
if model_device != str(self.device):
reload = True
else:
reload = True
if reload:
ckpt = jit.rmvpe_jit_export(
model_path=model_path,
mode="script",
inputs_path=None,
save_path=jit_model_path,
device=device,
is_half=is_half,
)
model = torch.jit.load(BytesIO(ckpt["model"]), map_location=device)
return model
def get_default_model():
model = E2E(4, 1, (2, 2))
ckpt = torch.load(model_path, map_location="cpu")
model.load_state_dict(ckpt)
model.eval()
if is_half:
model = model.half()
else:
model = model.float()
return model
if use_jit:
if is_half and "cpu" in str(self.device):
logger.warning(
"Use default rmvpe model. \
Jit is not supported on the CPU for half floating point"
)
self.model = get_default_model()
else:
self.model = get_jit_model()
else:
self.model = get_default_model()
self.model = self.model.to(device)
cents_mapping = 20 * np.arange(360) + 1997.3794084376191
self.cents_mapping = np.pad(cents_mapping, (4, 4)) # 368
@@ -611,9 +569,9 @@ class RMVPE:
def mel2hidden(self, mel):
with torch.no_grad():
n_frames = mel.shape[-1]
mel = F.pad(
mel, (0, 32 * ((n_frames - 1) // 32 + 1) - n_frames), mode="constant"
)
n_pad = 32 * ((n_frames - 1) // 32 + 1) - n_frames
if n_pad > 0:
mel = F.pad(mel, (0, n_pad), mode="constant")
if "privateuseone" in str(self.device):
onnx_input_name = self.model.get_inputs()[0].name
onnx_outputs_names = self.model.get_outputs()[0].name
@@ -622,6 +580,7 @@ class RMVPE:
input_feed={onnx_input_name: mel.cpu().numpy()},
)[0]
else:
mel = mel.half() if self.is_half else mel.float()
hidden = self.model(mel)
return hidden[:, :n_frames]