optimize real-time vc

Author: yxlllc
Date: 2023-12-26 00:23:36 +08:00
parent 78f03e7dc0
commit 3dec36568c
6 changed files with 211 additions and 135 deletions

gui_v1.py (159 lines changed)

@@ -22,6 +22,26 @@ def printt(strr, *args):
print(strr % args)
def phase_vocoder(a, b, fade_out, fade_in):
window = torch.sqrt(fade_out * fade_in)
fa = torch.fft.rfft(a * window)
fb = torch.fft.rfft(b * window)
absab = torch.abs(fa) + torch.abs(fb)
n = a.shape[0]
if n % 2 == 0:
absab[1:-1] *= 2
else:
absab[1:] *= 2
phia = torch.angle(fa)
phib = torch.angle(fb)
deltaphase = phib - phia
deltaphase = deltaphase - 2 * np.pi * torch.floor(deltaphase / 2 / np.pi + 0.5)
w = 2 * np.pi * torch.arange(n // 2 + 1).to(a) + deltaphase
t = torch.arange(n).unsqueeze(-1).to(a) / n
result = a * (fade_out ** 2) + b * (fade_in ** 2) + torch.sum(absab * torch.cos(w * t + phia), -1) * window / n
return result
class Harvest(multiprocessing.Process):
def __init__(self, inp_q, opt_q):
multiprocessing.Process.__init__(self)
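For illustration only (toy values, not from this commit): a minimal sketch of driving the new phase_vocoder crossfade, assuming the function defined in the hunk above is in scope. The 440 Hz tone, 16 kHz rate, and 4096-sample overlap are made up; in gui_v1.py the real inputs are self.sola_buffer, the head of the freshly inferred block, and the fade_out_window/fade_in_window pair.

```python
import numpy as np
import torch

# Toy overlap: two blocks of the same 440 Hz tone, deliberately out of phase.
n = 4096
t = torch.arange(n, dtype=torch.float32) / 16000.0
a = torch.sin(2 * np.pi * 440.0 * t)          # tail of the previous output block
b = torch.sin(2 * np.pi * 440.0 * t + 1.5)    # head of the new block (phase-shifted)

# Same window shapes gui_v1.py builds for its SOLA buffer.
fade_in = torch.sin(0.5 * np.pi * torch.linspace(0.0, 1.0, steps=n)) ** 2
fade_out = 1.0 - fade_in

plain = a * fade_out + b * fade_in            # naive crossfade: misaligned phases can cancel
pv = phase_vocoder(a, b, fade_out, fade_in)   # phase-aware crossfade defined in the hunk above
```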
@@ -118,6 +138,8 @@ if __name__ == "__main__":
try:
with open("configs/config.json", "r") as j:
data = json.load(j)
data["sr_model"] = data["sr_type"] == "sr_model"
data["sr_device"] = data["sr_type"] == "sr_device"
data["pm"] = data["f0method"] == "pm"
data["harvest"] = data["f0method"] == "harvest"
data["crepe"] = data["f0method"] == "crepe"
@@ -134,6 +156,7 @@ if __name__ == "__main__":
"index_path": " ",
"sg_input_device": input_devices[sd.default.device[0]],
"sg_output_device": output_devices[sd.default.device[1]],
"sr_type": "sr_model",
"threhold": "-60",
"pitch": "0",
"index_rate": "0",
@@ -143,7 +166,10 @@ if __name__ == "__main__":
"extra_time": "2.5",
"f0method": "rmvpe",
"use_jit": False,
"use_pv": False,
}
data["sr_model"] = data["sr_type"] == "sr_model"
data["sr_device"] = data["sr_type"] == "sr_device"
data["pm"] = data["f0method"] == "pm"
data["harvest"] = data["f0method"] == "harvest"
data["crepe"] = data["f0method"] == "crepe"
@@ -207,7 +233,25 @@ if __name__ == "__main__":
default_value=data.get("sg_output_device", ""),
),
],
[sg.Button(i18n("重载设备列表"), key="reload_devices")],
[
sg.Button(i18n("重载设备列表"), key="reload_devices"),
sg.Radio(
i18n("使用模型采样率"),
"sr_type",
key="sr_model",
default=data.get("sr_model", True),
enable_events=True,
),
sg.Radio(
i18n("使用设备采样率"),
"sr_type",
key="sr_device",
default=data.get("sr_device", False),
enable_events=True,
),
sg.Text(i18n("采样率:")),
sg.Text("", key="sr_stream"),
],
],
title=i18n("音频设备(请使用同种类驱动)"),
)
@@ -222,7 +266,7 @@ if __name__ == "__main__":
key="threhold",
resolution=1,
orientation="h",
default_value=data.get("threhold", "-60"),
default_value=data.get("threhold", -60),
enable_events=True,
),
],
@@ -233,7 +277,7 @@ if __name__ == "__main__":
key="pitch",
resolution=1,
orientation="h",
default_value=data.get("pitch", "0"),
default_value=data.get("pitch", 0),
enable_events=True,
),
],
@@ -244,7 +288,7 @@ if __name__ == "__main__":
key="index_rate",
resolution=0.01,
orientation="h",
default_value=data.get("index_rate", "0"),
default_value=data.get("index_rate", 0),
enable_events=True,
),
],
@@ -255,7 +299,7 @@ if __name__ == "__main__":
key="rms_mix_rate",
resolution=0.01,
orientation="h",
default_value=data.get("rms_mix_rate", "0"),
default_value=data.get("rms_mix_rate", 0),
enable_events=True,
),
],
@@ -265,35 +309,35 @@ if __name__ == "__main__":
"pm",
"f0method",
key="pm",
default=data.get("pm", "") == True,
default=data.get("pm", False),
enable_events=True,
),
sg.Radio(
"harvest",
"f0method",
key="harvest",
default=data.get("harvest", "") == True,
default=data.get("harvest", False),
enable_events=True,
),
sg.Radio(
"crepe",
"f0method",
key="crepe",
default=data.get("crepe", "") == True,
default=data.get("crepe", False),
enable_events=True,
),
sg.Radio(
"rmvpe",
"f0method",
key="rmvpe",
default=data.get("rmvpe", "") == True,
default=data.get("rmvpe", False),
enable_events=True,
),
sg.Radio(
"fcpe",
"f0method",
key="fcpe",
default=data.get("fcpe", "") == True,
default=data.get("fcpe", True),
enable_events=True,
),
],
@@ -305,11 +349,11 @@ if __name__ == "__main__":
[
sg.Text(i18n("采样长度")),
sg.Slider(
range=(0.05, 2.4),
range=(0.02, 2.4),
key="block_time",
resolution=0.01,
orientation="h",
default_value=data.get("block_time", "0.25"),
default_value=data.get("block_time", 0.25),
enable_events=True,
),
],
@@ -320,7 +364,7 @@ if __name__ == "__main__":
# key="device_latency",
# resolution=0.001,
# orientation="h",
# default_value=data.get("device_latency", "0.1"),
# default_value=data.get("device_latency", 0.1),
# enable_events=True,
# ),
# ],
@@ -344,7 +388,7 @@ if __name__ == "__main__":
key="crossfade_length",
resolution=0.01,
orientation="h",
default_value=data.get("crossfade_length", "0.05"),
default_value=data.get("crossfade_length", 0.05),
enable_events=True,
),
],
@@ -355,7 +399,7 @@ if __name__ == "__main__":
key="extra_time",
resolution=0.01,
orientation="h",
default_value=data.get("extra_time", "2.5"),
default_value=data.get("extra_time", 2.5),
enable_events=True,
),
],
@@ -370,6 +414,12 @@ if __name__ == "__main__":
key="O_noise_reduce",
enable_events=True,
),
sg.Checkbox(
i18n("启用相位声码器"),
key="use_pv",
default=data.get("use_pv", False),
enable_events=True,
),
# sg.Checkbox(
# "JIT加速",
# default=self.config.use_jit,
@@ -443,6 +493,12 @@ if __name__ == "__main__":
"index_path": values["index_path"],
"sg_input_device": values["sg_input_device"],
"sg_output_device": values["sg_output_device"],
"sr_type": ["sr_model", "sr_device"][
[
values["sr_model"],
values["sr_device"],
].index(True)
],
"threhold": values["threhold"],
"pitch": values["pitch"],
"rms_mix_rate": values["rms_mix_rate"],
@@ -454,6 +510,7 @@ if __name__ == "__main__":
"n_cpu": values["n_cpu"],
# "use_jit": values["use_jit"],
"use_jit": False,
"use_pv": values["use_pv"],
"f0method": ["pm", "harvest", "crepe", "rmvpe", "fcpe"][
[
values["pm"],
@@ -477,6 +534,7 @@ if __name__ == "__main__":
)
if values["I_noise_reduce"]:
self.delay_time += values["crossfade_length"]
self.window["sr_stream"].update(self.gui_config.samplerate)
self.window["delay_time"].update(int(self.delay_time * 1000))
if event == "stop_vc" and self.flag_vc == True:
self.flag_vc = False
@@ -505,6 +563,8 @@ if __name__ == "__main__":
self.window["delay_time"].update(int(self.delay_time * 1000))
elif event == "O_noise_reduce":
self.gui_config.O_noise_reduce = values["O_noise_reduce"]
elif event == "use_pv":
self.gui_config.use_pv = values["use_pv"]
elif event in ["vc", "im"]:
self.function = event
elif event != "start_vc" and self.flag_vc == True:
@@ -531,6 +591,12 @@ if __name__ == "__main__":
# self.device_latency = values["device_latency"]
self.gui_config.pth_path = values["pth_path"]
self.gui_config.index_path = values["index_path"]
self.gui_config.sr_type = ["sr_model", "sr_device"][
[
values["sr_model"],
values["sr_device"],
].index(True)
]
self.gui_config.threhold = values["threhold"]
self.gui_config.pitch = values["pitch"]
self.gui_config.block_time = values["block_time"]
@@ -538,6 +604,7 @@ if __name__ == "__main__":
self.gui_config.extra_time = values["extra_time"]
self.gui_config.I_noise_reduce = values["I_noise_reduce"]
self.gui_config.O_noise_reduce = values["O_noise_reduce"]
self.gui_config.use_pv = values["use_pv"]
self.gui_config.rms_mix_rate = values["rms_mix_rate"]
self.gui_config.index_rate = values["index_rate"]
self.gui_config.n_cpu = values["n_cpu"]
@@ -566,8 +633,8 @@ if __name__ == "__main__":
self.config,
self.rvc if hasattr(self, "rvc") else None,
)
self.gui_config.samplerate = self.rvc.tgt_sr
self.zc = self.rvc.tgt_sr // 100
self.gui_config.samplerate = self.rvc.tgt_sr if self.gui_config.sr_type == "sr_model" else self.get_device_samplerate()
self.zc = self.gui_config.samplerate // 100
self.block_frame = (
int(
np.round(
@@ -589,6 +656,7 @@ if __name__ == "__main__":
)
* self.zc
)
self.sola_buffer_frame = min(self.crossfade_frame, 4 * self.zc)
self.sola_search_frame = self.zc
self.extra_frame = (
int(
@@ -622,14 +690,14 @@ if __name__ == "__main__":
dtype="float64",
)
self.sola_buffer: torch.Tensor = torch.zeros(
self.crossfade_frame, device=self.config.device, dtype=torch.float32
self.sola_buffer_frame, device=self.config.device, dtype=torch.float32
)
self.nr_buffer: torch.Tensor = self.sola_buffer.clone()
self.output_buffer: torch.Tensor = self.input_wav.clone()
self.res_buffer: torch.Tensor = torch.zeros(
2 * self.zc, device=self.config.device, dtype=torch.float32
)
self.valid_rate = 1 - (self.extra_frame - 1) / self.input_wav.shape[0]
self.skip_head = self.extra_frame // self.zc
self.fade_in_window: torch.Tensor = (
torch.sin(
0.5
@@ -637,7 +705,7 @@ if __name__ == "__main__":
* torch.linspace(
0.0,
1.0,
steps=self.crossfade_frame,
steps=self.sola_buffer_frame,
device=self.config.device,
dtype=torch.float32,
)
@@ -650,6 +718,14 @@ if __name__ == "__main__":
new_freq=16000,
dtype=torch.float32,
).to(self.config.device)
if self.rvc.tgt_sr != self.gui_config.samplerate:
self.resampler2 = tat.Resample(
orig_freq=self.rvc.tgt_sr,
new_freq=self.gui_config.samplerate,
dtype=torch.float32,
).to(self.config.device)
else:
self.resampler2 = None
self.tg = TorchGate(
sr=self.gui_config.samplerate, n_fft=4 * self.zc, prop_decrease=0.9
).to(self.config.device)
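For illustration (made-up rates, not from this commit): resampler2 only exists when the device stream rate differs from the model's tgt_sr; a standalone sketch of that conversion with torchaudio.

```python
import torch
import torchaudio.transforms as tat

tgt_sr, device_sr = 40000, 48000              # hypothetical model rate vs. device rate
resampler2 = None
if tgt_sr != device_sr:
    resampler2 = tat.Resample(orig_freq=tgt_sr, new_freq=device_sr, dtype=torch.float32)

infer_wav = torch.randn(tgt_sr)               # one second of model-rate audio (placeholder)
if resampler2 is not None:
    infer_wav = resampler2(infer_wav)         # resampled to the device stream rate
```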
@@ -710,11 +786,11 @@ if __name__ == "__main__":
input_wav = self.tg(
input_wav.unsqueeze(0), self.input_wav.unsqueeze(0)
)[0, 2 * self.zc :]
input_wav[: self.crossfade_frame] *= self.fade_in_window
input_wav[: self.crossfade_frame] += (
input_wav[: self.sola_buffer_frame] *= self.fade_in_window
input_wav[: self.sola_buffer_frame] += (
self.nr_buffer * self.fade_out_window
)
self.nr_buffer[:] = input_wav[-self.crossfade_frame :]
self.nr_buffer[:] = input_wav[self.block_frame : self.block_frame + self.sola_buffer_frame]
input_wav = torch.cat(
(self.res_buffer[:], input_wav[: self.block_frame])
)
@@ -728,23 +804,16 @@ if __name__ == "__main__":
)[160:]
# infer
if self.function == "vc":
f0_extractor_frame = self.block_frame_16k + 800
if self.gui_config.f0method == "rmvpe":
f0_extractor_frame = (
5120 * ((f0_extractor_frame - 1) // 5120 + 1) - 160
)
infer_wav = self.rvc.infer(
self.input_wav_res,
self.input_wav_res[-f0_extractor_frame:].cpu().numpy(),
self.block_frame_16k,
self.valid_rate,
self.skip_head,
self.pitch,
self.pitchf,
self.gui_config.f0method,
)
infer_wav = infer_wav[
-self.crossfade_frame - self.sola_search_frame - self.block_frame :
]
if self.resampler2 is not None:
infer_wav = self.resampler2(infer_wav)
else:
infer_wav = self.input_wav[
-self.crossfade_frame - self.sola_search_frame - self.block_frame :
@@ -794,13 +863,13 @@ if __name__ == "__main__":
)
# SOLA algorithm from https://github.com/yxlllc/DDSP-SVC
conv_input = infer_wav[
None, None, : self.crossfade_frame + self.sola_search_frame
None, None, : self.sola_buffer_frame + self.sola_search_frame
]
cor_nom = F.conv1d(conv_input, self.sola_buffer[None, None, :])
cor_den = torch.sqrt(
F.conv1d(
conv_input**2,
torch.ones(1, 1, self.crossfade_frame, device=self.config.device),
torch.ones(1, 1, self.sola_buffer_frame, device=self.config.device),
)
+ 1e-8
)
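For illustration (random buffers and a hypothetical 40 kHz stream, not from this commit): the SOLA offset search now correlates against the shorter sola_buffer_frame = min(crossfade_frame, 4 * zc) template; a self-contained sketch of the shapes involved.

```python
import torch
import torch.nn.functional as F

zc = 40000 // 100                      # self.zc = samplerate // 100  ->  400 samples
block_frame = 25 * zc                  # 0.25 s block (hypothetical)
sola_buffer_frame = 4 * zc             # capped crossfade template
sola_search_frame = zc

# Random data just to exercise the shapes; in gui_v1.py these are real audio buffers.
infer_wav = torch.randn(block_frame + sola_buffer_frame + sola_search_frame)
sola_buffer = torch.randn(sola_buffer_frame)

# Normalized cross-correlation between the previous tail and the new head,
# then pick the offset with the best match.
conv_input = infer_wav[None, None, : sola_buffer_frame + sola_search_frame]
cor_nom = F.conv1d(conv_input, sola_buffer[None, None, :])
cor_den = torch.sqrt(
    F.conv1d(conv_input**2, torch.ones(1, 1, sola_buffer_frame)) + 1e-8
)
sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0]).item()
infer_wav = infer_wav[sola_offset:]    # align the new block with the stored tail
```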
@@ -813,9 +882,16 @@ if __name__ == "__main__":
infer_wav = infer_wav[
sola_offset : sola_offset + self.block_frame + self.crossfade_frame
]
infer_wav[: self.crossfade_frame] *= self.fade_in_window
infer_wav[: self.crossfade_frame] += self.sola_buffer * self.fade_out_window
self.sola_buffer[:] = infer_wav[-self.crossfade_frame :]
if "privateuseone" in str(self.config.device) or not self.gui_config.use_pv:
infer_wav[: self.sola_buffer_frame] *= self.fade_in_window
infer_wav[: self.sola_buffer_frame] += self.sola_buffer * self.fade_out_window
else:
infer_wav[: self.sola_buffer_frame] = phase_vocoder(
self.sola_buffer,
infer_wav[: self.sola_buffer_frame],
self.fade_out_window,
self.fade_in_window)
self.sola_buffer[:] = infer_wav[self.block_frame : self.block_frame + self.sola_buffer_frame]
if sys.platform == "darwin":
outdata[:] = (
infer_wav[: -self.crossfade_frame].cpu().numpy()[:, np.newaxis]
@@ -864,7 +940,7 @@ if __name__ == "__main__":
input_devices_indices,
output_devices_indices,
)
def set_devices(self, input_device, output_device):
"""设置输出设备"""
(
@@ -881,5 +957,8 @@ if __name__ == "__main__":
]
printt("Input device: %s:%s", str(sd.default.device[0]), input_device)
printt("Output device: %s:%s", str(sd.default.device[1]), output_device)
def get_device_samplerate(self):
return int(sd.query_devices(device=sd.default.device[0])['default_samplerate'])
gui = GUI()
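For illustration (not from this commit): get_device_samplerate reads the default rate PortAudio reports for the current input device; the equivalent standalone query with sounddevice looks like this.

```python
import sounddevice as sd

# Default sample rate of the current default input device,
# as get_device_samplerate() returns it.
device_info = sd.query_devices(device=sd.default.device[0])
device_sr = int(device_info["default_samplerate"])
print("device sample rate:", device_sr)
```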