-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathduplicate_exact_videos.py
192 lines (151 loc) · 9.06 KB
/
duplicate_exact_videos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os
import cv2
import time
import argparse
import numpy as np
from PIL import Image
from tqdm import tqdm
# Lower-case file extensions treated as videos when scanning a folder (see check_folder).
video_extensions = ['.mov', '.avi', '.mp4', '.wmv', '.flv', '.mkv']
# Calculate distances among arrays
def find_duplicates(video, thresh=1e-3):
    """
    Flag pairs of near-identical samples in a 2D array via cosine distance.

    All sample pairs must be compared, so cosine distance is used because it is
    cheap to compute as a single matrix product. A threshold decides whether two
    samples count as the same; 1e-3 was found to work best.

    Parameters:
        video (numpy array): 2D array of shape (n_files, flattened_frame_pixels).
        thresh (float): cosine-distance threshold for calling a pair a duplicate.

    Returns:
        bool numpy array of shape (n_files, n_files); entry (i, j) is True when
        rows i and j are within `thresh` cosine distance of each other.
    """
    norms = np.linalg.norm(video, ord=2, axis=1, keepdims=True)
    # Guard against all-zero rows (e.g. a pure black first frame): dividing by a
    # zero norm yields NaNs, which compare False everywhere and also emit
    # RuntimeWarnings. Substituting 1 keeps zero rows at distance 1 (not
    # duplicates), matching the original's effective behavior without the NaNs.
    norms[norms == 0] = 1.0
    video_unit = video / norms                     # Convert rows to unit vectors
    video_distances = 1 - video_unit @ video_unit.T  # Pairwise cosine distances
    return video_distances < thresh
# Find videos with same number of frames
def find_duplicate_frames(n_frames):
    """
    Mark which videos share an identical frame count.

    Parameters:
        n_frames (numpy array): 1D array of per-video frame counts.

    Returns:
        bool numpy array of shape (n, n); entry (i, j) is True when videos i
        and j have the same number of frames.
    """
    # Broadcasting a column vector against a row vector compares every pair
    # in one shot — equivalent to the repeat-and-subtract formulation.
    column = n_frames.reshape(-1, 1)
    row = n_frames.reshape(1, -1)
    return column == row
def check_folder(folder, compare_size, keep_largest=None):
    """
    Read every video in `folder`, compare first frames, and collect duplicates.

    A file counts as a duplicate when its (resized, grayscaled) first frame is
    within cosine-distance threshold of another file's AND both have the same
    frame count. Within each duplicate group one file is kept; the rest are
    returned for deletion.

    Parameters:
        folder (str): Path to the folder that needs to be checked.
        compare_size (int): The first frame is resized to compare_size x
            compare_size for comparison. Higher consumes memory, lower can give
            inaccurate results; 300 is a good spot.
        keep_largest (bool | None): Keep the largest file of a duplicate group
            when True, the smallest when False. Defaults to the global CLI
            namespace (`args.keep_largest`) for backward compatibility.

    Returns:
        list of file paths (duplicates) that should be deleted.
    """
    if keep_largest is None:
        # Backward-compatible fallback: the original read the global `args`
        # set up in the __main__ block.
        keep_largest = args.keep_largest == 1
    print("Checking folder → " + folder)
    files = os.listdir(folder)  # Get all files in a folder
    files.sort()
    videos_name = []
    all_videos = []
    all_videos_length = []
    to_delete = []
    # Read all videos in the folder
    time.sleep(0.1)  # Otherwise tqdm's output gets interleaved with prints
    for file_name in tqdm(files):
        # endswith avoids false positives the original substring test allowed
        # (e.g. "clip.mp4.txt" contains ".mp4" but is not a video).
        if not file_name.lower().endswith(tuple(video_extensions)):
            continue
        video = None
        try:
            video = cv2.VideoCapture(os.path.join(folder, file_name))  # Open video
            success, first_image = video.read()  # Grab the first frame
            if first_image is not None:
                video_length = int(video.get(cv2.CAP_PROP_FRAME_COUNT))  # Frame count
                if video_length > 1:  # Must have more than 1 frame to be a video
                    first_image = cv2.cvtColor(first_image, cv2.COLOR_BGR2GRAY)
                    # fromarray on a 2-D uint8 array already yields mode 'L', so
                    # the original's extra .convert('L') was a no-op and is dropped.
                    first_image = Image.fromarray(first_image).resize((compare_size, compare_size))
                    first_image = np.array(first_image).reshape(-1)  # Flatten for comparison
                    videos_name.append(file_name)
                    all_videos.append(first_image)
                    all_videos_length.append(video_length)
        except Exception as e:
            print(f"Failed to read file {file_name} due to {e}")
        finally:
            if video is not None:
                video.release()  # Fix: the original leaked every capture handle
    time.sleep(0.1)
    m = len(all_videos)
    print("Total videos found = " + str(m))
    if m < 2:  # Duplicates not possible with 0 or 1 videos.
        print()
        return to_delete  # Return empty list
    # Combine video arrays together and flag duplicates
    all_videos = np.stack(all_videos)
    print(f"Finding duplicate videos within the folder {folder}...")
    videos_duplicates = find_duplicates(all_videos)
    # Combine video lengths and flag videos with same length
    all_videos_length = np.stack(all_videos_length)
    frames_duplicates = find_duplicate_frames(all_videos_length)
    # A pair is a duplicate only if both first frame AND frame count match
    videos_duplicates = np.logical_and(videos_duplicates, frames_duplicates)

    def video_size(video_idx):
        """File size (bytes) of the video at index video_idx — decides which copy to keep."""
        return os.path.getsize(os.path.join(folder, videos_name[video_idx]))

    # Collect all flagged duplicates and decide which files to keep.
    visited = [False] * m  # Avoid re-processing members of an already-handled group
    files_duplicates = []  # Store each duplicate group (as index lists)
    for i in tqdm(range(m)):
        if visited[i]:
            continue
        visited[i] = True
        video_duplicates = videos_duplicates[i]  # Duplicate flags for file i
        if video_duplicates.sum() > 1:  # More than just itself in the group
            duplicate_idx = np.where(video_duplicates)[0].tolist()
            # Sort by size so index 0 is the file we keep (largest or smallest).
            duplicate_idx.sort(key=video_size, reverse=keep_largest)
            for j, idx in enumerate(duplicate_idx):
                visited[idx] = True
                if j > 0:
                    # Fix: the original referenced undefined `images_name` here,
                    # raising NameError the moment a duplicate was found.
                    to_delete.append(os.path.join(folder, videos_name[idx]))
            files_duplicates.append(duplicate_idx)
    time.sleep(0.1)
    # Display all duplicate groups within the folder.
    if len(files_duplicates) > 0:
        print("\nDuplicates:")
        for group in files_duplicates:
            for idx in group:
                print(videos_name[idx], end="\t")
            print()
    else:
        print("No duplicates found.")
    print()
    return to_delete
def main(args):
    """
    Recursively scan args.folder for exact-duplicate videos and prompt for deletion.

    Parameters:
        args: parsed CLI namespace providing `folder`, `compare_size`, and
            (read globally by check_folder) `keep_largest`.
    """
    print(f"Folder to be explored: {args.folder}\n\n")
    folders = [folder[0] for folder in os.walk(args.folder)]  # All folders, recursively
    folders.sort()
    all_deletes = []  # Accumulates files marked for deletion across every folder
    for folder in folders:
        all_deletes += check_folder(folder, args.compare_size)
    print('-----------------------------Overall Report----------------------------------')
    if len(all_deletes) == 0:
        print("No duplicates found.")
        # Fix: the original called exit(), raising SystemExit even when main()
        # is invoked from another module; a plain return is sufficient here.
        return
    # Display all files marked for delete
    print("\nFiles marked for delete:")
    for file in all_deletes:
        print(file)
    # Prompt to decide whether to delete
    print("Print Y to delete")
    inp = input()
    if inp.lower() == 'y':
        for file in all_deletes:
            os.remove(file)
        print("Done.")
    else:
        print("Files not deleted.")
if __name__ == '__main__':
    # CLI entry point: build the argument parser, then hand off to main().
    arg_parser = argparse.ArgumentParser(description='find-exact-duplicate-videos')
    arg_parser.add_argument('--folder', type=str, default=".", help='Directory of videos. Default is current directory.')
    arg_parser.add_argument('--keep_largest', type=int, default=0, help='0: keeps the smallest file among duplicates; 1: keeps the largest file among duplicates')
    arg_parser.add_argument('--compare_size', type=int, default=300, help='Size used for comparison. Found 300 to be best for performance and results.')
    # NOTE: `args` must stay a module-level name — check_folder reads the
    # global `args.keep_largest` when no override is passed in.
    args = arg_parser.parse_args()
    main(args)