// dbwriter.go -- Constant DB built on top of the BBHash MPH
//
// Author: Sudhi Herle <sudhi@herle.net>
//
// This software does not come with any express or implied
// warranty; it is provided "as is". No claim is made to its
// suitability for any purpose.

package bbhash

import (
	"bufio"
	"crypto/sha512"
	"encoding/binary"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"sync"

	"github.com/dchest/siphash"
	"github.com/opencoff/go-fasthash"
)

// Most data is serialized as big-endian integers. The exceptions are:
//
//   Offset table:
//      This is mmap'd into the process and written as little-endian uint64s.
//      This is arguably an optimization -- most systems we work with are
//      little-endian. On big-endian systems, the DBReader code converts the
//      entries to native-endian on the fly.

// DBWriter represents an abstraction to construct a read-only, constant database.
// This database uses BBHash as the underlying mechanism for constant-time lookups
// of keys; keys and values are represented as arbitrary byte sequences ([]byte).
// The DB meta-data is protected by a strong checksum (SHA512-256), and each key/value
// record is protected by a distinct siphash-2-4 checksum. Records can be added to the
// DB via plain delimited text files or CSV files. Once all key/value additions are
// complete, the DB is written to disk via the Freeze() function.
//
// The DB has the following general structure:
//
//   - 64 byte file header:
//       * magic    [4]byte   "BBHH"
//       * flags    uint32    for now, all zeros
//       * salt     uint64    random salt for hash functions
//       * nkeys    uint64    number of keys in the DB
//       * offtbl   uint64    file offset where the 'key/val' offsets start
//
//   - Contiguous series of records; each record is a key/value pair:
//       * keylen   uint16    length of the key
//       * vallen   uint32    length of the value
//       * cksum    uint64    siphash checksum of key, value, offset
//       * key      []byte    keylen bytes of key
//       * val      []byte    vallen bytes of value
//
//   - Possibly a gap until the next PageSize boundary (4096 bytes)
//   - Offset table: nkeys worth of file offsets. Entry 'i' is the perfect
//     hash index for some key 'k' and offset[i] is the offset in the DB
//     where the key and value can be found.
//   - Marshaled BBHash bytes (BBHash.MarshalBinary())
//   - 32 bytes of strong checksum (SHA512-256); this checksum is computed over
//     the file header, offset table and marshaled bbhash.
type DBWriter struct {
	fd *os.File

	// to detect duplicates
	keymap map[uint64]*record

	// list of unique keys
	keys []uint64

	// hash salt for hashing keys
	salt uint64

	// siphash key: just binary encoded salt
	saltkey []byte

	// running count of current offset within fd where we are writing
	// records
	off uint64

	bb *BBHash

	fntmp  string
	fn     string
	frozen bool
}

type header struct {
	magic  [4]byte // file magic
	resv00 uint32  // reserved - in future flags, algo choices etc.
	salt   uint64  // hash salt
	nkeys  uint64  // number of keys in the system
	offtbl uint64  // file location where offset-table starts

	resv01 [4]uint64
}

type record struct {
	hash uint64
	key  []byte
	val  []byte

	// siphash of key, value and offset.
	csum uint64

	// offset where this record is written
	off uint64
}

// NewDBWriter prepares file 'fn' to hold a constant DB built using the
// BBHash minimal perfect hash function. Once written, the DB is "frozen"
// and readers will open it using NewDBReader() to do constant-time lookups
// of key to value.
func NewDBWriter(fn string) (*DBWriter, error) {
	tmp := fmt.Sprintf("%s.tmp.%d", fn, rand64())
	fd, err := os.OpenFile(tmp, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return nil, err
	}

	w := &DBWriter{
		fd:      fd,
		keymap:  make(map[uint64]*record),
		keys:    make([]uint64, 0, 65536),
		salt:    rand64(),
		saltkey: make([]byte, 16),
		off:     64,
		fn:      fn,
		fntmp:   tmp,
	}

	// Leave some space for the header; we will fill it in when we
	// are done freezing.
	var z [64]byte
	nw, err := fd.Write(z[:])
	if err != nil {
		return nil, w.error("can't write header: %s", err)
	}
	if nw != 64 {
		return nil, w.error("partial write of blank header: exp 64, saw %d", nw)
	}

	binary.BigEndian.PutUint64(w.saltkey[:8], w.salt)
	binary.BigEndian.PutUint64(w.saltkey[8:], ^w.salt)
	return w, nil
}
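
// The typical write-side workflow is: create a writer, add keys and values
// from one or more sources, then call Freeze() exactly once. The function
// below is an illustrative sketch only and is not part of the original API;
// the file name and the sample data are hypothetical.
func exampleBuildDB() error {
	// hypothetical output file
	wr, err := NewDBWriter("words.db")
	if err != nil {
		return err
	}

	keys := [][]byte{[]byte("alpha"), []byte("beta")}
	vals := [][]byte{[]byte("1"), []byte("2")}

	// duplicates are silently dropped; 'n' is the count actually added
	n, err := wr.AddKeyVals(keys, vals)
	if err != nil {
		wr.Abort()
		return err
	}
	fmt.Printf("added %d records; %d distinct keys\n", n, wr.TotalKeys())

	// a gamma of 2.0 is a reasonable starting point; very large key
	// spaces may need 2.5..4.0 (see Freeze() below).
	return wr.Freeze(2.0)
}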

// TotalKeys returns the total number of distinct keys in the DB.
func (w *DBWriter) TotalKeys() int {
	return len(w.keys)
}

// AddKeyVals adds a series of matched key-value pairs to the DB. If the two
// slices are of unequal length, only the shorter of the two lengths is used.
// Records with duplicate keys are discarded.
// Returns the number of records added.
func (w *DBWriter) AddKeyVals(keys [][]byte, vals [][]byte) (uint64, error) {
	if w.frozen {
		return 0, ErrFrozen
	}

	n := len(keys)
	if len(vals) < n {
		n = len(vals)
	}

	var z uint64
	for i := 0; i < n; i++ {
		r := &record{
			key: keys[i],
			val: vals[i],
		}
		ok, err := w.addRecord(r)
		if err != nil {
			return z, err
		}
		if ok {
			z++
		}
	}

	return z, nil
}

// AddTextFile adds contents from text file 'fn' where key and value are separated
// by one of the characters in 'delim'. Duplicates, empty lines and lines with no
// value are skipped. This function just opens the file and calls AddTextStream().
// Returns the number of records added.
func (w *DBWriter) AddTextFile(fn string, delim string) (uint64, error) {
	if w.frozen {
		return 0, ErrFrozen
	}

	fd, err := os.Open(fn)
	if err != nil {
		return 0, err
	}
	defer fd.Close()

	if len(delim) == 0 {
		delim = " \t"
	}

	return w.AddTextStream(fd, delim)
}

// AddTextStream adds contents from text stream 'fd' where key and value are separated
// by one of the characters in 'delim'. Duplicates, empty lines and lines with no
// value are skipped.
// Returns the number of records added.
func (w *DBWriter) AddTextStream(fd io.Reader, delim string) (uint64, error) {
	if w.frozen {
		return 0, ErrFrozen
	}

	rd := bufio.NewReader(fd)
	sc := bufio.NewScanner(rd)
	ch := make(chan *record, 10)

	// do I/O asynchronously
	go func(sc *bufio.Scanner, ch chan *record) {
		for sc.Scan() {
			s := strings.TrimSpace(sc.Text())
			if len(s) == 0 {
				continue
			}

			i := strings.IndexAny(s, delim)
			if i < 0 {
				continue
			}

			k := s[:i]
			v := s[i:]

			// ignore items that are too large to encode
			// (keylen is a uint16, vallen a uint32)
			if len(k) > 65535 || len(v) >= 4294967295 {
				continue
			}

			r := &record{
				key: []byte(k),
				val: []byte(v),
			}
			ch <- r
		}
		close(ch)
	}(sc, ch)

	return w.addFromChan(ch)
}
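
// A stream does not have to come from a file; any io.Reader works. The
// following sketch is illustrative only and not part of the original file:
// it feeds an in-memory, tab-delimited listing through AddTextStream. The
// sample data is hypothetical.
func exampleAddFromString(w *DBWriter) (uint64, error) {
	data := "alpha\t1\nbeta\t2\n"
	return w.AddTextStream(strings.NewReader(data), "\t")
}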

// AddCSVFile adds contents from CSV file 'fn'. If 'kwfield' and 'valfield' are
// non-negative, they indicate the field# of the key and value respectively;
// negative values select the defaults of 0 and 1 respectively.
// If 'comma' is 0, the default CSV delimiter ',' is used.
// If 'comment' is not 0, then lines beginning with that rune are discarded.
// Records where the 'kwfield' or 'valfield' can't be evaluated are discarded.
// Returns the number of records added.
func (w *DBWriter) AddCSVFile(fn string, comma, comment rune, kwfield, valfield int) (uint64, error) {
	if w.frozen {
		return 0, ErrFrozen
	}

	fd, err := os.Open(fn)
	if err != nil {
		return 0, err
	}
	defer fd.Close()

	return w.AddCSVStream(fd, comma, comment, kwfield, valfield)
}

// AddCSVStream adds contents from CSV stream 'fd'. If 'kwfield' and 'valfield' are
// non-negative, they indicate the field# of the key and value respectively;
// negative values select the defaults of 0 and 1 respectively.
// If 'comma' is 0, the default CSV delimiter ',' is used.
// If 'comment' is not 0, then lines beginning with that rune are discarded.
// Records where the 'kwfield' or 'valfield' can't be evaluated are discarded.
// Returns the number of records added.
func (w *DBWriter) AddCSVStream(fd io.Reader, comma, comment rune, kwfield, valfield int) (uint64, error) {
	if w.frozen {
		return 0, ErrFrozen
	}

	if kwfield < 0 {
		kwfield = 0
	}
	if valfield < 0 {
		valfield = 1
	}

	// every row must have at least 'max' fields for us to pick out
	// the key and the value.
	max := valfield
	if kwfield > valfield {
		max = kwfield
	}
	max += 1

	ch := make(chan *record, 10)
	cr := csv.NewReader(fd)
	if comma != 0 {
		cr.Comma = comma
	}
	cr.Comment = comment
	cr.FieldsPerRecord = -1
	cr.TrimLeadingSpace = true
	cr.ReuseRecord = true

	go func(cr *csv.Reader, ch chan *record) {
		for {
			v, err := cr.Read()
			if err != nil {
				break
			}

			if len(v) < max {
				continue
			}

			r := &record{
				key: []byte(v[kwfield]),
				val: []byte(v[valfield]),
			}
			ch <- r
		}
		close(ch)
	}(cr, ch)

	return w.addFromChan(ch)
}
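
// As with text streams, the CSV loader can read from any io.Reader. The
// following sketch is illustrative only and not part of the original file;
// the column layout and data are hypothetical. It loads a '#'-commented CSV
// where the key is in column 0 and the value in column 2.
func exampleAddFromCSV(w *DBWriter) (uint64, error) {
	data := "# name, type, description\nalpha,word,first letter\nbeta,word,second letter\n"
	return w.AddCSVStream(strings.NewReader(data), ',', '#', 0, 2)
}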

// Freeze builds the minimal perfect hash, writes the DB and closes it.
// For very large key spaces, a higher 'g' (gamma) value is recommended (2.5-4.0);
// otherwise, Freeze() may fail to generate an MPH.
func (w *DBWriter) Freeze(g float64) error {
	if w.frozen {
		return ErrFrozen
	}

	bb, err := New(g, w.keys)
	if err != nil {
		return ErrMPHFail
	}

	offset := make([]uint64, len(w.keys))
	err = w.buildOffsets(bb, offset)
	if err != nil {
		return err
	}

	// We align the offset table to a page boundary - so we can mmap it
	// when we read it back.
	pgsz := uint64(os.Getpagesize())
	pgsz_m1 := pgsz - 1
	offtbl := w.off + pgsz_m1
	offtbl &= ^pgsz_m1

	var ehdr [64]byte

	// save info for building the file header.
	hdr := &header{
		magic:  [4]byte{'B', 'B', 'H', 'H'},
		salt:   w.salt,
		nkeys:  uint64(len(w.keys)),
		offtbl: offtbl,
	}

	hdr.encode(ehdr[:])
	w.fd.Seek(int64(offtbl), 0)

	// We don't encode concurrently and write to disk, for two reasons:
	// 1. To make the I/O safe, we would have to encode an entire worker's worth
	//    of offsets; this costs additional memory.
	// 2. There is no safe, portable way to do concurrent disk writes without
	//    corrupting the file.
	var z [8]byte
	le := binary.LittleEndian

	// we calculate a strong checksum for all data from this point on.
	h := sha512.New512_256()
	h.Write(ehdr[:])

	tee := io.MultiWriter(w.fd, h)
	for _, o := range offset {
		le.PutUint64(z[:], o)
		n, err := tee.Write(z[:])
		if err != nil {
			return err
		}
		if n != 8 {
			return fmt.Errorf("%s: partial write of offsets; exp %d saw %d", w.fntmp, 8, n)
		}
	}

	// We now encode the bbhash and write it to disk.
	err = bb.MarshalBinary(tee)
	if err != nil {
		return err
	}

	// The trailer is the checksum of the meta-data.
	cksum := h.Sum(nil)
	n, err := w.fd.Write(cksum[:])
	if err != nil {
		return err
	}
	if n != sha512.Size256 {
		return fmt.Errorf("%s: partial write of checksum; exp %d saw %d", w.fntmp, sha512.Size256, n)
	}

	// Finally, go back and fill in the file header at offset 0.
	w.fd.Seek(0, 0)
	n, err = w.fd.Write(ehdr[:])
	if err != nil {
		return err
	}
	if n != 64 {
		return fmt.Errorf("%s: partial write of file header; exp %d saw %d", w.fntmp, 64, n)
	}

	w.frozen = true
	w.fd.Sync()
	w.fd.Close()

	return os.Rename(w.fntmp, w.fn)
}

// encode header 'h' into the 64-byte bytestream 'b'
func (h *header) encode(b []byte) {
	be := binary.BigEndian

	copy(b[:4], h.magic[:])

	// b[4:8] is the reserved flags word; it stays zero for now.
	i := 8
	be.PutUint64(b[i:i+8], h.salt)
	i += 8
	be.PutUint64(b[i:i+8], h.nkeys)
	i += 8
	be.PutUint64(b[i:i+8], h.offtbl)
}
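
// For reference, decoding the header is the mirror image of encode(). The
// sketch below is illustrative only and not part of the original file (the
// real reader-side code lives in the DBReader implementation); it follows
// the 64-byte layout documented at the top of this file.
func decodeHeaderSketch(b []byte) (*header, error) {
	if len(b) < 64 {
		return nil, fmt.Errorf("header: too short (%d bytes)", len(b))
	}

	var h header
	be := binary.BigEndian

	copy(h.magic[:], b[:4])
	if string(h.magic[:]) != "BBHH" {
		return nil, fmt.Errorf("header: bad magic %q", h.magic[:])
	}

	// b[4:8] is the reserved flags word; ignored for now.
	h.salt = be.Uint64(b[8:16])
	h.nkeys = be.Uint64(b[16:24])
	h.offtbl = be.Uint64(b[24:32])
	return &h, nil
}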

// Abort stops the construction of the perfect hash db and removes the
// temporary file.
func (w *DBWriter) Abort() {
	w.fd.Close()
	os.Remove(w.fntmp)
}

// build the offset mapping table: map of MPH index to a record offset.
// We opportunistically exploit concurrency to build the table faster.
func (w *DBWriter) buildOffsets(bb *BBHash, offset []uint64) error {
	if len(w.keys) >= MinParallelKeys {
		return w.buildOffsetsConcurrent(bb, offset)
	}

	return w.buildOffsetSingle(bb, offset, w.keys)
}

// serialized/single-threaded construction of the offset table.
func (w *DBWriter) buildOffsetSingle(bb *BBHash, offset, keys []uint64) error {
	for _, k := range keys {
		r := w.keymap[k]
		i := bb.Find(k)
		if i == 0 {
			return fmt.Errorf("%s: key <%s> with hash %#x can't be mapped", w.fn, string(r.key), k)
		}

		offset[i-1] = r.off
	}

	return nil
}

// concurrent construction of the offset table.
func (w *DBWriter) buildOffsetsConcurrent(bb *BBHash, offset []uint64) error {
	ncpu := runtime.NumCPU()
	n := len(w.keys) / ncpu
	r := len(w.keys) % ncpu

	errch := make(chan error, 1)

	var wg sync.WaitGroup
	wg.Add(ncpu)
	go func() {
		wg.Wait()
		close(errch)
	}()

	// shard keys across n cpus and find the MPH index for each key.
	for i := 0; i < ncpu; i++ {
		x := n * i
		y := x + n
		if i == (ncpu - 1) {
			y += r
		}

		// XXX keymap may have to be locked for concurrent reads?
		go func(keys []uint64) {
			err := w.buildOffsetSingle(bb, offset, keys)
			if err != nil {
				errch <- err
			}
			wg.Done()
		}(w.keys[x:y])
	}

	// XXX What is the design pattern for returning errors from multiple workers?
	err := <-errch
	return err
}
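
// One common answer to the XXX question above: give the error channel enough
// capacity for every worker, wait for all of them, then report the first
// error (if any). The sketch below is illustrative only and not part of the
// original file; golang.org/x/sync/errgroup is another common choice.
func firstWorkerErrSketch(work [][]uint64, run func([]uint64) error) error {
	errch := make(chan error, len(work)) // no worker can ever block on send

	var wg sync.WaitGroup
	for _, keys := range work {
		wg.Add(1)
		go func(keys []uint64) {
			defer wg.Done()
			if err := run(keys); err != nil {
				errch <- err
			}
		}(keys)
	}

	wg.Wait()
	close(errch)

	// returns nil if no worker reported an error
	return <-errch
}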

// read partial records from the chan, complete them and write them to disk.
// Build up the internal tables as we go.
func (w *DBWriter) addFromChan(ch chan *record) (uint64, error) {
	var n uint64
	for r := range ch {
		ok, err := w.addRecord(r)
		if err != nil {
			return n, err
		}
		if ok {
			n++
		}
	}

	return n, nil
}

// compute checksums and add a record to the file at the current offset.
func (w *DBWriter) addRecord(r *record) (bool, error) {
	buf := make([]byte, 0, 65536)

	r.hash = fasthash.Hash64(w.salt, r.key)
	if _, ok := w.keymap[r.hash]; ok {
		return false, nil
	}

	r.off = w.off
	r.csum = r.checksum(w.saltkey, w.off)

	b := r.encode(buf)
	nw, err := w.fd.Write(b)
	if err != nil {
		return false, err
	}
	if nw != len(b) {
		return false, fmt.Errorf("%s: partial write; exp %d saw %d", w.fntmp, len(b), nw)
	}

	w.keymap[r.hash] = r
	w.keys = append(w.keys, r.hash)
	w.off += uint64(nw)
	return true, nil
}

// cleanup intermediate work and return an error instance
func (w *DBWriter) error(f string, v ...interface{}) error {
	w.fd.Close()
	os.Remove(w.fntmp)

	return fmt.Errorf(f, v...)
}

// Calculate a semi-strong checksum on the important fields of the record
// at offset 'off'. We use siphash-2-4 (64-bit) as the record checksum, and
// we include the offset as one of the items being protected.
func (r *record) checksum(key []byte, off uint64) uint64 {
	var b [8]byte

	be := binary.BigEndian
	h := siphash.New(key)
	h.Write(r.key)
	h.Write(r.val)

	be.PutUint64(b[:], off)
	h.Write(b[:])

	return h.Sum64()
}

// Provide the on-disk encoding of record 'r': a fixed-size prefix
// (keylen, vallen, checksum) followed by the key and value bytes.
func (r *record) encode(buf []byte) []byte {
	var b [2 + 4 + 8]byte

	klen := len(r.key)
	vlen := len(r.val)

	be := binary.BigEndian
	be.PutUint16(b[:2], uint16(klen))
	be.PutUint32(b[2:6], uint32(vlen))
	be.PutUint64(b[6:], r.csum)

	buf = append(buf, b[:]...)
	buf = append(buf, r.key...)
	buf = append(buf, r.val...)
	return buf
}
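
// The corresponding decode is the reverse of the layout above. This sketch
// is illustrative only and not part of the original file (the real reader
// lives in the DBReader implementation); it does not verify the siphash
// checksum.
func decodeRecordSketch(b []byte) (*record, error) {
	const hdrsz = 2 + 4 + 8
	if len(b) < hdrsz {
		return nil, fmt.Errorf("record: too short (%d bytes)", len(b))
	}

	be := binary.BigEndian
	klen := int(be.Uint16(b[:2]))
	vlen := int(be.Uint32(b[2:6]))
	csum := be.Uint64(b[6:hdrsz])

	if len(b) < hdrsz+klen+vlen {
		return nil, fmt.Errorf("record: need %d bytes, have %d", hdrsz+klen+vlen, len(b))
	}

	return &record{
		key:  b[hdrsz : hdrsz+klen],
		val:  b[hdrsz+klen : hdrsz+klen+vlen],
		csum: csum,
	}, nil
}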

// ErrMPHFail is returned when the gamma value provided to Freeze() is too small to
// build a minimal perfect hash table.
var ErrMPHFail = errors.New("failed to build MPH; gamma possibly small")

// ErrFrozen is returned when attempting to add new records to an already frozen DB.
// It is also returned when trying to freeze a DB that's already frozen.
var ErrFrozen = errors.New("DB already frozen")