修订历史
- 2024.03.07 创建笔记
// echo server example
// Event callbacks that are passed to evio.Serve below; only Data is set here.
var events evio.Events
// Data callback: echoes the received bytes back by returning in as out.
// action is a named result and is returned with its zero value.
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
out = in
return
}
...
// Serve the callbacks on the address "tcp://:7890?reuseport=true".
evio.Serve(events, "tcp://:7890?reuseport=true")
- internal/internal_*.go: 不同平台上 Poll 的实现
- internal/notequeue.go: 消息队列
- evio.go: 接口定义
- evio_std.go: 使用 net.Conn 实现的仿 Reactor 网络模型
- evio_unix.go: 使用 Poll 实现的 Reactor 网络模型
// internal/internal_linux.go
// Poll is the Linux poller state shared by Trigger and Wait.
type Poll struct {
	fd    int       // epoll fd: the descriptor Wait hands to EpollWait
	wfd   int       // wake fd: written by Trigger, read back by Wait
	notes noteQueue // notes added by Trigger and passed to Wait's callback
}
// Trigger adds note to the poll's note queue and then writes 8 bytes to the
// wake fd (p.wfd). Wait reads those bytes back when it sees an event on p.wfd.
// NOTE(review): x is not declared in this excerpt and the return statement is
// elided; presumably x is an 8-byte integer declared in the omitted code.
func (p *Poll) Trigger(note interface{}) error {
p.notes.Add(note)
syscall.Write(p.wfd, (*(*[8]byte)(unsafe.Pointer(&x)))[:])
...
}
// Wait runs the poll loop until EpollWait or iter reports an error.
//
// Each round waits on the epoll fd (up to 64 events, timeout argument 100),
// then hands every queued note to iter as iter(0, note), and finally reports
// every ready descriptor as iter(fd, nil). Events on the wake fd are not
// forwarded; their 8-byte payload is read and discarded. An EINTR result from
// EpollWait is not treated as a failure.
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
	ready := make([]syscall.EpollEvent, 64)
	for {
		count, waitErr := syscall.EpollWait(p.fd, ready, 100)
		if waitErr != nil && waitErr != syscall.EINTR {
			return waitErr
		}

		// Queued notes are delivered before fd events, tagged with fd 0.
		deliver := func(note interface{}) error {
			return iter(0, note)
		}
		if noteErr := p.notes.ForEach(deliver); noteErr != nil {
			return noteErr
		}

		// Index loop on purpose: count may be negative after EINTR, in which
		// case no event is processed.
		for idx := 0; idx < count; idx++ {
			fd := int(ready[idx].Fd)
			if fd == p.wfd {
				// Wake-up event: consume the bytes written by Trigger.
				var scratch [8]byte
				syscall.Read(p.wfd, scratch[:])
				continue
			}
			if iterErr := iter(fd, nil); iterErr != nil {
				return iterErr
			}
		}
	}
}
调用 Trigger,会把参数 note 传入 noteQueue,并唤醒 epoll。note 会在 epoll 唤醒后被回调函数 iter 处理。
note 有三种: time.Duration (定时 tick)、error (关闭 server)、*conn (Wake 唤醒指定连接)。
iter 中的 err 会被传播到最外层,终止 poll.Wait
iter 函数除了处理 note,也会处理 fd 事件:
// evio_unix.go
// loopRun drives one loop: it calls l.poll.Wait with a callback that dispatches
// each note or fd event to the matching handler. Setup code is elided ("...").
func loopRun(s *server, l *loop) {
...
l.poll.Wait(func(fd int, note interface{}) error {
// fd 0 is what Poll.Wait passes for queued notes, so treat it as a note.
if fd == 0 {
return loopNote(s, l, note)
}
c := l.fdconns[fd]
switch {
// No connection is recorded for this fd on this loop: hand it to loopAccept.
case c == nil:
return loopAccept(s, l, fd)
// A connection exists but is not marked opened yet.
case !c.opened:
return loopOpened(s, l, c)
// The connection has pending output bytes.
case len(c.out) > 0:
return loopWrite(s, l, c)
// An action other than None is set on the connection.
case c.action != None:
return loopAction(s, l, c)
// None of the above: handle the event through loopRead.
default:
return loopRead(s, l, c)
}
})
}
evio listener 事件会被所有 loop 监听:
// evio_unix.go
// serve builds events.NumLoops loops, adds every listener fd to every loop's
// poll via AddRead, and then starts one loopRun goroutine per loop.
// Parts of the function (including the creation of s and the return) are
// elided with "...".
func serve(events Events, listeners []*listener) error {
numLoops := events.NumLoops
...
for i := 0; i < numLoops; i++ {
// Each loop gets its own poll, a 0xFFFF-byte packet slice and an fd -> conn map.
l := &loop{
idx: i,
poll: internal.OpenPoll(),
packet: make([]byte, 0xFFFF),
fdconns: make(map[int]*conn),
}
// Every loop polls all listeners, not just a subset of them.
for _, ln := range listeners {
l.poll.AddRead(ln.fd)
}
s.loops = append(s.loops, l)
}
...
for _, l := range s.loops {
go loopRun(s, l)
}
...
}
经过负载均衡的选择,只有一个 loop Accept 并监听新 Conn 的事件:
// evio_unix.go
// loopAccept handles an event on a listener fd: it finds the listener whose fd
// matches, accepts a connection from it, records the new conn in this loop's
// fdconns map and adds its fd to this loop's poll via AddReadWrite.
// With more than one loop, a switch on s.balance runs first; its case bodies
// are elided in this excerpt, as is the handling of the Accept error.
func loopAccept(s *server, l *loop, fd int) error {
for i, ln := range s.lns {
if ln.fd == fd {
if len(s.loops) > 1 {
switch s.balance {
case LeastConnections: ...
case RoundRobin: ...
}
}
...
nfd, sa, err := syscall.Accept(fd)
// The new conn remembers its listener index (lnidx) and owning loop.
c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
c.out = nil
l.fdconns[c.fd] = c
l.poll.AddReadWrite(c.fd)
break
}
}
...
}
入口函数启动 numLoops 个 loop 协程和 len(listeners) 个监听协程:
// evio_std.go
// stdserve starts one stdloopRun goroutine per loop (numLoops of them) and one
// stdlistenerRun goroutine per listener. The setup of s and numLoops and the
// return statement are not shown in this excerpt.
func stdserve(events Events, listeners []*listener) error {
...
for i := 0; i < numLoops; i++ {
go stdloopRun(s, s.loops[i])
}
// Each listener goroutine also receives the listener's index.
for i := 0; i < len(listeners); i++ {
go stdlistenerRun(s, listeners[i], i)
}
}
有新连接到达时,监听协程为每个 net.Conn 启动一个新的读协程:
// stdlistenerRun accepts connections from ln in an endless loop. Each accepted
// connection is assigned to a loop by taking the incremented s.accepted counter
// modulo len(s.loops), announced to that loop by sending the *stdconn on l.ch,
// and then read by a dedicated goroutine.
// The err values from Accept and Read are not handled in this excerpt; that
// code is presumably among the elided parts.
func stdlistenerRun(s *stdserver, ln *listener, lnidx int) {
...
for {
...
conn, err := ln.ln.Accept()
l := s.loops[int(atomic.AddUintptr(&s.accepted, 1))%len(s.loops)]
c := &stdconn{conn: conn, loop: l, lnidx: lnidx}
l.ch <- c
// Reader goroutine: each chunk read is copied into a fresh slice and sent to
// the loop as *stdin, so the packet array can be overwritten by the next Read.
go func(c *stdconn) {
var packet [0xFFFF]byte
for {
n, err := c.conn.Read(packet[:])
l.ch <- &stdin{c, append([]byte{}, packet[:n]...)}
}
}(c)
}
}
读写事件由 net 标准库 (net.Conn) 负责,loop 协程执行 Events 回调函数:
// stdloopRun is the body of a loop goroutine: it selects on tick and on l.ch
// and dispatches every value received from l.ch by its dynamic type.
// The tick branch and what is done with err afterwards are elided ("...").
func stdloopRun(s *stdserver, l *stdloop) {
...
for {
select {
case <-tick: ...
case v := <-l.ch:
switch v := v.(type) {
// A plain error value is taken over as the loop's err.
case error:
err = v
// A newly accepted connection, as sent by stdlistenerRun.
case *stdconn:
err = stdloopAccept(s, l, v)
// Bytes read from a connection by its reader goroutine.
case *stdin:
err = stdloopRead(s, l, v.c, v.in)
case *stdudpconn:
err = stdloopReadUDP(s, l, v)
// An error value paired with the connection it belongs to.
case *stderr:
err = stdloopError(s, l, v.c, v.err)
// A wake request goes through the read path with nil input.
case wakeReq:
err = stdloopRead(s, l, v.c, nil)
}
}
...
}
}
不支持主动写