24 Mar 2024 | distributed system, code
修订历史
- 2024.03.24 创建笔记
应用程序需要实现 StateMachine 接口(即 Apply 方法):
// raft.go
// StateMachine is the interface the application implements so that the
// raft Server can hand commands to it.
type StateMachine interface {
// Apply takes one encoded command and returns an encoded result or an
// error.
Apply(cmd []byte) ([]byte, error)
}
// Server is the raft server type. Only the field relevant to this note is
// shown; the remaining fields are elided.
type Server struct {
statemachine StateMachine // User-provided state machine
...
}
// cmd/kvapi.go
// statemachine is the key-value application's state machine; it provides
// the Apply method. Its fields are elided here; Apply uses a field s.db
// with Store and Load methods (presumably a sync.Map — TODO confirm).
type statemachine struct { ... }
// Apply decodes cmd into a command c and executes it against s.db.
// A set command stores c.value under c.key and returns no result.
// A get command loads the value stored under c.key and returns it as bytes.
// The decoding step, the switch header and any other cases are elided.
func (s *statemachine) Apply(cmd []byte) ([]byte, error) {
... // decode `cmd` into command `c`
case setCommand:
s.db.Store(c.key, c.value)
return nil, nil
case getCommand:
value, ok := s.db.Load(c.key)
// NOTE(review): ok is never checked in this excerpt. If Load reports a
// missing key with a nil value, the type assertion below would panic;
// the full source presumably returns an error when !ok — confirm.
return []byte(value.(string)), nil
...
}
应用程序收到 HTTP 请求后,调用 raft 主节点的 Apply 提交命令,并等待结果返回:
// cmd/kvapi.go:
// getHandler serves an HTTP request by submitting the extracted command to
// raft through hs.raft.Apply and answering with the returned results.
// Request parsing, the declaration and handling of err, and response
// writing are elided.
func (hs httpServer) getHandler(w http.ResponseWriter, r *http.Request) {
... // extract command `c` from the HTTP request
var results []goraft.ApplyResult
// The single encoded command is submitted as a one-element batch.
results, err = hs.raft.Apply([][]byte{encodeCommand(c)})
... // send the HTTP response using `results`
}
// NOTE(review): this is a method of Server, which is declared under the
// raft.go marker, so it presumably lives in raft.go rather than
// cmd/kvapi.go — confirm.
//
// Apply appends every command to the server's log as an Entry stamped with
// the current term, and attaches a fresh result channel to each entry.
// After the elided persist/append-entries step it gathers one ApplyResult
// per command from those channels and returns them.
func (s *Server) Apply(commands [][]byte) ([]ApplyResult, error) {
...
// One result channel per command, in the same order as commands.
resultChans := make([]chan ApplyResult, len(commands))
for i, command := range commands {
// The channel is unbuffered, so a sender blocks until this call
// receives from it.
resultChans[i] = make(chan ApplyResult)
s.log = append(s.log, Entry{
Term: s.currentTerm,
Command: command,
result: resultChans[i],
})
}
... // persist and append entries
... // get `results` from `resultChans`
return results, nil
}
// advanceCommitIndex (excerpt) takes the log entry at index s.lastApplied,
// applies its command to the user-provided state machine, and sends the
// outcome on that entry's result channel. The surrounding logic is elided.
func (s *Server) advanceCommitIndex() {
...
log := s.log[s.lastApplied]
res, err := s.statemachine.Apply(log.Command)
// NOTE(review): a send on a nil channel blocks forever. Entries whose
// result channel was never set would hang here; the full source
// presumably guards this send with a nil check — confirm.
log.result <- ApplyResult{
Result: res,
Error: err,
}
...
}