#!/usr/bin/env python3
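"""Import an 'iTunes Music Library.xml' export into PostgreSQL.

The raw track attributes are first staged verbatim in a temporary ``itunes``
schema and then normalised into ``artist``, ``album``, ``track`` and ``play``
tables in the target schema.
"""
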
import argparse
import logging
import plistlib

import psycopg2

DEFAULT_LIBRARY_FILE_LOCATION = '/Users/stephan/Music/iTunes/iTunes Music Library.xml'
DEFAULT_DATABASE_NAME = 'music'
DEFAULT_SCHEMA_NAME = 'public'
DEFAULT_USER_NAME = 'postgres'
DEFAULT_PASSWORD = 'postgres'
DEFAULT_PORT = 5432

TEMPORARY_TABLE_NAME = 'itunes'
TEMPORARY_SCHEMA_NAME = 'itunes'


def main(arg_list=None):
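    """Run the import: load the library, stage it in PostgreSQL, then normalise it."""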
    args = parse_args(arg_list)
    library_xml = args.library_xml
    db_name = args.database_name
    schema_name = args.schema_name
    port = args.port
    username = args.username
    password = args.password

    # These are progress messages, not warnings, so configure INFO-level logging.
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info("Connecting to database %s on port %s with username %s. Importing data to schema %s",
                 db_name, port, username, schema_name)
    logging.info("Parsing library file at location: %s", library_xml.name)
    library = plistlib.load(library_xml)

    logging.info("Extracting track data from library file...")
    tracks_table, tracks = process_tracks(library)

    # Import data 'as-is' to postgres
    conn, cur = open_db(db_name, port, username, password)
    logging.info("Importing data into temp schema...")
    import_itunes_data(cur, tracks, tracks_table)
    close_db(conn, cur)

    # Create normalised data structure
    conn, cur = open_db(db_name, port, username, password)
    logging.info("Creating the new tables...")
    create_normalised_tables(cur, schema_name)
    close_db(conn, cur)

    # Migrate data over to new structure
    conn, cur = open_db(db_name, port, username, password)
    logging.info("Migrating data to new tables...")
    normalise_data(cur, schema_name)
    close_db(conn, cur)


def parse_args(arg_list):
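    """Define the command-line options and parse ``arg_list`` (defaults to sys.argv)."""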
    parser = argparse.ArgumentParser()
    parser.add_argument('--library',
                        help='Path to XML library file',
                        dest='library_xml',
                        type=argparse.FileType('rb'),
                        default=DEFAULT_LIBRARY_FILE_LOCATION)
    parser.add_argument('--db', '-d',
                        help='Name of postgres database [%(default)s]',
                        dest='database_name',
                        default=DEFAULT_DATABASE_NAME)
    parser.add_argument('--port', '-p',
                        help='Local database port [%(default)s]',
                        dest='port',
                        type=int,
                        default=DEFAULT_PORT)
    parser.add_argument('--schema', '-s',
                        help='Name of database schema [%(default)s]',
                        dest='schema_name',
                        default=DEFAULT_SCHEMA_NAME)
    parser.add_argument('--user', '-u',
                        help='Postgres username [%(default)s]',
                        dest='username',
                        default=DEFAULT_USER_NAME)
    parser.add_argument('--pass', '-x',
                        help='Postgres password [%(default)s]',
                        dest='password',
                        default=DEFAULT_PASSWORD)
    args = parser.parse_args(arg_list)
    return args


def import_itunes_data(db, tracks, tracks_table):
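    """Recreate the temporary ``itunes`` schema and load the raw track data.

    ``tracks_table`` is the CREATE TABLE statement and ``tracks`` the list of
    (parameterised INSERT, values) pairs produced by process_tracks().
    """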
db.execute("DROP SCHEMA IF EXISTS itunes CASCADE")
db.execute("CREATE SCHEMA itunes")
db.execute("DROP TABLE IF EXISTS {0}.{1}".format(TEMPORARY_SCHEMA_NAME, TEMPORARY_TABLE_NAME))
db.execute(tracks_table)
db.execute("CREATE UNIQUE INDEX idx_itunes_itunes_id ON itunes.itunes (persistent_id);")
for query in tracks:
db.execute(query[0], list(query[1]))


def create_normalised_tables(db, schema_name):
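    """Create the normalised artist, album, track and play tables if they do not exist.

    ``schema_name`` is interpolated directly into the DDL, so it must be a
    trusted value.
    """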
db.execute("CREATE SCHEMA IF NOT EXISTS {0}".format(schema_name))
db.execute("CREATE TABLE IF NOT EXISTS {0}.artist ("
"artist_id BIGINT GENERATED BY DEFAULT AS IDENTITY,"
"artist_name TEXT NOT NULL,"
"CONSTRAINT pk_artist PRIMARY KEY (artist_id),"
"CONSTRAINT uk_artist_name UNIQUE (artist_name)"
"); "
.format(schema_name))
db.execute("CREATE TABLE IF NOT EXISTS {0}.album ("
"album_id BIGINT GENERATED BY DEFAULT AS IDENTITY,"
"album_name TEXT NOT NULL,"
"artist_id BIGINT NOT NULL,"
"release_year INT,"
"CONSTRAINT pk_album PRIMARY KEY (album_id),"
"CONSTRAINT fk_album_artist_id FOREIGN KEY (artist_id) REFERENCES {0}.artist (artist_id),"
"CONSTRAINT ck_album_release_year CHECK (release_year BETWEEN 1900 AND 2050),"
"CONSTRAINT uk_album_artist UNIQUE (album_name, artist_id)"
"); "
.format(schema_name))
db.execute("CREATE TABLE IF NOT EXISTS {0}.track ("
"track_id BIGINT GENERATED BY DEFAULT AS IDENTITY, "
"track_name TEXT NOT NULL, "
"length BIGINT NOT NULL, "
"album_id BIGINT NOT NULL, "
"artist_id BIGINT NOT NULL, "
"play_count INT NOT NULL, "
"last_played TIMESTAMP, "
"date_added TIMESTAMP, "
"track_number INT NOT NULL, "
"bpm INT, "
"loved BOOLEAN NOT NULL, "
"itunes_id VARCHAR(16) NOT NULL, "
"CONSTRAINT pk_track PRIMARY KEY (track_id), "
"CONSTRAINT fk_track_artist_id FOREIGN KEY (artist_id) REFERENCES {0}.artist (artist_id), "
"CONSTRAINT fk_track_album_id FOREIGN KEY (album_id) REFERENCES {0}.album (album_id), "
"CONSTRAINT ck_track_date_added CHECK (date_added <= current_timestamp :: TIMESTAMP), "
"CONSTRAINT ck_track_play_count CHECK (play_count >= 0), "
"CONSTRAINT uk_track_artist_album UNIQUE (track_name, album_id, artist_id, track_number), "
"CONSTRAINT uk_track_itunes_id UNIQUE (itunes_id));"
.format(schema_name))
db.execute("CREATE TABLE IF NOT EXISTS {0}.play ("
"play_id BIGINT GENERATED BY DEFAULT AS IDENTITY,"
"track_id BIGINT,"
"played_at TIMESTAMP NOT NULL,"
"CONSTRAINT pk_play PRIMARY KEY (play_id),"
"CONSTRAINT fk_play_track_id FOREIGN KEY (track_id) REFERENCES {0}.track (track_id),"
"CONSTRAINT uk_play_track_play_at UNIQUE (track_id, played_at)"
");"
.format(schema_name))
db.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_track_itunes_id ON {0}.track (itunes_id);".format(schema_name))


def normalise_data(db, schema_name):
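    """Move the raw rows in ``itunes.itunes`` into the normalised tables."""
    # Distinct artists; ON CONFLICT leaves existing rows untouched on re-import.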
db.execute("INSERT INTO {0}.artist (artist_name) "
"SELECT artist "
" FROM itunes.itunes "
" WHERE artist IS NOT NULL "
" GROUP BY artist "
" ON CONFLICT (artist_name) "
" DO NOTHING;"
.format(schema_name))
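    # One album per (album, artist) pair; release_year is the highest year seen.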
db.execute("INSERT INTO {0}.album (album_name, artist_id, release_year) "
"SELECT album, "
" (SELECT artist_id "
" FROM {0}.artist "
" WHERE artist_name = artist), "
" MAX(year :: INT) "
" FROM itunes.itunes "
" WHERE album IS NOT NULL "
" AND year IS NOT NULL "
" GROUP BY album, artist "
" ON CONFLICT (album_name, artist_id)"
" DO NOTHING;"
.format(schema_name))
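    # Refresh tracks that already exist, matched by iTunes persistent id or by
    # (name, album, artist, track number).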
db.execute("UPDATE {0}.track t "
" SET track_name = i.name, "
" length = i.total_time, "
" album_id = i.album_id, "
" artist_id = i.artist_id, "
" play_count = i.play_count, "
" last_played = i.play_date_utc, "
" date_added = i.date_added, "
" track_number = i.track_number, "
" itunes_id = i.persistent_id, "
" bpm = i.bpm, "
" loved = i.loved "
" FROM (SELECT name, "
" total_time :: BIGINT, "
" al.album_id, "
" ar.artist_id, "
" COALESCE(play_count :: INT, 0) AS play_count, "
" play_date_utc :: TIMESTAMP, "
" date_added :: TIMESTAMP, "
" COALESCE(track_number :: INT, 1) AS track_number, "
" bpm :: INT, "
" COALESCE(loved :: BOOLEAN, FALSE) AS loved, "
" persistent_id "
" FROM itunes.itunes t "
" JOIN {0}.artist ar ON (artist_name = artist) "
" JOIN {0}.album al ON (album_name = album "
" AND al.artist_id = ar.artist_id) "
") AS i "
"WHERE t.itunes_id = i.persistent_id "
" OR (t.track_name, t.album_id, t.artist_id, t.track_number) "
" = (i.name, i.album_id, i.artist_id, i.track_number); "
.format(schema_name))
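    # Insert tracks not matched above; ROW_NUMBER keeps only the most-played row
    # for any duplicated (name, album, artist, track number) combination.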
db.execute("INSERT INTO {0}.track (track_name, "
" length, "
" album_id, "
" artist_id, "
" play_count, "
" last_played, "
" date_added, "
" track_number, "
" bpm, "
" loved, "
" itunes_id) "
" SELECT name AS track_name, "
" total_time, "
" album_id, "
" artist_id, "
" play_count, "
" play_date_utc, "
" date_added, "
" track_number, "
" bpm, "
" loved, "
" itunes_id "
" FROM (SELECT name, "
" total_time :: BIGINT, "
" al.album_id, "
" ar.artist_id, "
" COALESCE(play_count :: INT, 0) AS play_count, "
" play_date_utc :: TIMESTAMP, "
" date_added :: TIMESTAMP, "
" COALESCE(track_number :: INT, 1) AS track_number, "
" bpm :: INT, "
" COALESCE(loved :: BOOLEAN, FALSE) AS loved, "
" persistent_id AS itunes_id,"
" ROW_NUMBER() OVER "
" (PARTITION BY name, "
" album, "
" artist, "
" COALESCE(track_number :: INT, 1) "
" ORDER BY COALESCE(play_count :: INT, 0) DESC) "
" FROM itunes.itunes i "
" JOIN {0}.artist ar "
" ON (artist_name = artist) "
" JOIN {0}.album al "
" ON (album_name = album "
" AND release_year = year :: int "
" AND al.artist_id = ar.artist_id) "
" WHERE NOT EXISTS (SELECT "
" FROM {0}.track t "
" WHERE t.itunes_id = i.persistent_id "
" OR (t.track_name, t.album_id, t.artist_id, t.track_number) "
" = (i.name, "
" al.album_id, "
" ar.artist_id, "
" COALESCE(track_number :: INT, 1))"
" )) AS a "
" WHERE row_number = 1; "
.format(schema_name))
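    # Record each track's most recent play, skipping (track, timestamp) pairs
    # that are already stored.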
db.execute("INSERT INTO {0}.play (track_id, played_at)"
"SELECT track_id,"
" last_played"
" FROM {0}.track"
" WHERE last_played IS NOT NULL"
" AND NOT EXISTS (SELECT "
" FROM {0}.play"
" WHERE track_id = track_id"
" AND played_at = last_played);"
.format(schema_name))


def process_tracks(library):
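    """Turn the library's track entries into a temp-table definition and INSERT statements.

    Returns a tuple of (CREATE TABLE statement, list of (INSERT statement, values) pairs).
    """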
    all_keys = set()
    inserts = []
    for track_id in library['Tracks'].keys():
        track = library['Tracks'][track_id]
        track_keys = list(map(slugify, track.keys()))
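        # Skip podcasts, music videos, audiobooks and tracks that lack the
        # fields the normalised schema requires.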
        if 'Podcast' in track and track['Podcast']:
            continue
        if 'Music Video' in track:
            continue
        if 'Artist' not in track:
            continue
        if 'Album' not in track:
            continue
        if 'Year' not in track:
            continue
        if 'Location' in track and 'iTunes%20Media/Audiobooks' in track['Location']:
            continue
        track_values = track.values()
        all_keys = all_keys.union(set(track_keys))
        inserts.append(get_parameterized(track_keys, track_values))
    # Every key seen across all tracks becomes a TEXT column in the temp table.
    all_keys_with_type = (key + " TEXT" for key in all_keys)
    return "CREATE TABLE IF NOT EXISTS {0}.{1} ({2})".format(TEMPORARY_SCHEMA_NAME, TEMPORARY_TABLE_NAME,
                                                             ', '.join(all_keys_with_type)), inserts


def get_parameterized(keys, values):
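    """Build a parameterised INSERT statement and value list for a single track."""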
    return (
        "INSERT INTO {0}.{1} ({2}) VALUES ({3})".format(
            TEMPORARY_SCHEMA_NAME,
            TEMPORARY_TABLE_NAME,
            ', '.join(map(str, keys)),
            ', '.join(['%s'] * len(values))
        ),
        list(values)
    )


def slugify(name):
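    """Turn an iTunes key such as 'Play Date UTC' into a column name such as 'play_date_utc'."""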
    return name.lower().replace(' ', '_')


def open_db(name, port, user, password):
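    """Connect to the local PostgreSQL server and return an open (connection, cursor) pair."""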
    conn = psycopg2.connect(host="localhost", port=port, database=name, user=user, password=password)
    cur = conn.cursor()
    return conn, cur


def close_db(conn, cur):
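    """Commit the current transaction and close the cursor and connection."""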
    conn.commit()
    cur.close()
    conn.close()


if __name__ == '__main__':
    main()