-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtrain.py
141 lines (107 loc) · 8.07 KB
/
train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
import numpy as np
import argparse
import time
import librosa
from preprocess import *
from model import CycleGAN
from hyparams import *
def _write_wav(path, wav, sr):
    """Write the waveform *wav* to *path* as a WAV file sampled at *sr* Hz.

    ``librosa.output.write_wav`` was removed in librosa 0.8.0.  It is still
    used when the installed librosa provides it; otherwise the samples are
    written with ``scipy.io.wavfile.write``.
    """
    legacy_output = getattr(librosa, 'output', None)
    if legacy_output is not None and hasattr(legacy_output, 'write_wav'):
        legacy_output.write_wav(path, wav, sr)
        return
    # Imported lazily so environments with an old librosa do not need this path.
    from scipy.io import wavfile
    wavfile.write(path, sr, wav)


def train(train_A_dir, train_B_dir, model_dir, model_name, random_seed, validation_A_dir, output_dir, tensorboard_log_dir):
    """Preprocess two speaker corpora and train a CycleGAN on them.

    Helper functions and hyper-parameters (``sampling_rate``, ``num_mcep``,
    ``num_epochs``, ``mini_batch_size``, ...) come from the star imports of
    ``preprocess`` and ``hyparams``.

    Args:
        train_A_dir: Directory with the training wavs of speaker A.
        train_B_dir: Directory with the training wavs of speaker B.
        model_dir: Directory receiving the normalization statistics
            (``logf0s_normalization.npz``, ``mcep_normalization.npz``) and the
            model checkpoints. Created if missing.
        model_name: File name passed to ``model.save``.
        random_seed: Seed given to ``np.random.seed``.
        validation_A_dir: Directory with speaker A wavs that are converted
            A -> B periodically, or ``None`` to disable conversion.
        output_dir: Parent directory of the ``converted_A`` output folder.
        tensorboard_log_dir: Accepted for interface compatibility; not used
            by this function.

    Side effects:
        Rebinds the module-level ``generator_learning_rate``,
        ``discriminator_learning_rate`` and ``lambda_identity`` while the
        schedule progresses, so a second call in the same process starts from
        the already-decayed values.
    """
    # Declared once at function scope instead of inside the epoch loop.
    global generator_learning_rate, generator_learning_rate_decay, \
        discriminator_learning_rate, discriminator_learning_rate_decay, decay_threshold, \
        lambda_cycle, lambda_identity, check_epoch

    np.random.seed(random_seed)

    print('Preprocessing Data...')
    start_time = time.time()

    wavs_A = load_wavs(wav_dir=train_A_dir, sr=sampling_rate)
    wavs_B = load_wavs(wav_dir=train_B_dir, sr=sampling_rate)

    f0s_A, timeaxes_A, sps_A, aps_A, coded_sps_A = world_encode_data(wavs=wavs_A, fs=sampling_rate, frame_period=frame_period, coded_dim=num_mcep)
    f0s_B, timeaxes_B, sps_B, aps_B, coded_sps_B = world_encode_data(wavs=wavs_B, fs=sampling_rate, frame_period=frame_period, coded_dim=num_mcep)

    log_f0s_mean_A, log_f0s_std_A = logf0_statistics(f0s_A)
    log_f0s_mean_B, log_f0s_std_B = logf0_statistics(f0s_B)

    print('Log Pitch A')
    print('Mean: %f, Std: %f' % (log_f0s_mean_A, log_f0s_std_A))
    print('Log Pitch B')
    print('Mean: %f, Std: %f' % (log_f0s_mean_B, log_f0s_std_B))

    coded_sps_A_transposed = transpose_in_list(lst=coded_sps_A)
    coded_sps_B_transposed = transpose_in_list(lst=coded_sps_B)

    coded_sps_A_norm, coded_sps_A_mean, coded_sps_A_std = coded_sps_normalization_fit_transoform(coded_sps=coded_sps_A_transposed)
    print("Input data fixed.")
    coded_sps_B_norm, coded_sps_B_mean, coded_sps_B_std = coded_sps_normalization_fit_transoform(coded_sps=coded_sps_B_transposed)

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(model_dir, exist_ok=True)
    np.savez(os.path.join(model_dir, 'logf0s_normalization.npz'), mean_A=log_f0s_mean_A, std_A=log_f0s_std_A, mean_B=log_f0s_mean_B, std_B=log_f0s_std_B)
    np.savez(os.path.join(model_dir, 'mcep_normalization.npz'), mean_A=coded_sps_A_mean, std_A=coded_sps_A_std, mean_B=coded_sps_B_mean, std_B=coded_sps_B_std)

    if validation_A_dir is not None:
        validation_A_output_dir = os.path.join(output_dir, 'converted_A')
        os.makedirs(validation_A_output_dir, exist_ok=True)

    end_time = time.time()
    time_elapsed = end_time - start_time

    print('Preprocessing Done.')
    print('Time Elapsed for Data Preprocessing: %02d:%02d:%02d' % (time_elapsed // 3600, (time_elapsed % 3600 // 60), (time_elapsed % 60 // 1)))

    model = CycleGAN(num_features=num_mcep)

    # Validation runs twice per checkpoint interval. Clamp to 1 so that
    # check_epoch == 1 does not end up as a modulo by zero.
    validation_every = max(1, check_epoch // 2)

    for epoch in range(1, num_epochs + 1):
        print('Epoch: %d' % epoch)
        start_time_epoch = time.time()

        dataset_A, dataset_B = sample_train_data(dataset_A=coded_sps_A_norm, dataset_B=coded_sps_B_norm, n_frames=n_frames)
        n_samples = dataset_A.shape[0]
        iter_range = n_samples // mini_batch_size

        for i in range(iter_range):
            # Epochs are 1-based, so completed epochs = epoch - 1; this keeps
            # the reported global step starting at 0.
            num_iterations = iter_range * (epoch - 1) + i

            # The identity loss is only applied during the early epochs.
            if epoch > decay_threshold / 5:
                lambda_identity = 0
            # Past the threshold both learning rates shrink on every
            # mini-batch, never dropping below zero.
            if epoch > decay_threshold:
                generator_learning_rate = max(0, generator_learning_rate - generator_learning_rate_decay)
                discriminator_learning_rate = max(0, discriminator_learning_rate - discriminator_learning_rate_decay)

            start = i * mini_batch_size
            end = (i + 1) * mini_batch_size

            generator_loss, discriminator_loss = model.train(input_A=dataset_A[start:end], input_B=dataset_B[start:end], lambda_cycle=lambda_cycle, lambda_identity=lambda_identity, generator_learning_rate=generator_learning_rate, discriminator_learning_rate=discriminator_learning_rate)

            # Report once per epoch, on its first mini-batch.
            if i == 0:
                print('Iteration: {:07d}, Generator Learning Rate: {:.7f}, Discriminator Learning Rate: {:.7f}, Generator Loss : {:.3f}, Discriminator Loss : {:.3f}'.format(num_iterations, generator_learning_rate, discriminator_learning_rate, generator_loss, discriminator_loss))

        if epoch % check_epoch == 0:
            model.save(directory=model_dir, filename=model_name)

        end_time_epoch = time.time()
        time_elapsed_epoch = end_time_epoch - start_time_epoch
        print(f'Time Elapsed for This Epoch: {time_elapsed_epoch:.3f} sec')

        # TEST CONVERSION
        if validation_A_dir is not None and epoch % validation_every == 0:
            file_extension = f"-CONV-{epoch:04d}-EPOCH"
            print('Generating Validation Data B from A...')
            for file_name in os.listdir(validation_A_dir):
                filepath = os.path.join(validation_A_dir, file_name)
                # Sub-directories cannot be loaded as audio; skip them.
                if not os.path.isfile(filepath):
                    continue

                wav, _ = librosa.load(filepath, sr=sampling_rate, mono=True)
                wav = wav_padding(wav=wav, sr=sampling_rate, frame_period=frame_period, multiple=4)
                f0, timeaxis, sp, ap = world_decompose(wav=wav, fs=sampling_rate, frame_period=frame_period)
                f0_converted = pitch_conversion(f0=f0, mean_log_src=log_f0s_mean_A, std_log_src=log_f0s_std_A, mean_log_target=log_f0s_mean_B, std_log_target=log_f0s_std_B)

                # Normalize with speaker A statistics, convert, then
                # de-normalize with speaker B statistics.
                coded_sp = world_encode_spectral_envelop(sp=sp, fs=sampling_rate, dim=num_mcep)
                coded_sp_transposed = coded_sp.T
                coded_sp_norm = (coded_sp_transposed - coded_sps_A_mean) / coded_sps_A_std
                coded_sp_converted_norm = model.test(inputs=np.array([coded_sp_norm]), direction='A2B')[0]
                coded_sp_converted = coded_sp_converted_norm * coded_sps_B_std + coded_sps_B_mean
                coded_sp_converted = coded_sp_converted.T
                coded_sp_converted = np.ascontiguousarray(coded_sp_converted)

                decoded_sp_converted = world_decode_spectral_envelop(coded_sp=coded_sp_converted, fs=sampling_rate)
                wav_transformed = world_speech_synthesis(f0=f0_converted, decoded_sp=decoded_sp_converted, ap=ap, fs=sampling_rate, frame_period=frame_period)

                # The epoch suffix alone left the output without an audio
                # extension; append '.wav' so the files are recognizable.
                output_name = os.path.splitext(os.path.basename(file_name))[0] + file_extension + '.wav'
                _write_wav(os.path.join(validation_A_output_dir, output_name), wav_transformed, sampling_rate)
if __name__ == '__main__':
    # Command-line entry point: every option mirrors one keyword argument of
    # train(), with defaults taken from the *_default hyper-parameters.
    cli = argparse.ArgumentParser(description = 'Train CycleGAN model for datasets.')

    # (flag, type, help text, default) in the order the options are registered.
    option_table = (
        ('--train_A_dir', str, 'Directory for A.', train_A_dir_default),
        ('--train_B_dir', str, 'Directory for B.', train_B_dir_default),
        ('--model_dir', str, 'Directory for saving models.', model_dir_default),
        ('--model_name', str, 'File name for saving model.', model_name_default),
        ('--random_seed', int, 'Random seed for model training.', random_seed_default),
        ('--validation_A_dir', str, 'Convert validation A after each training epoch. If set none, no conversion would be done during the training.', validation_A_dir_default),
        ('--output_dir', str, 'Output directory for converted validation voices.', output_dir_default),
        ('--tensorboard_log_dir', str, 'TensorBoard log directory.', tensorboard_log_dir_default),
    )
    for flag, value_type, help_text, fallback in option_table:
        cli.add_argument(flag, type = value_type, help = help_text, default = fallback)

    # The parsed attribute names are exactly the keyword names train() takes.
    settings = dict(vars(cli.parse_args()))

    # The literal strings 'None' / 'none' switch validation conversion off.
    if settings['validation_A_dir'] in ('None', 'none'):
        settings['validation_A_dir'] = None

    train(**settings)