evio 源码分析

07 Mar 2024 | net, code

修订历史

  • 2024.03.07 创建笔记

使用方式

// echo server example

// events is the set of callbacks handed to evio.Serve below.
var events evio.Events
// Data returns its input unchanged as the output, which is what makes this
// an echo server; action is left at its zero value by the naked return.
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
	out = in
	return
}
... 
// Start serving with the callbacks above on TCP port 7890, passing reuseport=true in the address.
evio.Serve(events, "tcp://:7890?reuseport=true")

目录结构

核心代码

// internal/internal_linux.go

// Poll bundles an epoll descriptor, a wake descriptor and a queue of pending
// notes. Trigger adds to the queue and writes to wfd; Wait drains the queue
// and reads wfd back.
type Poll struct {
	fd    int // epoll fd
	wfd   int // wake fd
	notes noteQueue // notes added by Trigger and handed to the Wait callback
}
// Trigger queues note and then writes 8 bytes to the wake fd, so that a Wait
// loop sees an event on wfd and gets to process the queued note.
func (p *Poll) Trigger(note interface{}) error {
	p.notes.Add(note)
	// NOTE(review): x is not declared in this excerpt; presumably an 8-byte
	// counter value for an eventfd-style wake descriptor — confirm upstream.
	syscall.Write(p.wfd, (*(*[8]byte)(unsafe.Pointer(&x)))[:])
	...
}
// Wait loops on epoll forever and feeds two kinds of work to iter:
// queued notes, delivered as iter(0, note), and ready file descriptors,
// delivered as iter(fd, nil). It returns only when EpollWait fails with an
// error other than EINTR, or when iter returns a non-nil error.
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
	// At most 64 events are fetched per EpollWait call.
	events := make([]syscall.EpollEvent, 64)
	for {
		// The third argument is the EpollWait timeout (100), so the loop also
		// wakes up periodically when no fd is ready.
		n, err := syscall.EpollWait(p.fd, events, 100)
		// EINTR is tolerated: the loop simply goes around again.
		if err != nil && err != syscall.EINTR {
			return err
		}
		// Notes are processed before the fd events of this iteration.
		// fd 0 is used as the marker for "this call carries a note".
		if err := p.notes.ForEach(func(note interface{}) error {
			return iter(0, note)
		}); err != nil {
			return err
		}
		for i := 0; i < n; i++ {
			if fd := int(events[i].Fd); fd != p.wfd {
				// Regular descriptor: hand it to the callback.
				if err := iter(fd, nil); err != nil {
					return err
				}
			} else if fd == p.wfd {
				// Wake descriptor: read back the 8 bytes written by Trigger.
				// The result of the read is ignored.
				var data [8]byte
				syscall.Read(p.wfd, data[:])
			}
		}
	}
}

调用 Trigger,会把参数 note 传入 noteQueue,并唤醒 epoll。note 会在 epoll 唤醒后被回调函数 iter 处理。

note 有三种:time.Duration(定时 tick)、error(关闭 server)、*conn(Conn.Wake 唤醒某个连接)。

iter 中的 err 会被传播到最外层,终止 poll.Wait

iter 函数除了处理 note,也会处理 fd 事件:

// evio_unix.go

// loopRun runs l.poll.Wait with a callback that dispatches every note and
// every fd event of this loop to the matching loopXxx handler. Whatever the
// handler returns is returned to Wait.
func loopRun(s *server, l *loop) {
    ... 

	l.poll.Wait(func(fd int, note interface{}) error {
		// Poll.Wait passes fd 0 when the call carries a note instead of an fd event.
		if fd == 0 {
			return loopNote(s, l, note)
		}
		c := l.fdconns[fd]
		// The cases are checked top to bottom, so their order is the priority.
		switch {
		case c == nil:
			// fd has no entry in this loop's connection map; it is handed to
			// loopAccept, which matches it against the listener fds.
			return loopAccept(s, l, fd)
		case !c.opened:
			return loopOpened(s, l, c)
		case len(c.out) > 0:
			// Pending output is written before anything else is handled.
			return loopWrite(s, l, c)
		case c.action != None:
			return loopAction(s, l, c)
		default:
			return loopRead(s, l, c)
		}
	})
}

evio listener 事件会被所有 loop 监听:

// evio_unix.go

// serve creates events.NumLoops loops, each with its own poller, registers
// every listener fd for reading on every loop, and then starts one goroutine
// per loop running loopRun.
// NOTE(review): s is not declared in this excerpt; presumably the *server
// built in the elided part — confirm upstream.
func serve(events Events, listeners []*listener) error {
	numLoops := events.NumLoops
    ...
	for i := 0; i < numLoops; i++ {
		l := &loop{
			idx:     i,
			poll:    internal.OpenPoll(),
			packet:  make([]byte, 0xFFFF),
			fdconns: make(map[int]*conn),
		}
		// Every loop watches every listener, so any loop can be notified of
		// an incoming connection.
		for _, ln := range listeners {
			l.poll.AddRead(ln.fd)
		}
		s.loops = append(s.loops, l)
	}
    ...
	for _, l := range s.loops {
		go loopRun(s, l)
	}
	...
}

经过负载均衡的选择,只有一个 loop Accept 并监听新 Conn 的事件:

// evio_unix.go

// loopAccept looks up the listener whose fd matches the ready fd, accepts one
// connection from it, and registers the new connection with loop l.
// The load-balancing decision, error handling and return value are elided in
// this excerpt.
func loopAccept(s *server, l *loop, fd int) error {
	for i, ln := range s.lns {
		if ln.fd == fd {
			// Balancing only applies when there is more than one loop.
			if len(s.loops) > 1 {
				switch s.balance {
				case LeastConnections:  ...
				case RoundRobin:        ...
				}
			}
            ...
			nfd, sa, err := syscall.Accept(fd)
			// The connection records the index of the listener it came from
			// and the loop that owns it.
			c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
			c.out = nil
			l.fdconns[c.fd] = c
			// From here on, read and write events of the new fd are delivered to this loop's poller.
			l.poll.AddReadWrite(c.fd)
			break
		}
	}
    ...
}

其他细节

使用标准库 net.Conn 的实现

入口函数启动 numLoops 个 loop 协程和 len(listeners) 个监听协程:

// evio_std.go

// stdserve starts one goroutine per loop (stdloopRun) and one goroutine per
// listener (stdlistenerRun). Setup and shutdown are elided in this excerpt.
func stdserve(events Events, listeners []*listener) error {
	...
	for i := 0; i < numLoops; i++ {
		go stdloopRun(s, s.loops[i])
	}
	for i := 0; i < len(listeners); i++ {
		go stdlistenerRun(s, listeners[i], i)
	}
}

有新连接到达时,监听协程会为每个 net.Conn 新开一个读协程:

// stdlistenerRun accepts connections from ln in a loop. Each accepted
// connection is assigned to a loop, announced on that loop's channel, and
// given its own goroutine that reads from the connection and forwards the
// data to the same channel. Error handling is elided in this excerpt.
func stdlistenerRun(s *stdserver, ln *listener, lnidx int) {
	...
	for {
		...
		conn, err := ln.ln.Accept()
		// The loop is picked by an atomically incremented counter modulo the number of loops.
		l := s.loops[int(atomic.AddUintptr(&s.accepted, 1))%len(s.loops)]
		c := &stdconn{conn: conn, loop: l, lnidx: lnidx}
		// Sending the *stdconn itself tells the loop about the new connection.
		l.ch <- c
		go func(c *stdconn) {
			var packet [0xFFFF]byte
			for {
				n, err := c.conn.Read(packet[:])
				// The bytes are copied into a fresh slice because packet is
				// reused by the next Read.
				l.ch <- &stdin{c, append([]byte{}, packet[:n]...)}
			}
		}(c)
	}
}

读写事件由标准库 net 包(net.Conn)负责,loop 协程只负责执行 Events 回调函数:

// stdloopRun is the body of one loop goroutine. It receives values from l.ch
// and dispatches on their dynamic type to the matching stdloopXxx handler,
// storing the handler's result in err. Tick handling and what happens to err
// afterwards are elided in this excerpt.
func stdloopRun(s *stdserver, l *stdloop) {
	...
	for {
		select {
		case <-tick:	...
		case v := <-l.ch:
			switch v := v.(type) {
			case error:
				// An error value sent on the channel becomes the loop's err directly.
				err = v
			case *stdconn:
				// A new connection announced by stdlistenerRun.
				err = stdloopAccept(s, l, v)
			case *stdin:
				// Data read by a connection's reader goroutine.
				err = stdloopRead(s, l, v.c, v.in)
			case *stdudpconn:
				err = stdloopReadUDP(s, l, v)
			case *stderr:
				err = stdloopError(s, l, v.c, v.err)
			case wakeReq:
				// A wake request goes through the read path with nil input.
				err = stdloopRead(s, l, v.c, nil)
			}
		}
		...
	}
}

不支持主动写


Older · View Archive (37)

一个作业评测系统的全栈实现

Newer

Bocker - Docker implemented in around 100 lines of bash