dbc2csv.py
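"""
dbc2csv.py

Converts DATASUS .dbc files to DBF (via the _readdbc C extension) and then to
pipe-delimited CSV, handing the resulting CSV to csv2df for further processing.
"""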
import os
import csv  # https://docs.python.org/3/library/csv.html
from tempfile import NamedTemporaryFile

from dbfread import DBF  # https://dbfread.readthedocs.io/en/latest/
from _readdbc import ffi, lib

import config
import csv2df
import log

log = log.get_logger(config.logger_name)


def dbc2dbf(infile, outfile):
    """
    Converts a DATASUS .dbc file to a DBF database.

    :param infile: .dbc file name
    :param outfile: name of the .dbf file to be created
    """
    if isinstance(infile, str):
        infile = infile.encode()
    if isinstance(outfile, str):
        outfile = outfile.encode()
    p = ffi.new('char[]', os.path.abspath(infile))
    q = ffi.new('char[]', os.path.abspath(outfile))
    lib.dbc2dbf([p], [q])
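
# Hypothetical usage (the file names below are examples only, not files
# shipped with this repository):
#
#     dbc2dbf('data/STSP2301.dbc', 'data/STSP2301.dbf')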


# A local read_dbc is kept here because pysus.utilities.readdbc did not work
# in this environment.
def read_dbc(file, encoding='utf-8', raw=False):
    """
    Opens a DATASUS .dbc file, writes its contents to a pipe-delimited .csv
    file next to the input, and hands that CSV to csv2df.

    :param file: .dbc file name
    :param encoding: encoding of the data
    :param raw: skip type conversion; set it to True to avoid type conversion errors
    """
    filename = file.encode() if isinstance(file, str) else file
    tf = NamedTemporaryFile(delete=False)
    dbc2dbf(filename, tf.name.encode())
    try:
        db = DBF(tf.name, encoding=encoding, raw=raw)
    except Exception:
        # Fall back to latin1 before giving up.
        try:
            db = DBF(tf.name, encoding='latin1', raw=raw)
        except Exception:
            log.fatal("Error reading DBF {}".format(tf.name))
            return
    if not os.path.isfile(file + '.csv'):
        log.info("Creating CSV file {}...".format(file + '.csv'))
        with open(file + '.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile, delimiter='|')
            writer.writerow(db.field_names)
            for record in db:
                writer.writerow(list(record.values()))
        del db
        log.info("Created CSV file {}".format(file + '.csv'))
        csv2df.execute(file + '.csv')
    os.unlink(tf.name)
    del tf
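

# Minimal driver sketch, not part of the original module: the "data"
# directory is an assumption; adjust the path to match your layout.
if __name__ == '__main__':
    data_dir = 'data'
    for name in os.listdir(data_dir):
        if name.lower().endswith('.dbc'):
            read_dbc(os.path.join(data_dir, name))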