在raft协议中,snapshot主要用来压缩raft日志、减少raft日志的数量,一旦正确产生并持久化了一个snapshot,那么在这个snapshot之前的日志全部都可以直接丢掉。
snapshot定义
etcd中对raft snapshot的定义如下(在文件raft.pb.go):
type Snapshot struct { Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"` Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"` XXX_unrecognized []byte `json:"-"` }
其中的Data为snapshot的数据部分,这部分通常就是应用状态机数据,而Metadata则是snaoshot的元信息(包括集群当前的配置状态、日志索引、该条索引日志对应的任期号),定义如下:
// snapshot的元数据 type SnapshotMetadata struct { ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"` // 最后一次的配置状态 Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"` // 被快照取代的最后的条目在日志中的索引值(appliedIndex) Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"` // 该条目的任期号 XXX_unrecognized []byte `json:"-"` }
集群配置状态定义如下:
type ConfState struct { Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"` Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"` XXX_unrecognized []byte `json:"-"` }
snapshot新建
想要在指定目录下新创建一个snapshot,可以使用如下方法(snapshotter.go):
type Snapshotter struct { dir string } func New(dir string) *Snapshotter { return &Snapshotter{ dir: dir, } }
snapshot持久化
// 对外暴露的接口,存储并持久化一个snapshot func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error { if raft.IsEmptySnap(snapshot) { return nil } return s.save(&snapshot) } // 将raft snapshot序列化后持久化到磁盘 func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error { // 产生snapshot的时间 start := time.Now() // snapshot的文件名Term-Index.snap fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix) // 将raft snapshot序列化 b := pbutil.MustMarshal(snapshot) // 算CRC crc := crc32.Update(0, crcTable, b) // 将数据和crc一起打包 snap := snappb.Snapshot{Crc: crc, Data: b} // 使用pb方式序列化 d, err := snap.Marshal() if err != nil { return err } else { marshallingDurations.Observe(float64(time.Since(start)) / float64(time.Second)) } // 持久化(必须刷盘) err = pioutil.WriteAndSyncFile(filepath.Join(s.dir, fname), d, 0666) if err == nil { saveDurations.Observe(float64(time.Since(start)) / float64(time.Second)) } else { err1 := os.Remove(filepath.Join(s.dir, fname)) if err1 != nil { plog.Errorf("failed to remove broken snapshot file %s", filepath.Join(s.dir, fname)) } } return err }
由上面的代码可以看出,raft snapshot最终会封装成定义在snap.pb.go的Snapshot进行持久化存储:
type Snapshot struct { Crc uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"` Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` XXX_unrecognized []byte `json:"-"` }
snapshot加载
加载snapshot对外暴露的api是Load(),定义如下:
// 加载raft快照 func (s *Snapshotter) Load() (*raftpb.Snapshot, error) { // returns the filename of the snapshots in logical time order (from newest to oldest). // If there is no available snapshots, an ErrNoSnapshot will be returned. names, err := s.snapNames() if err != nil { return nil, err } var snap *raftpb.Snapshot // 从时间最近到最旧来遍历所有snapshot文件 for _, name := range names { // 加载snapshot if snap, err = loadSnap(s.dir, name); err == nil { // 只要成功加载了snapshot就跳出 break } } if err != nil { return nil, ErrNoSnapshot } return snap, nil }
其中snapNames用于返回所有的snapshot文件的文件名,并且是按照时间排序好的,其实现如下:
// snapNames returns the filename of the snapshots in logical time order (from newest to oldest). // If there is no available snapshots, an ErrNoSnapshot will be returned. func (s *Snapshotter) snapNames() ([]string, error) { dir, err := os.Open(s.dir) // 打开snapshot保存目录 if err != nil { return nil, err } defer dir.Close() names, err := dir.Readdirnames(-1) if err != nil { return nil, err } snaps := checkSuffix(names) // 检查文件名后缀是否正确 if len(snaps) == 0 { return nil, ErrNoSnapshot } sort.Sort(sort.Reverse(sort.StringSlice(snaps))) // 对目录下所有snapshot文件名进行排序 return snaps, nil }
checkSuffix用于对文件名后缀的合法性检查,如下:
func checkSuffix(names []string) []string { snaps := []string{} for i := range names { // 必须以 ".snap"结尾 if strings.HasSuffix(names[i], snapSuffix) { // snapSuffix 为 ".snap" snaps = append(snaps, names[i]) } else { // If we find a file which is not a snapshot then check if it's // a vaild file. If not throw out a warning. if _, ok := validFiles[names[i]]; !ok { plog.Warningf("skipped unexpected non snapshot file %v", names[i]) } } } return snaps }
拿到所有文件名之后(按照新旧排序),接下来便从最新的文件开始进行loadSnap:
// 参数为snapshot保存目录和snapshot文件名,返回值为加载到的raft snapshot func loadSnap(dir, name string) (*raftpb.Snapshot, error) { fpath := filepath.Join(dir, name) // 构造文件路径 snap, err := Read(fpath) // 读snapshot if err != nil { renameBroken(fpath) // 如果读遇到错误,就将该snapshot标记为损坏 } return snap, err }
Read本质就是Save的逆过程,因此逻辑也比较简单,根绝Save的书序进行相应的反序列化就行了,定义如下:
// Read reads the snapshot named by snapname and returns the snapshot. func Read(snapname string) (*raftpb.Snapshot, error) { // 读snapshot文件内容 b, err := ioutil.ReadFile(snapname) if err != nil { plog.Errorf("cannot read file %v: %v", snapname, err) return nil, err } if len(b) == 0 { plog.Errorf("unexpected empty snapshot") return nil, ErrEmptySnapshot } var serializedSnap snappb.Snapshot // 反序列化 if err = serializedSnap.Unmarshal(b); err != nil { plog.Errorf("corrupted snapshot file %v: %v", snapname, err) return nil, err } if len(serializedSnap.Data) == 0 || serializedSnap.Crc == 0 { plog.Errorf("unexpected empty snapshot") return nil, ErrEmptySnapshot } // 校验CRC crc := crc32.Update(0, crcTable, serializedSnap.Data) if crc != serializedSnap.Crc { plog.Errorf("corrupted snapshot file %v: crc mismatch", snapname) return nil, ErrCRCMismatch } // 反序列化,还原raft snapshot var snap raftpb.Snapshot if err = snap.Unmarshal(serializedSnap.Data); err != nil { plog.Errorf("corrupted snapshot file %v: %v", snapname, err) return nil, err } return &snap, nil }
etcd对raft snapshot的实现主要在snapshotter.go中,整个逻辑比较简单,在后面的文章中,将会多出看到snapshot接口的使用身影。
本系列文章
1、etcd raft模块分析--kv存储引擎
2、etcd raft模块分析--raft snapshot
3、etcd raft模块分析--raft wal日志
4、etcd raft模块分析--raft node
5、etcd raft模块分析--raft 协议
6、etcd raft模块分析--raft transport
7、etcd raft模块分析--raft storage
8、etcd raft模块分析--raft progress