Go netpoll (下篇)- 数据接收发送和关闭

admin 2023年6月16日16:32:11评论15 views字数 8369阅读27分53秒阅读模式

接收 TCP 连接流程

TCP 连接对象

type TCPConn struct {
  conn
}

type conn struct {
  fd *netFD
}

Conn 接口

Conn 表示通用的面向流的网络连接。

type Conn interface {
 Read(b []byte) (n int, err error)

 Write(b []byte) (n int, err error)

 Close() error

 LocalAddr() Addr

 RemoteAddr() Addr

 SetDeadline(t time.Time) error

 SetReadDeadline(t time.Time) error

 SetWriteDeadline(t time.Time) error
}

接收 TCP 连接

TCPListener (TCP 监听对象) 的 Accept 方法返回一个 TCP 连接对象。

func (l *TCPListener) Accept() (Conn, error) {
 ...
 c, err := l.accept()
 ...
 return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
 fd, err := ln.fd.accept()
 ...
 tc := newTCPConn(fd)
 ...
 return tc, nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
 d, rsa, errcall, err := fd.pfd.Accept()
 ...
 return netfd, nil
}

FD.Accept 方法内部不断轮询调用 accept 方法获取 TCP 连接并处理相应的错误。

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
 ...

 for {
  // 轮询调用 accept 方法获取 TCP 连接
  s, rsa, errcall, err := accept(fd.Sysfd)
  if err == nil {
   return s, rsa, "", err
  }
  switch err {
    ...
  }
  return -1nil, errcall, err
 }
}

accept 方法内部封装了一层 系统调用 accept,返回一个非阻塞的文件描述符。

func accept(s int) (int, syscall.Sockaddr, string, error) {
 // 先尝试 accept4 调用,如果报错了,改用 accept
 // nonblock: 设置为非阻塞模式

 // accept4 通过 1 次系统调用完成 accept 和设置 nonblock 两个操作
 ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)

 switch err {
 case nil:
  return ns, sa, ""nil
  ...
 }

 // accept 通过 2 次系统调用完成 accept 和设置 nonblock 两个操作
 ns, sa, err = AcceptFunc(s)

 ...

 if err = syscall.SetNonblock(ns, true); err != nil {
   ...
 }
 return ns, sa, ""nil
}

newTCPConn 方法返回一个包装好的 TCP 连接对象。

func newTCPConn(fd *netFD) *TCPConn {
 c := &TCPConn{conn{fd}}
 setNoDelay(c.fd, true)
 return c
}

接收 TCP 连接流程图

Go netpoll (下篇)- 数据接收发送和关闭

TCPAccept

数据接收和发送

接收方法

接收数据的对象是具体的 TCP 连接,所以从 conn.Read 方法开始。

func (c *conn) Read(b []byte) (int, error) {
 ...
 n, err := c.fd.Read(b)
 ...
 return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
 n, err = fd.pfd.Read(p)
 // 文件描述符保活机制
 runtime.KeepAlive(fd)
 return n, wrapSyscallError(readSyscallName, err)
}

FD.Read 方法内部不断轮询 系统调用 Read 并处理相应的错误。

func (fd *FD) Read(p []byte) (int, error) {
 ...

 for {
  n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
  if err != nil {
   n = 0
   if err == syscall.EAGAIN && fd.pd.pollable() {
    // 如果没有可用数据,抛出 syscall.EAGAIN
    // 将当前连接所在的 goroutine 休眠
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  }
  err = fd.eofError(n, err)
  return n, err
 }
}

func (pd *pollDesc) waitRead(isFile bool) error {
 return pd.wait('r', isFile)
}


func (pd *pollDesc) wait(mode int, isFile bool) error {
 ...

 // runtime_pollWait 通过链接器指向了 poll_runtime_pollWait
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

poll_runtime_pollWait 方法等待网络文件描述符准备好读或写 (读写取决于参数 mode)。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
 ...

 for !netpollblock(pd, int32(mode), false) {
  errcode = netpollcheckerr(pd, int32(mode))
  if errcode != pollNoError {
   return errcode
  }
 }
 return pollNoError
}

netpollblock 方法用于检测网络文件描述符准备好读或写。

// 如果 IO 已经准备好,返回 true
// 如果 IO 已经超时或关闭,返回 false
// 如果 waitio 参数为 true, 阻塞等待 IO 完成, 忽略错误
// 禁止使用同一种模式并发调用 netpollblock
// 因为 pollDesc 只能为每种模式保存 1 个等待的 goroutine
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
  if gpp.CompareAndSwap(pdReady, 0) {
   return true
  }
  if gpp.CompareAndSwap(0, pdWait) {
   break
  }
  if v := gpp.Load(); v != pdReady && v != 0 {
   throw("runtime: double wait")
  }
 }

 if waitio || netpollcheckerr(pd, mode) == pollNoError {
  // 休眠 goroutine, 等待 IO 完成
  gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
 }

 ...

 return old == pdReady
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
 if r {
  // 增加等待网络轮询器的 goroutine 数量
  // 调度器使用这个值决定是否阻塞,如果没有其他工作的情况下,调度器会阻塞等待网络轮询器的 IO 事件
  atomic.Xadd(&netpollWaiters, 1)
 }
 return r
}

发送方法

发送数据的对象是具体的 TCP 连接,所以从 conn.Write 方法开始。

func (c *conn) Write(b []byte) (int, error) {
 ...
 n, err := c.fd.Write(b)
 ...
 return n, err
}

func (fd *netFD) Write(p []byte) (nn int, err error) {
 nn, err = fd.pfd.Write(p)
 // 文件描述符保活机制
 runtime.KeepAlive(fd)
 return nn, wrapSyscallError(writeSyscallName, err)
}

FD.Write 方法内部不断轮询 系统调用 Write 并处理相应的错误。

func (fd *FD) Write(p []byte) (int, error) {
 ...

 var nn int
 for {
  ...

  n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])

  ...

  if err == syscall.EAGAIN && fd.pd.pollable() {
   if err = fd.pd.waitWrite(fd.isFile); err == nil {
    continue
   }
  }

  ...
 }
}

代码执行到这里,后面的流程就和 Read 接收数据 流程一样了,这里不再赘述。

func (pd *pollDesc) waitWrite(isFile bool) error {
 return pd.wait('w', isFile)
}

小结

Go netpoll (下篇)- 数据接收发送和关闭

数据发送和接收流程图

网络轮询器

netpoll 方法用于检测网络轮询器并返回已经就绪的 goroutine 列表。

// 轮询检测准备就绪的网络连接
// 返回一个可运行 (可读/可写/可读写) 的 goroutine 列表
// 参数规则:
//  delay < 0: 无限阻塞
//  delay == 0: 非阻塞
//  delay > 0: 阻塞时间 (单位: 纳秒)
func netpoll(delay int64) gList {
 if epfd == -1 {
  return gList{}
 }

 ...

 // 每次读取 128 个 IO 事件
 var events [128]epollevent
retry:
 // 调用 epoll_wait 获取接收到的 IO 事件
 n := epollwait(epfd, &events[0], int32(len(events)), waitms)
 if n < 0 {
  ...

  if waitms > 0 {
   return gList{}
  }
  goto retry
 }

 var toRun gList
 for i := int32(0); i < n; i++ {
  ev := &events[i]

  ...

  var mode int32
  if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
   mode += 'r'
  }
  if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
   mode += 'w'
  }
  if mode != 0 {
   pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
   pd.setEventErr(ev.events == _EPOLLERR)
   netpollready(&toRun, pd, mode)
  }
 }
 return toRun
}

netpollready 方法表示网络文件描述符关联的 IO 事件已经就绪,并将参数 pd 网络文件描述符内部的 goroutine 添加到参数队列中。

// 参数 toRun 是一个 goroutine 列表
// 参数 mode 规则
//  'r': IO 读
//  'w': IO 写
//  'r'+'w': IO 读写
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
 var rg, wg *g
 if mode == 'r' || mode == 'r'+'w' {
  rg = netpollunblock(pd, 'r'true)
 }
 if mode == 'w' || mode == 'r'+'w' {
  wg = netpollunblock(pd, 'w'true)
 }
 if rg != nil {
  toRun.push(rg)
 }
 if wg != nil {
  toRun.push(wg)
 }
}

netpollunblock 方法将网络文件描述符中的读信号或者写信号转换为 pdReady 状态,然后返回存储在内部的 goroutine。

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
 ...

 for {
  ...
  var new uintptr
  if ioready {
   new = pdReady
  }
  if gpp.CompareAndSwap(old, new) {
   if old == pdWait {
    old = 0
   }
   return (*g)(unsafe.Pointer(old))
  }
 }
}

小结

Go netpoll (下篇)- 数据接收发送和关闭

网络轮询器调用关系图

netpoll 方法会返回一个可运行的 goroutine 列表,然后调用方会将返回的 goroutine 逐个加入处理器的本地队列或者全局队列。 从图中可以看到调用方主要有 4 个,其中调度线程 schedule 和监控线程 sysmon 在 GMP 调度器一文中已经讲过了,这里不再赘述,剩下的 GC 和 STW 后面有机会再讲。

超时控制

接收数据超时

conn.SetReadDeadline 方法设置连接的接收数据超时时间。

func (c *conn) SetReadDeadline(t time.Time) error {
 ...
 if err := c.fd.SetReadDeadline(t); err != nil {
  return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
 }
 return nil
}

func (fd *netFD) SetWriteDeadline(t time.Time) error {
 return fd.pfd.SetWriteDeadline(t)
}

func (fd *FD) SetWriteDeadline(t time.Time) error {
 return setDeadlineImpl(fd, t, 'w')
}

func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
 ...

 runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
 return nil
}

poll_runtime_pollSetDeadline 方法会设置参数 pd 网络文件描述符内部的定时器 (goroutine 持有),并在定时器到期后进行相关的操作。

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
 // 主要是对 pd 进行定时器的相关设置,这里直接跳过这部分内容
 ...

 // 如果截止时间已经过期,取消等待 IO 而导致的阻塞
 var rg, wg *g
 if pd.rd < 0 {
  rg = netpollunblock(pd, 'r'false)
 }
 if pd.wd < 0 {
  wg = netpollunblock(pd, 'w'false)
 }

 // 如果有取消读事件的 goroutine, 则进行唤醒
 if rg != nil {
  netpollgoready(rg, 3)
 }
    // 如果有取消写事件的 goroutine, 则进行唤醒
 if wg != nil {
  netpollgoready(wg, 3)
 }
}

func netpollgoready(gp *g, traceskip int) {
    atomic.Xadd(&netpollWaiters, -1)
    goready(gp, traceskip+1)
}

发送数据超时

发送数据超时和接收数据流程基本一致,只是调用的方法不同,这里就不再展开了。

关闭连接

conn.Close 方法用于关闭网络连接。

func (c *conn) Close() error {
 ...
 err := c.fd.Close()
 ...
 return err
}

func (fd *netFD) Close() error {
 runtime.SetFinalizer(fd, nil)
 return fd.pfd.Close()
}

func (fd *FD) Close() error {
 ...
 fd.pd.evict()
 ...
 return err
}

evict 方法会关闭网络文件描述符,并取消所有阻塞在等待该文件描述符的 IO 事件。

func (pd *pollDesc) evict() {
 ...
 runtime_pollUnblock(pd.runtimeCtx)
}

func poll_runtime_pollUnblock(pd *pollDesc) {
 ...

 pd.closing = true
 var rg, wg *g
 rg = netpollunblock(pd, 'r'false)
 wg = netpollunblock(pd, 'w'false)

 ...

 if rg != nil {
  netpollgoready(rg, 3)
 }
 if wg != nil {
  netpollgoready(wg, 3)
 }
}

流程图

Go netpoll (下篇)- 数据接收发送和关闭

关闭连接流程图

小结

本文用一个基础的服务器网络程序为示例,分析了网络标准库中的端口监听、接收连接、发送/接收数据, 关闭连接 4 个主要流程的 Linux 版本实现代码。 Go 网络标准库通过在底层封装 epoll 实现了 IO 多路复用,通过网络轮询器加 GMP 调度器避免了传统网络编程中的线程切换和 IO 阻塞,两者的完美配合是 Go 网络编程高性能的基石。

Go netpoll (上篇)- 数据结构和初始化

想要了解Go更多内容,欢迎扫描下方👇关注公众号,回复关键词 [实战群]  ,就有机会进群和我们进行交流

Go netpoll (下篇)- 数据接收发送和关闭

分享、在看与点赞Go Go netpoll (下篇)- 数据接收发送和关闭

原文始发于微信公众号(GoCN):Go netpoll (下篇)- 数据接收发送和关闭

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年6月16日16:32:11
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Go netpoll (下篇)- 数据接收发送和关闭http://cn-sec.com/archives/1813885.html

发表评论

匿名网友 填写信息