You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
// ┌───────────┬──────────┬────────────┬──────────────┐// │ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes> │// └───────────┴──────────┴────────────┴──────────────┘func (r*Reader) next() (errerror) {
// We have to use r.buf since allocating byte arrays here fails escape// analysis and ends up on the heap, even though it seemingly should not.hdr:=r.buf[:recordHeaderSize]
// 数据部分buf:=r.buf[recordHeaderSize:]
// 清空r.rec=r.rec[:0]
// snappy 算法r.snappyBuf=r.snappyBuf[:0]
i:=0for {
// 获取第一个字节if_, err=io.ReadFull(r.rdr, hdr[:1]); err!=nil {
returnerrors.Wrap(err, "read first header byte")
}
r.total++r.curRecTyp=recTypeFromHeader(hdr[0])
// 是否被压缩compressed:=hdr[0]&snappyMask!=0// Gobble up zero bytes.ifr.curRecTyp==recPageTerm {
// recPageTerm is a single byte that indicates the rest of the page is padded.// If it's the first byte in a page, buf is too small and// needs to be resized to fit pageSize-1 bytes.buf=r.buf[1:]
// We are pedantic and check whether the zeros are actually up// to a page boundary.// It's not strictly necessary but may catch sketchy state early.k:=pageSize- (r.total%pageSize)
ifk==pageSize {
continue// Initial 0 byte was last page byte.
}
n, err:=io.ReadFull(r.rdr, buf[:k])
iferr!=nil {
returnerrors.Wrap(err, "read remaining zeros")
}
r.total+=int64(n)
for_, c:=rangebuf[:k] {
ifc!=0 {
returnerrors.New("unexpected non-zero byte in padded page")
}
}
continue
}
// 剩下的部分n, err:=io.ReadFull(r.rdr, hdr[1:])
iferr!=nil {
returnerrors.Wrap(err, "read remaining header")
}
// 整个读取真正的数据r.total+=int64(n)
var (
// 长度,2 个字节length=binary.BigEndian.Uint16(hdr[1:])
// crc, 4 个字节crc=binary.BigEndian.Uint32(hdr[3:])
)
// 记录的 record 大于 一页iflength>pageSize-recordHeaderSize {
returnerrors.Errorf("invalid record size %d", length)
}
// 读取数据部分n, err=io.ReadFull(r.rdr, buf[:length])
iferr!=nil {
returnerr
}
// 真正读取部分r.total+=int64(n)
ifn!=int(length) {
returnerrors.Errorf("invalid size: expected %d, got %d", length, n)
}
// 计算数据的 hash 值ifc:=crc32.Checksum(buf[:length], castagnoliTable); c!=crc {
returnerrors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
ifcompressed {
// 压缩过r.snappyBuf=append(r.snappyBuf, buf[:length]...)
} else {
// 数据部分r.rec=append(r.rec, buf[:length]...)
}
// 开始验证数据部分// 数据 type 的类型iferr:=validateRecord(r.curRecTyp, i); err!=nil {
returnerr
}
ifr.curRecTyp==recLast||r.curRecTyp==recFull {
ifcompressed&&len(r.snappyBuf) >0 {
// The snappy library uses `len` to calculate if we need a new buffer.// In order to allocate as few buffers as possible make the length// equal to the capacity.r.rec=r.rec[:cap(r.rec)]
r.rec, err=snappy.Decode(r.rec, r.snappyBuf)
returnerr
}
returnnil
}
// Only increment i for non-zero records since we use it// to determine valid content record sequences.i++
}
}
每条记录由头信息和数据两部分组成,头信息中包含类别(type)、长度(len)和 CRC32 校验和。
注意:一个 WAL 文件(也就是一个 segment)的默认大小是 128MB,
WAL 按页(page)写入,每页大小为 32KB。
在存 record 的时候,一页之内能放下的 record 作为一个完整的片段来存;放不下的 record 会被拆分成多个片段(fragment),分布在多个页中,
各个片段的头 type 也不同,具体的 type 取值为:
0: rest of page will be empty
1: a full record encoded in a single fragment
2: first fragment of a record
3: middle fragment of a record
4: final fragment of a record
recType
const (
recPageTermrecType=0// Rest of page is empty.recFullrecType=1// Full record.recFirstrecType=2// First fragment of a record.recMiddlerecType=3// Middle fragments of a record.recLastrecType=4// Final fragment of a record.
)
series 类型的 record 以 1 个字节的类别(type)开头,然后是 series 的 id 号(8 字节,大端),接着是该 series 对应的 label 的数量,最后是各个 label 的 name 和 value。
其中 type 的可能取值为:
// Unknown is returned for unrecognised WAL record types.UnknownType=255// Series is used to match WAL records of type Series.SeriesType=1// Samples is used to match WAL records of type Samples.SamplesType=2// Tombstones is used to match WAL records of type Tombstones.TombstonesType=3// Exemplars is used to match WAL records of type Exemplars.ExemplarsType=4
如果一条 record 中包含多个 series,那么后面会依次重复同样的结构:id、label 数量以及各个 label 的 name 和 value。
func (d*Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
dec:= encoding.Decbuf{B: rec}
// record 类型不是 seriesifType(dec.Byte()) !=Series {
returnnil, errors.New("invalid record type")
}
forlen(dec.B) >0&&dec.Err() ==nil {
// 获取该 series 的 idref:=storage.SeriesRef(dec.Be64())
lset:=make(labels.Labels, dec.Uvarint())
fori:=rangelset {
lset[i].Name=dec.UvarintStr()
lset[i].Value=dec.UvarintStr()
}
sort.Sort(lset)
series=append(series, RefSeries{
Ref: chunks.HeadSeriesRef(ref),
Labels: lset,
})
}
ifdec.Err() !=nil {
returnnil, dec.Err()
}
// 还剩余数据iflen(dec.B) >0 {
returnnil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
returnseries, nil
}
// Series appends the encoded series to b and returns the resulting slice.func (e*Encoder) Series(series []RefSeries, b []byte) []byte {
// bufferbuf:= encoding.Encbuf{B: b}
// 设置 typebuf.PutByte(byte(Series))
for_, s:=rangeseries {
// 设置 series idbuf.PutBE64(uint64(s.Ref))
buf.PutUvarint(len(s.Labels))
for_, l:=ranges.Labels {
buf.PutUvarintStr(l.Name)
buf.PutUvarintStr(l.Value)
}
}
// 获取序列化后的 bytereturnbuf.Get()
}
WAL disk 磁盘格式
┌───────────┬──────────┬────────────┬──────────────┐
│ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes> │
└───────────┴──────────┴────────────┴──────────────┘
参考资料
chunk
读 WAL 文件
0: rest of page will be empty
1: a full record encoded in a single fragment
2: first fragment of a record
3: middle fragment of a record
4: final fragment of a record
recType
series 类型的 record
WAL disk 磁盘格式
index 文件格式
部分都以 len 字段开始。
symbol table
series
格式
label index
posting
posting offset table
TOC
#type/mysql #type/golang #public
The text was updated successfully, but these errors were encountered: