# pylint: disable=C0302, dangerous-default-value, no-member,
# expression-not-assigned, locally-disabled, not-context-manager,
# redefined-builtin, consider-using-set-comprehension
"""My file input and output library, e.g. for _csv handling.
Also for general IO to the console"""
from __future__ import print_function as _print_function
from warnings import warn as _warn
from enum import Enum as _Enum
import inspect as _inspect
import stat as _stat
import pathlib as _pathlib
import os as _os
import os.path as _path
import shutil as _shutil
import errno as _errno
import csv as _csv
import glob as _glob
import itertools as _itertools
import time as _time
import string as _string
import tempfile as _tempfile
import subprocess as _subprocess
import sys as _sys
import datetime as _datetime
import pickle as _pickle
import copy as _copy
import platform as _platform
from contextlib import contextmanager as _contextmanager
import numpy as _numpy
import fuckit as _fuckit
import funclite.stringslib as _stringslib
from funclite.stringslib import pretty_date_now, pretty_date_time_now, pretty_date # noqa useful for generating file names when using funcs in this module
from funclite.stringslib import get_splits as filename_get_splits # noqa
from funclite.numericslib import round_normal as _rndnorm
from funclite.stopwatch import StopWatch as _StopWatch
import funclite.baselib as _baselib
_NOTEPADPP_PATH = 'C:\\Program Files (x86)\\Notepad++\\notepad++.exe'
def _var_get_name(var):
"""(Any)->str
Get name of var as string
Parameters
var: Any variable
Example:
>>> _var_get_name(var)
'var'
"""
# see https://stackoverflow.com/questions/18425225/getting-the-name-of-a-variable-as-a-string
callers_local_vars = _inspect.currentframe().f_back.f_locals.items()
return [var_name for var_name, var_val in callers_local_vars if var_val is var]
class CSVIo:
"""class for reading/writing _csv objects
can work standalone or as the backbone for CSVMatch"""
def __init__(self, filepath):
"""init"""
self.filepath = filepath
self.values = []
self.rows = []
self.read()
    def read(self, val_funct=lambda val: val):
        """Apply val_funct to each value as it is read in."""
        with open(self.filepath, 'r', newline='', encoding='utf-8') as f:
raw_csv = _csv.DictReader(f)
for row in raw_csv:
row = {key: val_funct(val) for key, val in row.items()}
self.rows.append(row)
self.values += row.values()
return
def save(self, filepath=None):
"""save"""
if not filepath:
filepath = self.filepath
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = _csv.DictWriter(f, self.rows[0].keys()) # noqa
writer.writeheader()
for row in self.rows:
writer.writerow(row)
return
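# Illustrative usage sketch (editor's addition, not part of the library): a
# minimal round-trip with CSVIo, assuming a hypothetical csv at C:/temp/people.csv.
# Read it with a transform applied to every value, then write it back out.
def _csvio_demo():
    c = CSVIo('C:/temp/people.csv')  # __init__ reads the file immediately
    c.rows, c.values = [], []  # clear, then re-read with a transform
    c.read(val_funct=lambda v: v.upper())  # upper-case every value on the way in
    c.save('C:/temp/people_upper.csv')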
class CSVMatch(CSVIo):
"""CSVMatch class"""
def row_for_value(self, key, value):
"""returns a list of matching rows
key = the column name on the _csv
value = the value to match in that column
Returns None if no match
"""
if value or not value not in self.values:
return None
match = None
for row in self.rows:
if row[key] == value:
if match:
raise MultipleMatchError()
match = row
return match
def row_for_object(self, match_function, obj):
"""
like row_for_value, but allows for a more complicated match.
match_function takes three parameters (vals, row, object) and return true/false
Returns:
None if no match, else the returns the row
"""
for row in self.rows:
if match_function(row, obj):
return row
return None
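# Illustrative usage sketch (editor's addition): the two lookup styles CSVMatch
# offers. The file, column name and values are hypothetical.
def _csvmatch_demo():
    m = CSVMatch('C:/temp/people.csv')
    # exact match on one column; raises MultipleMatchError if two rows match
    exact = m.row_for_value('name', 'alice')
    # predicate match; the function receives (row, obj) and returns True/False
    fuzzy = m.row_for_object(lambda row, obj: row['name'].startswith(obj), 'al')
    return exact, fuzzy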
class MultipleMatchError(RuntimeError):
"""helper"""
pass
class FileProcessTracker:
"""Manages recording of the processing status
of files. This can be used in processing
pipelines to skip files that have already
been processed.
It stores each file along with a status and
error message (if applicable) in a list which
is pickled to the file defined at instance creation.
    A single status record is a 3-list:
    list[0] = file path
    list[1] = status (eFileProcessStatus value)
    list[2] = error message (if relevant)
    Args:
        files_folder (str): folder with the files
        pickle_file_path (str): file to which the status list is pickled
    Examples:
        >>> T = FileProcessTracker('C:/temp', 'C:/temp/tracker.lst')  # noqa
    """
# print('Initialising ProgressStatus...')
class eFileProcessStatus(_Enum):
"""progress status"""
NotProcessed = 0
Errored = 1
Success = 2
FileDoesNotExist = 3
class eListIndex(_Enum):
"""list index for progressstatus list"""
file_path = 0
status = 1
error = 2
def __init__(self, files_folder, pickle_file_path):
self.files_folder = _path.normpath(files_folder)
self._pickle_file_path = _path.normpath(pickle_file_path)
try:
if file_exists(self._pickle_file_path):
self._status_list = unpickle(self._pickle_file_path)
else:
self._status_list = []
        except Exception as _:
            _warn('Failed to load status file %s' % pickle_file_path)
            self._status_list = []
def __repr__(self):
"""repr"""
return 'Status tracker for folder: %s\nStatus File:%s\n%s files tracked' % (self.files_folder, self._pickle_file_path, len(self._status_list) if self._status_list else 'None')
def save(self):
"""save status_list to the file system"""
pickle(self._status_list, self._pickle_file_path)
    def get_file_status(self, file_path):
        """(str) -> Enum:FileProcessTracker.eFileProcessStatus
        Get status of the file defined by file_path.
        """
        file_path = _path.normpath(file_path)
        files = [f[FileProcessTracker.eListIndex.file_path.value] for f in self._status_list]
        if file_path in files:
            return FileProcessTracker.eFileProcessStatus(self._status_list[files.index(file_path)][FileProcessTracker.eListIndex.status.value])
        return FileProcessTracker.eFileProcessStatus.NotProcessed
def status_add(self, file_path, status=eFileProcessStatus.Success, err='', ignore_item_exists=False, save_=True):
"""(str, Enum, bool, bool) -> void
Add status for the file defined by file_path
Parameters:
file_path (str): file path
ignore_item_exists: adds the status to the list
status: The status to set
err: error to add (if required)
ignore_item_exists: raises ValueError if the item already exists and this is false
save_: picke the list after item added
"""
file_path = _path.normpath(file_path)
if self.get_file_status(file_path) == FileProcessTracker.eFileProcessStatus.NotProcessed:
self._status_list.append([file_path, status.value, err])
if save_:
self.save()
        elif not ignore_item_exists:
            raise ValueError('File "%s" is already in the processed list' % file_path)
    def status_edit(self, file_path, status=eFileProcessStatus.Success, err='', ignore_no_item=True):
        """edit the recorded status of a file"""
        file_path = _path.normpath(file_path)
        files = [f[FileProcessTracker.eListIndex.file_path.value] for f in self._status_list]
        i = None
        if ignore_no_item:
            try:
                i = files.index(file_path)
            except ValueError as _:
                pass
        else:
            i = files.index(file_path)  # raises ValueError if file_path is not tracked
        if i is not None:
            self._status_list[i] = [file_path, status.value, err]
    def status_del(self, file_path, ignore_no_item=True):
        """delete a status
        Parameters:
            file_path: the file to remove from the status list
            ignore_no_item: suppress errors if file_path is not in the status list
        """
        file_path = _path.normpath(file_path)
        files = [f[self.eListIndex.file_path.value] for f in self._status_list]
        if ignore_no_item:
            i = files.index(file_path) if file_path in files else None
            if i is not None:
                del self._status_list[i]
        else:
            i = files.index(file_path)
            del self._status_list[i]
def clean(self, save=True):
"""(bool) -> void
Cleans the in-memory list of files
which are in the list, but not present
in the folder
Parameters:
save: save the in-memory list to disk
"""
new_lst = [s for s in self._status_list if file_exists(s[FileProcessTracker.eListIndex.file_path.value])]
self._status_list = new_lst
if save:
self.save()
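# Illustrative usage sketch (editor's addition): a restartable processing loop
# built on FileProcessTracker. The paths, the *.jpg pattern and the process()
# callable are hypothetical.
def _tracker_demo(process):
    T = FileProcessTracker('C:/temp', 'C:/temp/tracker.lst')
    for fname in _glob.glob('C:/temp/*.jpg'):
        if T.get_file_status(fname) != FileProcessTracker.eFileProcessStatus.NotProcessed:
            continue  # handled (or errored) on a previous run: skip it
        try:
            process(fname)
            T.status_add(fname)  # defaults to Success and pickles the list
        except Exception as e:
            T.status_add(fname, status=FileProcessTracker.eFileProcessStatus.Errored, err=str(e))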
class PickleHelper:
"""Pickle and unpickle an individual variable to the file system.
Instantiate a PickleHelper instance for each variable.
If the var is None, will attempt to load var from the file system
Args:
root (str): root folder to save pickle
pkl_file_name (str): filename of the pickle file (e.g. myfile.pkl)
var (any): the variable to handle
type_ (object): A type, used to check if the pkl has loaded (e.g. str, int, list, Pandas.DataFrame)
force_load_from_pkl (bool): force loading from the pickle even if var is not None
err_if_none (bool): raise an error if we couldn't get var, and var was none in the first place
Raises:
ValueError: If the variable to pickle is None and err_if_none == True
Notes:
Works on a deepcopy of var
Examples:
>>> lst = list([1,2,3])
>>> P = PickleHelper('c:/temp', 'list.pkl', lst)
>>> print(P.var)
"""
def __init__(self, root: str, pkl_file_name: str, var: any, type_, force_load_from_pkl: bool = False, err_if_none: bool = False):
self._root = root
self._pkl_file_name = pkl_file_name
self.var = _copy.deepcopy(var)
self._force_load = force_load_from_pkl
self._type = type_
self._dump_fqn = _path.normpath(_path.join(self._root, pkl_file_name))
self._load()
if err_if_none and self.var is None:
raise ValueError('PickleHelper variable was None and you asked to raise an error')
def _load(self):
"""load the var"""
if (self._force_load or self.var is None) and file_exists(self._dump_fqn):
self.var = unpickle(self._dump_fqn)
if not isinstance(self.var, self._type) and not self.var: # noqa
self.var = self._type() # noqa
self.loaded = isinstance(self.var, self._type) # noqa
def delete_pkl(self):
"""delete the pickle"""
file_delete(self._dump_fqn)
def dump(self, var=None):
"""Pickle the var to root/pkl_file_name.
Args:
var (any): Pickle the passed var instead of self.var
Raises:
ValueError: If self.type_ does not match the type of var (or self.var). So make sure you pass the right type when creating an instance.
Examples:
>>> lst = list([1,2,3])
>>> P = PickleHelper('c:/temp', 'list.pkl', lst)
>>> P.var.append(4) # add a 4 to P.var
>>> P.dump() # dump out P.var (i.e. [1,2,3,4] to c:/temp/list.pkl)
"""
if var is not None:
            if self._type is not None and not isinstance(var, self._type):  # noqa
                raise ValueError('Expected variable var to have type %s, got %s' % (self._type, type(var)))
self.var = var
pickle(self.var, self._dump_fqn)
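# Illustrative usage sketch (editor's addition): caching an expensive result
# with PickleHelper. On the first run the list is computed and dumped; on later
# runs it is loaded from the (hypothetical) c:/temp/squares.pkl.
def _picklehelper_demo():
    P = PickleHelper('c:/temp', 'squares.pkl', None, list)
    if not P.var:  # nothing cached yet: compute and persist
        P.var = [x * x for x in range(1000)]
        P.dump()
    return P.var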
# region csv IO
def write_to_eof(filename, thetext):
"""(_string,_string) ->void
Write thetext to the end of the file given in filename.
"""
try:
with open(filename, 'a+', encoding='utf-8') as fid:
fid.write(thetext)
except Exception as _:
pass
def readcsv_as_dict(filename, first_row_as_key=True, error_on_dup_key=False):
"""(str,bool) -> dict
read a csv file as a dict
filename: file path
first_row_as_key:
True: first row contains dict keys. Subsequent rows are the values
False: the first column contains keys, subsequent columns contain values, no header row is assumed.
if the key column (first) contains duplicate values, rows containing the duplicate key
will be skipped
Example:
"""
result = {}
filename = _path.normpath(filename)
with open(filename, encoding='utf-8') as csvfile:
reader = _csv.DictReader(csvfile, skipinitialspace=True)
if first_row_as_key:
if error_on_dup_key and len(reader.fieldnames) > len(set(reader.fieldnames)):
raise ValueError('First row had duplicate values, which is a duplicate key condition')
result = {name: [] for name in reader.fieldnames}
for row in reader:
for name in reader.fieldnames:
result[name].append(row[name])
else:
csv_list = [[val.strip() for val in r.split(",")] for r in csvfile.readlines()]
(_, *header), *data = csv_list
for row in data:
key, *values = row
if key not in result:
                    result[key] = {h: v for h, v in zip(header, values)}
else:
if error_on_dup_key: raise ValueError('First column had duplicate values, which is a duplicate key condition')
return result
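# Illustrative sketch (editor's addition): the two shapes readcsv_as_dict
# returns for a hypothetical c:/temp/scores.csv containing:
#     name,maths,art
#     alice,90,70
#     bob,60,80
def _readcsv_as_dict_demo():
    by_col = readcsv_as_dict('c:/temp/scores.csv', first_row_as_key=True)
    # {'name': ['alice', 'bob'], 'maths': ['90', '60'], 'art': ['70', '80']}
    by_row = readcsv_as_dict('c:/temp/scores.csv', first_row_as_key=False)
    # {'alice': {'maths': '90', 'art': '70'}, 'bob': {'maths': '60', 'art': '80'}}
    return by_col, by_row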
def readcsv(filename, cols=1, startrow=0, numericdata=False, error_on_no_file=True):
"""(string, int, bool, int, bool, bool) -> list
Reads a _csv file into a list.
cols:Number of columns to retrieve, DOES NOT support any fancy indexing
start_row: row to start reading from. 0 is the first row
numericdata: force all data to be a number, raises error if any non-numeric encountered
error_on_no_file: Raise error if file filename does not exist, else return empty list
Example:
a, b, c
1, 2, 3
10, 11, 12
readcsv(fname, 1, 1)
[[1, 10]]
a, b, c
1, 2, 3
10, 11, 12
readcsv(fname, 2, 0)
[[a, 1, 10], [b, 2, 11]]
"""
filename = _path.normpath(filename)
if not file_exists(filename) and not error_on_no_file:
return []
    data = [[] for _ in range(cols)]
    if _sys.version_info.major == 2:
        csvfile = open(filename, 'rb')
    elif _sys.version_info.major == 3:
        csvfile = open(filename, newline='', encoding='utf-8')
    else:
        _sys.stderr.write('You need to use python 2* or 3* \n')
        exit()
    with csvfile:  # open the file and iterate over its data
        csvdata = _csv.reader(csvfile)
        for i in range(0, startrow):  # skip to the startrow
            next(csvdata)
        for row in csvdata:  # iterate over the rows in the csv
            for items in range(cols):  # read the values, as floats if numericdata
                if numericdata:
                    data[items].append(float(row[items]))  # noqa
                else:
                    data[items].append(row[items])  # noqa
    return data
def readcsv_by_row(fname: str, skip_first_n: int = 0) -> list:
"""read a csv file into a list of lists
where list item is a row
Args:
fname (str): filename
skip_first_n (int): skip first n rows
    Returns:
        list: 2D list of lists, where each inner list is a row
Examples:
a, b, c
1, 2, 3
10, 11, 12
Basic read
>>> readcsv_by_row('c:/my.csv')
[[a, b, c], [1,2,3], [10,11,12]]
    Skip first row
    >>> readcsv_by_row('c:/my.csv', skip_first_n=1)
    [[1,2,3], [10,11,12]]
"""
fname = _path.normpath(fname)
    with open(fname, newline='', encoding='utf-8') as f:
reader = _csv.reader(f)
data = list(reader)
return data[skip_first_n:]
def writecsv(filename, datalist, header=(), inner_as_rows=True, append=False, skip_first_row_if_file_exists=False) -> None:
"""
Writes a list to filename.
Think of inner_as_rows=False as a vstack of the nested lists (see examples)
Args:
filename (str): Filename to export csv to
datalist (list, tuple): the list of lists
header (list, tuple): header row
        inner_as_rows (bool): vstack or hstack the nested list (see examples)
append (bool): Append datalist to filename, or overwrite existing csv
skip_first_row_if_file_exists (bool): if filename exists, skip the first row from datalist (to test)
Returns:
None
Examples:
        >>> lst = [[1,'a'],[2,'b']]
        >>> writecsv('c:/my.log', lst, inner_as_rows=True)
        1,2
        a,b
        >>> writecsv('c:/my.log', lst, header=('cola', 'colb'), inner_as_rows=False)
        cola,colb
        1,a
        2,b
"""
csvfile = [] # noqa
useheader = False
exists = file_exists(filename)
if not append:
exists = False
try:
if append:
csvfile = open(filename, 'a', newline='', encoding='utf-8')
else:
csvfile = open(filename, 'w', newline='', encoding='utf-8')
except FileNotFoundError as _:
print("Could not create file %s, check the file's folder exists." % filename)
return
except Exception as e:
raise e
# if user passed a numpy array, convert it
if isinstance(datalist, _numpy.ndarray):
datalist = datalist.T
datalist = datalist.tolist()
# if there is no data, close the file
if len(datalist) < 1:
csvfile.close()
return
# check to see if datalist is a single list or list of lists
is_listoflists = False # noqa
list_len = 0 # noqa
num_lists = 0 # noqa
if isinstance(datalist[0], (list, tuple)): # check the first element in datalist
is_listoflists = True
list_len = len(datalist[0])
num_lists = len(datalist)
else:
is_listoflists = False
list_len = len(datalist)
num_lists = 1
# if a list then make sure everything is the same length
if is_listoflists:
for list_index in range(1, len(datalist)):
if len(datalist[list_index]) != list_len:
_sys.stderr.write('All lists in datalist must be the same length \n')
csvfile.close()
return
# if header is present, make sure it is the same length as the number of
# cols
if header:
if len(header) != list_len:
_sys.stderr.write('Header length did not match the number of columns, ignoring header.\n')
else:
useheader = True
# now that we've checked the inputs, loop and write outputs
writer = _csv.writer(csvfile,
delimiter=',',
quotechar='|',
quoting=_csv.QUOTE_MINIMAL) # Create writer object
if useheader:
writer.writerow(header)
if inner_as_rows:
for i, row in enumerate(range(0, list_len)):
if i == 0 and skip_first_row_if_file_exists and exists:
pass
else:
thisrow = []
if num_lists > 1:
for col in range(0, num_lists):
thisrow.append(datalist[col][row])
else:
thisrow.append(datalist[row])
writer.writerow(thisrow)
else:
for i, row in enumerate(datalist):
if i == 0 and skip_first_row_if_file_exists and exists:
pass
else:
writer.writerow(row)
csvfile.close()
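# Illustrative sketch (editor's addition): the two layouts writecsv supports.
# With inner_as_rows=True each inner list is written as a COLUMN; with False it
# is written as a ROW. Output paths are hypothetical.
def _writecsv_demo():
    data = [[1, 2], ['a', 'b']]
    writecsv('c:/temp/cols.csv', data, header=('num', 'letter'), inner_as_rows=True)
    # -> num,letter / 1,a / 2,b
    writecsv('c:/temp/rows.csv', data, header=('c1', 'c2'), inner_as_rows=False)
    # -> c1,c2 / 1,2 / a,b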
# endregion
# region file system
def temp_folder(subfolder=''):
"""Returns a folder in the users temporary space.
subfolder:
if !== '': create the defined subfolder
otherwise uses a datetime stamp
"""
fld = datetime_stamp() if subfolder == '' else subfolder
return _path.normpath(_path.join(_tempfile.gettempdir(), fld))
def datetime_stamp(datetimesep=''):
"""(str) -> str
Returns clean date-_time stamp for file names etc
e.g 01 June 2016 11:23 would be 201606011123
str is optional seperator between the date and _time
"""
fmtstr = '%Y%m%d' + datetimesep + '%H%m%S'
return _time.strftime(fmtstr)
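# Illustrative sketch (editor's addition): combining temp_folder and
# datetime_stamp to build a unique, timestamped output path.
def _stamped_temp_file_demo(ext='.csv'):
    fld = temp_folder('exports')  # e.g. <TMP>/exports (not created on disk)
    fname = 'export_' + datetime_stamp('_') + ext  # e.g. export_20160601_112301.csv
    return _path.normpath(_path.join(fld, fname))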
def exit(): # noqa
"""override exit to detect platform"""
if get_platform() == 'windows':
_os.system("pause")
else:
_os.system('read -s -n 1 -p "Press any key to continue..."')
_sys.exit()
def get_platform() -> str:
"""
Get platform/os name as string.
Returns:
str: Platform, IN ['windows', 'mac', 'linux']
"""
s = _sys.platform.lower()
if s in ("linux", "linux2"):
return 'linux'
if s == "darwin":
return 'mac'
if s in ("win32", "windows"):
return 'windows'
return 'linux'
def get_file_count(paths: (str, list), recurse: bool = False) -> int:
"""
Get file count in a folder or list of folders.
Args:
paths (str, list): Path or list of paths
recurse (bool): Recurse paths
Returns:
int: file count
Notes:
Left here to not break other code.
See file_count and file_count2 which support matching
Examples:
>>> get_file_count('C:/TEMP', False)
5
"""
cnt = 0
if isinstance(paths, str):
paths = [paths]
for ind, val in enumerate(paths):
paths[ind] = _path.normpath(val)
if recurse:
for thedir in paths:
cnt += sum((len(f) for _, _, f in _os.walk(thedir)))
else:
for thedir in paths:
cnt += len([item for item in _os.listdir(thedir)
if _path.isfile(_path.join(thedir, item))])
return cnt
def file_count_to_list(pth, out='', match='.*'):
"""
    Get a 2-deep list of file counts for a root directory and its subfolders
    Args:
        pth: root folder to count
        out: optional file to dump csv results to
        match: iterable of wildcard matches, e.g. ('*.jpg', '*.gif')
    Examples:
        >>> file_count_to_list('c:/temp', 'c:/out.csv', match=('*.jpg', '*.gif'))  # noqa
        [['c:/temp',10],['c:/temp/subfld',12]]
"""
# TODO Debug this, might be issues with the wildcarding
R = []
for d, _, _ in folder_generator(_path.normpath(pth)):
i = file_count2(d, match=match)
R.append([d, i])
if out:
out = _path.normpath(out)
writecsv(out, R, inner_as_rows=False, header=['dir', 'n'])
return R
def hasext(path, ext):
"""(str, str|iter)->bool
Does the file have extension ext
ext can be a list of extensions
"""
if isinstance(ext, str):
return get_file_parts2(path)[2] == ext
return get_file_parts2(path)[2] in ext
def hasdir(path, fld):
"""(str, str|list)->bool
Is the file in folder fld.
fld can be a list of folders (strings)
"""
    if isinstance(fld, str):
        return get_file_parts2(path)[0] == fld
    return get_file_parts2(path)[0] in fld
def hasfile(path, fname):
"""(str, str|list)->bool
Does path contain the filename fname.
path:
full path name to a file
fname:
the file name
    Example:
        >>> hasfile('c:/tmp/myfile.txt', 'myfile.txt')
        True
    Returns:
        True if fname is the file in path.
    """
    if isinstance(fname, str):
        return get_file_parts2(path)[1] == fname
    return get_file_parts2(path)[1] in fname
def drive_get_uuid(drive='C:', strip=('-',), return_when_unidentified='??'):
"""get uuid of drive"""
proc = _os.popen('vol %s' % drive)
try:
drive = proc.readlines()[1].split()[-1]
if not drive:
drive = return_when_unidentified
for char in strip:
drive = drive.replace(char, '')
except Exception as _:
pass
    finally:
        try:
            proc.close()
        except Exception:
            pass
return drive
def get_file_parts(filepath: str) -> list:
"""
Given path to a file, split it into path,
file part and extension.
Args:
filepath (str): full path to a file.
Returns:
list: [folder, filename sans extension, extension]
Examples:
>>> get_file_parts('c:/temp/myfile.txt')
'c:/temp', 'myfile', '.txt'
"""
filepath = _path.normpath(filepath)
folder, fname = _path.split(filepath)
fname, ext = _path.splitext(fname)
return [folder, fname, ext]
def get_file_parts2(filepath: str) -> list:
"""
Split a full file path into path, file name with extension and dotted extension.
Args:
filepath (str): full path to a file.
Returns:
list: [folder, file name with extension, dotted extension]
Examples:
>>> get_file_parts2('c:/temp/myfile.txt')
'c:/temp', 'myfile.txt', '.txt'
"""
folder, fname = _path.split(filepath)
ext = _path.splitext(fname)[1]
return [folder, fname, ext]
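# Illustrative sketch (editor's addition): how get_file_parts and
# get_file_parts2 differ on the same input.
def _file_parts_demo():
    fqn = 'c:/temp/myfile.txt'
    fld, stem, ext = get_file_parts(fqn)  # 'c:\\temp', 'myfile', '.txt' (path is normpathed)
    fld2, name, _ = get_file_parts2(fqn)  # 'c:/temp', 'myfile.txt', '.txt' (path left as passed)
    return stem, name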
def folder_has_files(fld: str, ext_dotted: tuple[str] = ()) -> bool:
"""(str, str|list) -> bool
Does the folder contain files, optionally matching
extensions. Extensions are dotted.
This checks in subfolders. So even if fld has no files, True will be returned if any subdir has files
fld: folder path
ext_dotted: list of extensions to match
    Returns:
        True if matching files are found; False otherwise (including when the folder does not exist).
Examples:
>>> folder_has_files('C:/windows')
True
specify file extensions
>>> folder_has_files('C:/windows', ['.dll'])
True
"""
if isinstance(ext_dotted, str):
ext_dotted = [ext_dotted]
for _, _, files in _os.walk(_path.normpath(fld)):
if files and not ext_dotted:
return True
for fname in files:
for ext in ext_dotted:
if fname.endswith(ext):
return True
return False
def get_available_drives(strip=('-',), return_when_unidentified='??'):
"""->dictionary
gets a list of available drives as the key, with uuids as the values
eg. {'c:':'abcd1234','d:':'12345678'}
"""
drives = [
'%s:' % d for d in _string.ascii_uppercase if _path.exists('%s:' % d)]
uuids = [drive_get_uuid(drv, strip, return_when_unidentified)
for drv in drives]
return dict(zip(drives, uuids))
def get_available_drive_uuids(strip=('-',), return_when_unidentified='??'):
"""->dictionary
gets a list of available drives with uuids as the key
eg. {'c:':'abcd1234','d:':'12345678'}
"""
s = _string.ascii_uppercase
drives = ['%s:' % d for d in s if _path.exists('%s:' % d)]
uuids = [drive_get_uuid(drv, strip, return_when_unidentified)
for drv in drives]
return dict(zip(uuids, drives))
def get_drive_from_uuid(uuid, strip=('-',)):
"""str, str iterable, bool->str | None
given a uuid get the drive letter
uuid is expected to be lower case
Returns None if not found
"""
for char in strip:
uuid = uuid.replace(char, '')
    # get_available_drive_uuids returns {uuid: drive}
drives = get_available_drive_uuids(strip)
if uuid in drives:
return drives[uuid]
if uuid.lower() in drives:
return drives[uuid]
return None
def folder_copy(src: str, dest: str, ignore: (list, tuple) = (), raise_error: bool = False) -> None:
"""
Recursive copy of folder src to folder dest.
    This copies all files and folders BELOW src into dest.
Args:
src (str): Source folder
dest (str): Dest folder
ignore (list, tuple): ignore these patterns (see shutil.ignore_patterns)
raise_error (bool): Raise an error if it occurs
Returns:
None
Notes:
Will fail if dest already exists.
Examples:
Copy all files and folders, ignoring some image types.
        >>> folder_copy('C:/TEMP/mydir', 'C:/TEMP/mydir_copy', ignore=['*.jpg', '*.gif'])
"""
src = _path.normpath(src)
dest = _path.normpath(dest)
try:
if ignore:
_shutil.copytree(src, dest, ignore=_shutil.ignore_patterns(*ignore))
else:
_shutil.copytree(src, dest)
except OSError as e:
# If the error was caused because the source wasn't a directory
if e.errno == _errno.ENOTDIR:
_shutil.copy(src, dest)
else:
if raise_error:
raise e
else:
print('Directory not copied. Error: %s' % e)
def folder_generator(paths: (str, list)):
"""
Yield subfolders in paths with wildcard match on any in match.
Args:
paths (str, list): Paths to iterate
Yields:
str: subfolders in paths
Notes:
Also see folder_generator2 which supports wildcard matching
Examples:
>>> [s for s in folder_generator2('C:/temp', 'folder')] # noqa
['C:/temp/folder_for_me', 'C:/temp/folder_for_you']
"""
if isinstance(paths, str):
paths = [paths]
paths = [_path.normpath(p) for p in paths]
for pth in paths:
for fld, _, _ in _os.walk(pth):
if fld in paths:
continue
yield fld
def folder_generator2(paths: (str, list), match: (str, list) = (), ignore_case: bool = True) -> str:
"""
Yield subfolders in paths with wildcard match on any in match.
Args:
paths (str, list): Paths to iterate
match (str, list): Wildcard match on this. If empty or None, no filter is applied (i.e. every dir is yielded)
ignore_case (bool): Make match case insensitive
Yields:
str: subfolders in paths
Examples:
>>> [s for s in folder_generator2('C:/temp', 'folder')] # noqa
['C:/temp/folder_for_me', 'C:/temp/folder_for_you']
"""
if isinstance(paths, str):
paths = [paths]
if isinstance(match, str):
match = [match]
paths = [_path.normpath(p) for p in paths]
for pth in paths:
for fld, _, _ in _os.walk(pth):
if fld in paths:
continue
if _stringslib.iter_member_in_str(fld, list(map(str, match)), ignore_case):
yield fld
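# Illustrative sketch (editor's addition): walking subfolders with and without
# a name filter. The root path and match string are hypothetical.
def _folder_walk_demo():
    every_subdir = list(folder_generator('C:/temp'))
    # only subfolders whose path contains 'folder', case-insensitively
    matched = list(folder_generator2('C:/temp', match='folder'))
    return every_subdir, matched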
def file_list_generator(paths: (str, list, tuple), wildcards: (str, list, tuple)):
"""