goraft 源码分析

24 Mar 2024 | distribute system, code

修订历史

  • 2024.03.24 创建笔记

交互接口

应用程序需要实现 Apply 接口:

// raft.go
type StateMachine interface {
	Apply(cmd []byte) ([]byte, error)
}

type Server struct { 
	statemachine StateMachine   // User-provided state machine
    ...
}

// cmd/kvapi.go
type statemachine struct { ... }
func (s *statemachine) Apply(cmd []byte) ([]byte, error) {
	...     // decode `cmd` to command `c`
	case setCommand:
		s.db.Store(c.key, c.value)
	    return nil, nil
	case getCommand:
		value, ok := s.db.Load(c.key)
		return []byte(value.(string)), nil
    ...
}

应用程序收到 http 请求,Apply 到 raft 主节点,等待结果返回:

// cmd/kvapi.go:
func (hs httpServer) getHandler(w http.ResponseWriter, r *http.Request) {
	...     // extract commmand `c` from http req
    var results []goraft.ApplyResult
	results, err = hs.raft.Apply([][]byte{encodeCommand(c)})
    ...     // send http respone using `results`
}

func (s *Server) Apply(commands [][]byte) ([]ApplyResult, error) {
    ...
    resultChans := make([]chan ApplyResult, len(commands))
	for i, command := range commands {
		resultChans[i] = make(chan ApplyResult)
		s.log = append(s.log, Entry{
			Term:    s.currentTerm,
			Command: command,
			result:  resultChans[i],
		})
	}
    ...     // persist and append entries
    ...     // get `results` from `resultChans`
	return results, nil
}

func (s *Server) advanceCommitIndex() {
    ... 
    log := s.log[s.lastApplied]
    res, err := s.statemachine.Apply(log.Command)
	log.result <- ApplyResult{
		Result: res,
		Error:  err,
				
	}
    ...
}

Older · View Archive (39)

gtctl 源码分析

Newer

Unification & Type inference