Go netpoll (上篇)- 数据结构和初始化

admin 2023年6月13日18:14:52评论9 views字数 8781阅读29分16秒阅读模式

概述

下面是一个基础的服务器网络程序,主要包含如下功能:

  • 监听 TCP 连接,绑定 8888 端口
  • 收到新的客户端连接后,启动一个新的 goroutine 进行处理
  • 收到客户端的数据后,不做任何处理,原样返回
package main

import (
 "log"
 "net"
)

func main() {
 // 初始化监听
 listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  IP:   []byte("127.0.0.1"),
  Port: 8888,
 })
 if err != nil {
  panic(err)
 }

 for {
  // 接收请求
  conn, err := listener.Accept()
  if err != nil {
   panic(err)
  }

  // 启动 1 个 goroutine 处理请求
  go handle(conn)
 }
}

// 处理客户端连接请求
func handle(conn net.Conn) {
 defer func() {
  _ = conn.Close()
 }()

 buf := make([]byte1024)
 for {
  // 接收数据
  n, err := conn.Read(buf[:])
  if err != nil {
   log.Printf("conn Read %v", err)
   break
  }
  // 如果接收到了数据,原样返回
  if n > 0 {
   // 发送数据
   _, err = conn.Write(buf)
  }
 }
}

上述代码采用了类似 同步模型 代码的方式实现了功能,但是这种方式真的可以支撑高性能网络编程吗?答案就在隐藏在同步模型后面的底层系统调用和网络轮询器。

前置知识复习

在正式开始研究源代码之前,先来复习两个基础知识点。

1. 多路复用接口

I/O 多路复用于处理同一个事件循环中的多个 I/O 事件,这里的「多路」指多个 IO 事件,「复用」指处理事件的程序 (线程) 是同一个。

Go 网络标准库和一般的 接口 约束形式不同,并没有明确给出具体的 多路复用接口,但是不同平台上面都实现了如下几个方法:

// 初始化网络轮询器 
func netpollinit() {}

// 检测网络文件描述符是否被网络轮询器使用
func netpollIsPollDescriptor(fd uintptr) bool {}

// 创建监听事件并监听网络文件描述符
func netpollopen(fd uintptr, pd *pollDesc) int32 {}

// 删除网络文件描述符
func netpollclose(fd uintptr) int32 {}

// 检测网络轮询器并返回已经就绪的 goroutine 列表
func netpoll(delay int64) gList {}

// 唤醒网络轮询器
func netpollBreak() {}

例如 Linux 实现直接复用了底层 epoll 的相关方法, 方法定义在 $GOROOT/src/runtime/netpoll_epoll.go 文件中,MacOS 实现直接复用了底层 kqueue, 方法定义在 $GOROOT/src/runtime/netpoll_kqueue.go 文件中,其他平台以此类推。

最后,编译器利用条件编译规则,根据不同的平台编译对应的代码,例如 Linux 直接编译 $GOROOT/src/runtime/netpoll_epoll.go 文件。

2. epoll API

epoll 是 Linux 系统提供的一种 I/O 多路复用机制,它可以同时监听多个文件描述符的 I/O 事件,当其中任意一个文件描述符发生 I/O 事件时,就会触发相应的回调函数。与传统的 select 和 poll 模型相比,epoll 的性能更好,具有更高的可扩展性和更好的业务逻辑处理能力。

epoll 的三个核心 API 如下:

  1. epoll_create: 创建一个新的 epoll 实例,返回一个 epoll 文件描述符,该文件描述符可用于 epoll_ctlepoll_wait 函数调用
  2. epoll_ctl   : 管理 epoll 实例中的所有文件描述符 (内部使用红黑树数据结构进行管理),可以注册、修改或删除要监听的文件描述符,设置相应的事件类型和回调函数
  3. epoll_wait  : 等待任意文件描述符监听的事件发生,当有事件触发时,函数返回一个非零值,并将所有到达的事件按顺序存入队列 (数组) 中

内部实现

结合文章开头的示例代码,接下来我们一起探究 网络轮询器 的内部实现,相关文件目录为 $GOROOT/src/runtime,笔者的 Go 版本为 go1.19 linux/amd64

本文着重分析一下 netpoll 的数据结构以及 IO 读写流程中涉及到的一些底层方法。

文件描述符数据结构

文件描述符

FD 对象表示最基础的文件描述符抽象,net 和 os 包使用该类型来表示网络连接或操作系统文件。

type FD struct {
 // 对 Sysfd 加锁,串行化 Read 和 Write
 fdmu fdMutex

 // 操作系统的文件描述符
 Sysfd int

 // 网络轮询 IO 描述符
 pd pollDesc

 // 描述符关闭信号
 csema uint32

 // 是否阻塞模式
 isBlocking uint32

 // 区分当前描述符是一个 stream, 还是一个基于包的描述符 (区分 TCP/UDP)
 // 不可变
 IsStream bool

 // 读取零字节是否表示 EOF
 ZeroReadIsEOF bool

 // 区分当前描述符是一个文件,还是一个 socket
 isFile bool
}

文件描述符初始化方法如下:

func (fd *FD) Init(net string, pollable bool) error {
    ...
 err := fd.pd.init(fd)
    ...
 return err
}

网络文件描述符

netFD 对象表示网络文件描述符。

type netFD struct {
 pfd poll.FD // 包装了一个 FD 结构体

 // 下列字段在 Close 之前不可变
 family      int
 sotype      int
 isConnected bool
 net         string
 laddr       Addr
 raddr       Addr
}

newFD 方法实例化一个 netFD 对象,并返回该对象的指针。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
 ret := &netFD{
  ...
 }
 return ret, nil
}

网络文件描述符方法如下:

func (fd *netFD) init() error {
 return fd.pfd.Init(fd.net, true)
}

网络轮询 IO 文件描述符

pollDesc 对象表示网络轮询 IO 文件描述符,主要用于被 Go 的网络轮询器监听状态变化,是网络底层实现中的核心对象。

这里有一个需要学习的知识点: rg 字段和 wg 字段的数据类型都是 atomic.Uintptr, 而且可以用来表示 4 种数据:

  1. pdReady 信号
  2. pdWait 信号
  3. goroutine
  4. nil
// pollDesc 包含两种信号量,rg 和 wg, 可以表示多种状态
// Tips: 通过将字段设置为 atomic.Uintptr 类型 (效果和 guintptr 类似),可以支持多种类型表示

// 几种信号量状态:
// pdReady - IO 准备就绪
// pdWait - goroutine 准备休眠  
// G pointer - goroutine 阻塞
const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

type pollDesc struct {
 link *pollDesc // 链表结构 (后面的元素) 指针
 fd   uintptr   
 
 atomicInfo atomic.Uint32
 
 rg atomic.Uintptr // 表示信号量,可能为 pdReady、pdWait、等待文件描述符可读的 goroutine 或者 nil
 wg atomic.Uintptr // 表示信号量,可能为 pdReady、pdWait、等待文件描述符可写的 goroutine 或者 nil

 lock    mutex     // 保护下面的字段
 closing bool
 rseq    uintptr   // 表示文件描述符被重用或者计时器被重置
 rt      timer     // 可读截至时间计时器
 rd      int64     // 等待文件描述符可读截至时间,-1 表示过期 (goroutine 被唤醒)
 wseq    uintptr   // 表示文件描述符被重用或者计时器被重置
 wt      timer     // 可写截至时间计时器
 wd      int64     // 等待文件描述符可写截至时间,-1 表示过期 (goroutine 被唤醒)
}

轮询文件描述符管理

pollCache 对象用来管理网络 IO 文件描述符,内置了一个互斥锁字段和一个 pollDesc 对象链表。

type pollCache struct {
 lock  mutex
 // 指向一个 pollDesc 链表
 first *pollDesc
}

数据结构图

Go netpoll (上篇)- 数据结构和初始化

网络文件描述符

Listen 流程

Go netpoll (上篇)- 数据结构和初始化

TCP 监听流程图

Listener 接口

type Listener interface {
 // 返回一个实现了 Conn 接口的连接实例
 Accept() (Conn, error)
 
 Close() error
 
 Addr() Addr
}

TCP 监听对象

type TCPListener struct {
 // 包装了一个 netFD 对象
 fd *netFD   
 lc ListenConfig
}

TCP 监听

ListenTCP 方法返回一个 TCP 监听对象的指针。

func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
 ...
 
 sl := &sysListener{network: network, address: laddr.String()}
 ln, err := sl.listenTCP(context.Background(), laddr)
 
 ...

 return ln, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
 fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0"listen", sl.ListenConfig.Control)
    
 ...
 
 return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(stringstring, syscall.RawConn) error(fd *netFD, err error) {
 ...
 
 family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
 return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

获取系统配置

listenerBacklog 方法缓存了系统全连接队列配置参数值,内部通过内嵌 sync.Once 的方式,保证了仅调用一次 maxListenerBacklog 方法。

func listenerBacklog() int {
 listenerBacklogCache.Do(func() { listenerBacklogCache.val = maxListenerBacklog() })
 return listenerBacklogCache.val
}

maxListenerBacklog 方法用于获取系统全连接队列配置参数值。

// Linux 读取配置文件
func maxListenerBacklog() int {
 fd, err := open("/proc/sys/net/core/somaxconn")
 ...
 n, _, ok := dtoi(f[0])
 return n
}

创建 socket

socket 方法返回一个网络文件描述符,该描述符使用网络轮询器异步接收数据。

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(stringstring, syscall.RawConn) error(fd *netFD, err error) {
 // 使用系统调用创建一个 socket 文件描述符
 // 并将 socket 文件描述符包装为 netFD 对象 
 s, err := sysSocket(family, sotype, proto)
 if fd, err = newFD(s, family, sotype, net); err != nil {
     poll.CloseFunc(s)
     return nil, err
 }
 
 ...

 if laddr != nil && raddr == nil {
  switch sotype {
   // 基于流: TCP
   case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
     // 1. 获取系统配置
     // 2. 绑定并监听端口
     if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
        ...
     }
  
   // 基于数据报: UDP
   case syscall.SOCK_DGRAM:
      ...
   }
 }

 return fd, nil
}

绑定并监听端口

listenStream 方法内部实现了 TCP 的绑定端口和监听端口,并完成了 epoll 的初始化工作。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(stringstring, syscall.RawConn) errorerror {
 ...
 
 var lsa syscall.Sockaddr
 if lsa, err = laddr.sockaddr(fd.family); err != nil {
  return err
 }

 ...
 
 // 绑定端口由系统调用实现
 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
  return os.NewSyscallError("bind", err)
 }
 // 监听端口由系统调用实现
 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
  return os.NewSyscallError("listen", err)
 }
 // 初始化 epoll
 if err = fd.init(); err != nil {
    return err
 }
 
 ...
 
 return nil
}

epoll 初始化

serverInit 类型是 sync.Once 的类型别名,保证了 poll_runtime_pollServerInit 方法只会被调用一次 (也就是单个进程全局只有一个 epoll 实例,避免惊群效应)。

func (pd *pollDesc) init(fd *FD) error {
 // runtime_pollServerInit 通过链接器指向了 poll_runtime_pollServerInit
 // 初始化 epoll
 serverInit.Do(runtime_pollServerInit)
 
 // runtime_pollOpen 通过链接器指向了 poll_runtime_pollOpen
 // 将文件描述符加入 epoll 监听
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
 ....
 return nil
}
func poll_runtime_pollServerInit() {
 netpollGenericInit()
}

func netpollGenericInit() {
 if atomic.Load(&netpollInited) == 0 {
        ...
  if netpollInited == 0 {
   netpollinit()
  }
  ...
 }
}

netpollinit 方法实现了 多路复用接口,主要用于网络轮询器 epoll 具体的初始化工作,和上面的 netpollGenericInit 方法一样,该方法也只会被调用一次。

var (
    // epoll 全局对象 (也是一个文件描述符)
    // 相当于调用 epoll_create 函数返回的对象
    // 后续的 epoll_ctl, epoll_wait 函数都是基于这个对象操作的
    epfd int32 = -1

    // 数据读写管道
    netpollBreakRd, netpollBreakWr uintptr

    // 标识变量,避免重复调用 netpollBreak 方法
    netpollWakeSig uint32
)

func netpollinit() {
    // 创建 epoll 描述符,赋值到全局变量 epfd
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd < 0 {
      epfd = epollcreate(1024)
      ...
    }

    // 创建一个通信管道
    r, w, errno := nonblockingPipe()

   ...

    // 将用于读取数据的文件描述符转换为 epollevent 结构,进行监听
    ev := epollevent{
      events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd

    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)

    ...

    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

netpollopen 方法实现了 多路复用接口,将新的文件描述符和监听事件加入到全局变量 epfd 表示的网络轮询文件描述符。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    ...
    errno := netpollopen(fd, pd)
    ...
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

小结

ListenTCP 方法内部实现了创建 socket,绑定端口,监听端口三个操作,相对于传统的 C 系列语言编程,将初始化过程简化为一个方法 API, 当方法执行完成后,epoll 也已经完成初始化工作,进入轮询状态等待连接到来以及 IO 事件。


Go netpoll (上篇)- 数据结构和初始化

ListenTCP

想要了解Go更多内容,欢迎扫描下方👇关注公众号,回复关键词 [实战群]  ,就有机会进群和我们进行交流

Go netpoll (上篇)- 数据结构和初始化

分享、在看与点赞Go Go netpoll (上篇)- 数据结构和初始化

原文始发于微信公众号(GoCN):Go netpoll (上篇)- 数据结构和初始化

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年6月13日18:14:52
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Go netpoll (上篇)- 数据结构和初始化http://cn-sec.com/archives/1804043.html

发表评论

匿名网友 填写信息