-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsplit.py
163 lines (130 loc) · 5.59 KB
/
split.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import numpy as np
from pydub import AudioSegment
import scipy.io.wavfile as wavfile
import featureExtraction as FE
import trainAudio as TA
import re
def read_audio_file(input_file):
    """Decode an audio file into a NumPy sample array.

    The file is opened with ``AudioSegment.from_file``. Only sample widths
    of 2 bytes (int16) and 4 bytes (int32) are converted; any other width
    leaves the result empty.

    Args:
        input_file: Audio file handed to ``AudioSegment.from_file``.

    Returns:
        A ``(sampling_rate, signal)`` tuple. ``signal`` is 1-D when the
        file has a single channel, otherwise one column per channel.
        If decoding fails or the sample width is unsupported,
        ``sampling_rate`` is ``-1`` and ``signal`` is an empty array.
    """
    sampling_rate = -1
    signal = np.array([])
    try:
        audiofile = AudioSegment.from_file(input_file)
        data = np.array([])
        # np.frombuffer replaces the deprecated binary mode of
        # np.fromstring; it interprets the same raw bytes.
        if audiofile.sample_width == 2:
            data = np.frombuffer(audiofile._data, np.int16)
        elif audiofile.sample_width == 4:
            data = np.frombuffer(audiofile._data, np.int32)
        if data.size > 0:
            sampling_rate = audiofile.frame_rate
            n_channels = audiofile.channels
            # Samples are interleaved per channel; de-interleave them into
            # one row per channel, then transpose to one column per channel.
            # np.array() copies, so the result is writable even though the
            # frombuffer view is read-only.
            signal = np.array(
                [data[chn::n_channels] for chn in range(n_channels)]
            ).T
    except Exception:
        # Best-effort decoding: report the failure and fall through to the
        # (-1, empty) sentinel instead of propagating. KeyboardInterrupt and
        # SystemExit are deliberately no longer swallowed.
        print("Error: file not found or other I/O error. (DECODING FAILED)")
    if signal.ndim == 2 and signal.shape[1] == 1:
        signal = signal.flatten()
    return sampling_rate, signal
def smooth_moving_avg(signal, window=11):
    """Smooth a 1-D signal with a moving average of ``window`` samples.

    The signal is extended at both ends with values reflected around its
    first and last samples before convolving with a flat kernel, and the
    result is trimmed back to the input length.

    Args:
        signal: 1-D array (or array-like) of samples.
        window: Averaging window length in samples; truncated to ``int``.
            Windows shorter than 3 leave the signal untouched.

    Returns:
        The smoothed signal, same length as the input. When ``window < 3``
        the input array itself is returned.

    Raises:
        ValueError: If ``signal`` is not 1-D, or has fewer samples than
            ``window``.
    """
    window = int(window)
    # No-op for ndarray input; lets plain sequences be used as well.
    signal = np.asarray(signal)
    if signal.ndim != 1:
        raise ValueError("smooth_moving_avg only accepts 1-dimensional "
                         "arrays.")
    if signal.size < window:
        raise ValueError("Input vector needs to be bigger than window size.")
    if window < 3:
        return signal
    # Reflect the signal around its end points so the average near the
    # borders is not pulled toward zero by the convolution.
    head = 2 * signal[0] - signal[window - 1::-1]
    tail = 2 * signal[-1] - signal[-1:-window:-1]
    padded = np.r_[head, signal, tail]
    kernel = np.ones(window, 'd')
    smoothed = np.convolve(kernel / kernel.sum(), padded, mode='same')
    # Drop the padding: `window` samples in front, `window - 1` at the back.
    return smoothed[window:-window + 1]
def silence_removal(signal, sampling_rate, st_win, st_step,
                    smooth_window=0.5, weight=0.5):
    """Find the non-silent (onset) segments of an audio signal.

    A probabilistic SVM is trained on the signal's own lowest- and
    highest-energy short-term frames. Every frame is then scored with that
    model, the scores are smoothed, and runs of frames whose score exceeds
    a weighted threshold are reported as segments.

    Args:
        signal: Audio samples, passed unchanged to
            ``FE.feature_extraction``.
        sampling_rate: Sampling rate of ``signal``.
        st_win: Short-term window length; multiplied by ``sampling_rate``
            to obtain the window size given to the feature extractor.
        st_step: Short-term step; multiplied by ``sampling_rate`` for the
            feature extractor and used to convert frame indices into
            segment limits.
        smooth_window: Length of the probability smoothing window, in the
            same unit as ``st_step``.
        weight: Blend factor between the mean of the lowest and highest
            10% of smoothed probabilities; clamped into (0, 1). Larger
            values give a stricter threshold.

    Returns:
        A list of ``[start, end]`` pairs (frame index times ``st_step``)
        for every detected segment longer than 0.2.
    """
    # Keep the weight strictly inside (0, 1).
    if weight >= 1:
        weight = 0.99
    if weight <= 0:
        weight = 0.01

    # Step 1: feature extraction.
    # NOTE(review): no stereo-to-mono conversion happens here; presumably
    # the caller must supply a signal the extractor accepts - confirm
    # against FE.feature_extraction.
    st_feats, _ = FE.feature_extraction(signal, sampling_rate,
                                        st_win * sampling_rate,
                                        st_step * sampling_rate)

    # Step 2: train a binary SVM on low- vs high-energy frames.
    # Row 1 of the feature matrix is used as the short-term energy.
    st_energy = st_feats[1, :]
    en = np.sort(st_energy)
    # Size of a 10% slice of the frames; at least one frame so the means
    # below are never taken over an empty slice (which would yield NaN).
    st_windows_fraction = max(1, len(en) // 10)
    # Mean energy of the lowest 10% of frames.
    low_threshold = np.mean(en[:st_windows_fraction]) + 1e-15
    # Mean energy of the highest 10% of frames, including the largest one,
    # matching the top-10% slice used for the probability threshold below.
    high_threshold = np.mean(en[-st_windows_fraction:]) + 1e-15
    low_energy = st_feats[:, np.where(st_energy <= low_threshold)[0]]
    high_energy = st_feats[:, np.where(st_energy >= high_threshold)[0]]
    # Two-class training set: [silence-like frames, onset-like frames].
    features = [low_energy.T, high_energy.T]
    features_norm, mean, std = TA.normalize_features(features)
    svm = TA.train_svm(features_norm, 1.0)

    # Step 3: per-frame probability of the second (onset) class, using the
    # same normalization as the training data.
    prob_on_set = np.array([
        svm.predict_proba(
            ((st_feats[:, index] - mean) / std).reshape(1, -1))[0][1]
        for index in range(st_feats.shape[1])
    ])
    prob_on_set = smooth_moving_avg(prob_on_set, smooth_window / st_step)

    # Step 4A: threshold as a weighted average of the mean of the lowest
    # 10% and the mean of the highest 10% of the smoothed probabilities.
    prob_sorted = np.sort(prob_on_set)
    nt = max(1, prob_sorted.shape[0] // 10)
    threshold = (np.mean((1 - weight) * prob_sorted[0:nt]) +
                 weight * np.mean(prob_sorted[-nt:]))
    max_indices = np.where(prob_on_set > threshold)[0]

    # Step 4B: group onset frame indices into clusters; a frame joins the
    # current cluster when it is at most 2 frames after its last member.
    clusters = []
    for frame in max_indices:
        if clusters and frame - clusters[-1][-1] <= 2:
            clusters[-1].append(frame)
        else:
            clusters.append([frame])
    seg_limits = [[cluster[0] * st_step, cluster[-1] * st_step]
                  for cluster in clusters]

    # Step 5: drop very short segments (single-frame clusters have zero
    # length and are therefore always removed here).
    min_duration = 0.2
    return [limits for limits in seg_limits
            if limits[1] - limits[0] > min_duration]
def sortAudioFiles(data):
    """Return the names in ``data`` sorted in natural (human) order.

    Runs of ASCII digits are compared as integers and everything else is
    compared case-insensitively, so ``"a2.wav"`` sorts before ``"a10.wav"``.

    Args:
        data: Iterable of strings (e.g. file names).

    Returns:
        A new list with the same strings in natural order.
    """
    def natural_key(name):
        # With one capturing group, re.split alternates text / digit-run
        # chunks, so digit runs always sit at odd positions. Choosing the
        # conversion by position (rather than str.isdigit) keeps non-ASCII
        # digit characters as text instead of crashing int() or mixing
        # ints and strings at the same key position.
        chunks = re.split(r'([0-9]+)', name)
        return [int(chunk) if pos % 2 else chunk.lower()
                for pos, chunk in enumerate(chunks)]

    return sorted(data, key=natural_key)
def splitAudio(input_file, audio_directory, smoothing_window=1.0, weight=0.2):
    """Split an audio file into its non-silent segments and list a directory.

    The input is decoded, segmented with ``silence_removal`` (using 0.05
    for both the short-term window and step), and each detected segment is
    written as ``<input without extension>_<start>-<end>.wav`` next to the
    input file.

    Args:
        input_file: Path of the audio file to split.
        audio_directory: Directory whose entries are listed and returned.
            NOTE(review): segments are written beside ``input_file``, not
            into this directory - presumably callers keep the input there;
            confirm against the caller.
        smoothing_window: Forwarded to ``silence_removal`` as
            ``smooth_window``.
        weight: Forwarded to ``silence_removal``.

    Returns:
        The entries of ``audio_directory`` in natural sort order.

    Raises:
        FileNotFoundError: If ``input_file`` is not an existing file.
        ValueError: If ``input_file`` could not be decoded.
    """
    if not os.path.isfile(input_file):
        raise FileNotFoundError("Input audio file not found!")
    fs, x = read_audio_file(input_file)
    # read_audio_file signals a decoding failure with (-1, empty array);
    # stop here rather than feeding that into the segmentation.
    if fs == -1 or x.size == 0:
        raise ValueError("Input audio file could not be decoded!")
    segment_limits = silence_removal(x, fs, 0.05, 0.05,
                                     smoothing_window, weight)
    # splitext handles extensions of any length (".flac", ".mp3", none).
    base_name = os.path.splitext(input_file)[0]
    for start, end in segment_limits:
        out_path = "{0:s}_{1:.3f}-{2:.3f}.wav".format(base_name, start, end)
        wavfile.write(out_path, fs, x[int(fs * start):int(fs * end)])
    return sortAudioFiles(os.listdir(audio_directory))