Golang网络IO模型源码分析-goroutinue+epoll

admin 2023年6月5日19:14:36评论20 views字数 18788阅读62分37秒阅读模式

👇我在这儿




在了解Golang的网络IO模型的具体实现之前,可以先了解一下Linux的几种经典网络模型


Linux经典网络模型


阻塞式IO(BIO)

应用进程从发起IO系统调用一直到返回结果,整个期间都是处于阻塞状态,当前线程被挂起

Golang网络IO模型源码分析-goroutinue+epoll


非阻塞式IO(NIO)

Socket可以设置为非阻塞,这样应用进程可发起IO系统调用后可以立刻返回。轮询发起的IO系统调用直到返回结束标识,需要应用进程不停地访问执行结果。

Golang网络IO模型源码分析-goroutinue+epoll


IO多路复用模型

可以将多个应用进程的Socket注册到一个Select上,用一个进程来监听该Select(阻塞)。这样就可以实现有一个Socket准备好立刻返回,并发起IO系统调用来完成数据读取。有select/poll/epoll这一系列的多路选择器,实现了对线程的复用,一个线程可以处理多个IO

Golang网络IO模型源码分析-goroutinue+epoll


Golang的代码实现

我们从服务端与客户端的连接建立看起:

// 服务端func server() {    listener, err := net.Listen("tcp", "127.0.0.1:8081")    if err != nil {        return    }    defer listener.Close()        for {        conn, err := listener.Accept()        if err != nil {            return        }        fmt.Println(conn)    }}
// 客户端func client() { conn, err := net.Dial("tcp", "127.0.0.1:8081") if err != nil { return } defer conn.Close()}


可以分别通过服务端的 net.Listen()、Accept() 与 客户端的 net.Dial() 函数来深入看看Golang的内部实现


服务端的 net.Listen()

进入net.Listen()方法,可以看到实际上对ListenConfig.Listen()方法的一层封装,直接进入 net/dial.go 文件可以查看到 ListenConfig 的成员函数 Listen():

// 在本地网络地址上监听通告func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {   // 根据协议名称和地址获得Internet协议族地址列表   addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)   if err != nil {      return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}   }   sl := &sysListener{      ListenConfig: *lc,      network:      network,      address:      address,   }   var l Listener   la := addrs.first(isIPv4)   switch la := la.(type) {   case *TCPAddr:      // TCP监听      l, err = sl.listenTCP(ctx, la)   case *UnixAddr:      // Unix      l, err = sl.listenUnix(ctx, la)   default:      return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}   }   if err != nil {      return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer   }   return l, nil}


实例中我们创建的是TCP连接,所以通过一个 switch-case 语句可以进入 TCP 类型对应的处理分支内,也就是函数 listenTCP() 为实际处理函数

// 在本地网络地址上监听通告func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {   // 根据协议名称和地址获得Internet协议族地址列表   addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)   if err != nil {      return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}   }   sl := &sysListener{      ListenConfig: *lc,      network:      network,      address:      address,   }   var l Listener   la := addrs.first(isIPv4)   switch la := la.(type) {   case *TCPAddr:      // TCP监听      l, err = sl.listenTCP(ctx, la)   case *UnixAddr:      // Unix      l, err = sl.listenUnix(ctx, la)   default:      return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}   }   if err != nil {      return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer   }   return l, nil}


这个方法首先通过对 internetSocket() 方法的调用,根据不同的平台生成socket描述符,返回值为关键结果

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {   if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {      raddr = raddr.toLocal(net)   }   family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)   // 返回socket描述符具体地址,此函数:1.调用sysSocket生产描述符 2.调用newFD封装描述符 构造netFD 3.调用netFD实现bind与listen   return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)}


在 internetSocket() 方法内部,可以看到会先将零地址值转换为具体的地址,随后调用了比较重要的 socket() 方法,这个方法实现了:

  1.调用sysSocket生产描述符     2.调用newFD封装描述符 构造netFD      3.调用netFD实现bindlisten   
// 返回一个网络文件描述用于网络轮询器进行异步 IOfunc socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {   // 根据操作系统获取对应的socket   s, err := sysSocket(family, sotype, proto)   if err != nil {      return nil, err   }   // 设置socket选项   if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {      poll.CloseFunc(s)      return nil, err   }   // 根据socket创建fd,实际上对于底层socket进行封装,包含文件句柄等关键信息   if fd, err = newFD(s, family, sotype, net); err != nil {      poll.CloseFunc(s)      return nil, err   }
/* 机器翻译: 此函数为以下应用程序创建网络文件描述符: - 打开被动流连接的端点持有者,称为流侦听器 - 打开目标非特定数据报连接的端点持有者,称为数据报侦听器 - 端点持有者打开活动流或特定于目标的数据报连接,称为拨号器 - 打开其他连接的端点持有者,例如与内核内的协议栈对话 对于流和数据报侦听器,它们只需要命名套接字 所以我们可以假设当 laddr 不为 nil 但 raddr 为 nil 时,是来自流或数据报侦听器的请求。否则我们假设它仅用于拨号器或其他连接持有者*/
// 监听 if laddr != nil && raddr == nil { switch sotype { // windows实现在socket_windows.go linux实现在socket_cloexec.go case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil case syscall.SOCK_DGRAM: // UDP if err := fd.listenDatagram(laddr, ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil } } // 发起连接,非listen socket处理 if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil}

前面对 internetSocket() 方法调用时的参数 syscall.SOCK_STREAM,程序会进行 listenStream() 方法的调用。

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {   ...   // 将socket与 ip port进行绑定   if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {      return os.NewSyscallError("bind", err)   }   // 调用系统的 syscall.Listen,开启监听   if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {      return os.NewSyscallError("listen", err)   }   // 初始化 fd   if err = fd.init(); err != nil {      return err   }   lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)   fd.setAddr(fd.addrFunc()(lsa), nil)   return nil}



可以看到,到这里基本上印证了前面的三点作用。服务端通过对方法 internetSocket() 方法的调用,生成 netFD 实例并将地址返回并包装进 TCPListener 结构体中。


到这里对服务端的流程基本结束,让我们先看看客户端的连接过程再去看看 netFD 是怎么回事


客户端的 net.Dial()

进入net.Dial()方法,可以看到与服务端调用的Listen()类似,实际上是对Dialer.Dial()方法的一层封装,直接进入 net/dial.go 文件可以查看到 Dial():

// 连接到指定网络上的地址func (d *Dialer) Dial(network, address string) (Conn, error) {   // conn用TCPConn   return d.DialContext(context.Background(), network, address)}
// 使用提供的上下文连接到指定网络上的地址。// 提供的上下文必须是非空的。// 如果上下文在连接完成之前过期,则会返回错误。一旦连接成功,上下文的任何过期都不会影响连接
// 最终返回TCPConn > Conn 里面就是是基于关键网络描述符 netFD,与服务端对应上了func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) { ... var c Conn if len(fallbacks) > 0 { c, err = sd.dialParallel(ctx, primaries, fallbacks) } else { c, err = sd.dialSerial(ctx, primaries) } if err != nil { return nil, err }
if tc, ok := c.(*TCPConn); ok && d.KeepAlive >= 0 { setKeepAlive(tc.fd, true) ka := d.KeepAlive if d.KeepAlive == 0 { ka = defaultTCPKeepAlive } setKeepAlivePeriod(tc.fd, ka) testHookSetKeepAlive(ka) } return c, nil}


在 DialContext() 方法的最后声明了一个 Conn类型的interface(),通过断言转为 *TCPConn 类型进行后续操作

// TCPConn 是用于 TCP 网络连接的 Conn 接口的实现type TCPConn struct {   conn}
type conn struct { fd *netFD // 关键 网络描述符/句柄 无论net.listener还是dial都是基于此}

逐步进入源码中 conn 结构体就可以看到,客户端最后也是生成了一个包含 netFD 的结构体,至此客户端与服务端的连接过程就比较清晰了。


服务端与客户端的总结

Golang网络IO模型源码分析-goroutinue+epoll


netFD


poll.FD

接下来就是最重要的 netFD 结构体,server端在创建socket的时候会创建好 fd 结构体,可以从源码 net/fd_posix.go 中看到该结构体的详细信息

// Network file descriptor.// 包含在Conn结构中,而Conn又包含在TCPConn结构中,所以应该处于用户接口层type netFD struct {   // 包含两个重要的数据结构:Sysfd 和 pollDesc,用户层接口调用完成交互   // 1.前者是真正的系统文件描述符   // 2.后者是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法所实现的   pfd poll.FD
// immutable until Close family int sotype int isConnected bool // handshake completed or use of association with peer net string laddr Addr raddr Addr}


关键结构体:poll.FD,源码位置 internal/poll/fd_unix.go

// FD 是一个文件描述符。 net 和 os 包使用此类型作为表示网络连接或操作系统文件的较大类型的字段type FD struct {   // Lock sysfd and serialize access to Read and Write methods.   fdmu fdMutex   // 读写锁 锁定sysfd并序列化对Read和Write方法的访问
// System file descriptor. Immutable until Close. Sysfd int // 关键 系统文件描述符
// I/O poller. pd pollDesc // 底层事件驱动的封装 所有的读写超时等操作都是通过它
// Writev cache. iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed. 关闭文件时的信号 csema uint32
// Non-zero if this file has been set to blocking mode. isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. Immutable. TCP还是UDP IsStream bool
// Whether a zero byte read indicates EOF. This is false for a // message based socket connection. 读取到0字节是是否为错误 ZeroReadIsEOF bool
// Whether this is a file rather than a network socket. 是否是系统文件或者网络socket连接 isFile bool}


进入到关键字段 pd(pollDesc) 的源码,位于internal/poll/fd_poll_runtime.go,可以看到:

// FD 是一个文件描述符。 net 和 os 包使用此类型作为表示网络连接或操作系统文件的较大类型的字段type FD struct {   // Lock sysfd and serialize access to Read and Write methods.   fdmu fdMutex   // 读写锁 锁定sysfd并序列化对Read和Write方法的访问
// System file descriptor. Immutable until Close. Sysfd int // 关键 系统文件描述符
// I/O poller. pd pollDesc // 底层事件驱动的封装 所有的读写超时等操作都是通过它
// Writev cache. iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed. 关闭文件时的信号 csema uint32
// Non-zero if this file has been set to blocking mode. isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. Immutable. TCP还是UDP IsStream bool
// Whether a zero byte read indicates EOF. This is false for a // message based socket connection. 读取到0字节是是否为错误 ZeroReadIsEOF bool
// Whether this is a file rather than a network socket. 是否是系统文件或者网络socket连接 isFile bool}


结构 pollDesc() 中只包含了一个指针,内容是运行时的上下文信息,结构体通过自己的 init 方法进行初始化。

从源码中可以看到,先通过 sync.Once 来执行函数,保证只执行一次初始化,随后调用方法 runtime_pollOpen 来将 fd 加入 epoll中,实现了内核态用户态的关联切换。


几个关键的方法在文件开始的地方均有函数签名,但却没有具体实现:

func runtime_pollServerInit()func runtime_pollOpen(fd uintptr) (uintptr, int)func runtime_pollClose(ctx uintptr)func runtime_pollWait(ctx uintptr, mode int) intfunc runtime_pollWaitCanceled(ctx uintptr, mode int) intfunc runtime_pollReset(ctx uintptr, mode int) intfunc runtime_pollSetDeadline(ctx uintptr, d int64, mode int)func runtime_pollUnblock(ctx uintptr)func runtime_isPollServerDescriptor(fd uintptr) bool


从源码中可以看到,先通过 sync.Once 来执行函数,保证只执行一次初始化,随后调用方法 runtime_pollOpen 来将 fd 加入 epoll中,实现了内核态用户态的关联切换。


随后打开 runtime/netpoll.go 文件,可以看到上面只有签名的函数的具体实现


runtime/netpoll.go

不同系统的具体实现是有差异的,以 Linux 为例,具体实现位于 runtime/netpoll_epoll.go,但 darwin 系统的实现位于 runtime/netpoll_kqueque.go中,而 windows 实现位于runtime/netpoll_windows.go 中,这里以 Linux 系统为例:


初始化

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit// 初始化func poll_runtime_pollServerInit() {   netpollGenericInit()}
func netpollGenericInit() { ... // 实际调用 netpollinit() ...}
// 初始化网络轮询器 通过sync.Once 以及 netpollInited 状态值保证只遍历一次func netpollinit() { // 尝试使用create1系统调用创建epoll,否则使用create来创建epoll epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd < 0 { epfd = epollcreate(1024) if epfd < 0 { println("runtime: epollcreate failed with", -epfd) throw("runtime: netpollinit failed") } closeonexec(epfd) } // IO多路复用 r, w, errno := nonblockingPipe() if errno != 0 { println("runtime: pipe failed with", -errno) throw("runtime: pipe failed") } ev := epollevent{ events: _EPOLLIN, } *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd // 调用三个关键函数的创建 errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) if errno != 0 { println("runtime: epollctl failed with", -errno) throw("runtime: epollctl failed") } netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w)

}


在初始化过程中,可以看到主要是对 epoll 的初始化,同时生成r w,r和w是最底层的读写fd,也是要进行复用的fd,之后在netpoll注册监听的所有fd都是复用这个netpollBreakRdnetpollBreakWr,在函数其中有三个关于epoll的关键函数:epollcreate1()、epollcreate()、epollctl(),从文件开头可以看到对几个函数的签名:

// 三大关键函数// 1.创建eventpoll,返回一个epfd句柄,后续根据这个句柄进行对fd的添加删除等操作func epollcreate(size int32) int32func epollcreate1(flags int32) int32
//go:noescape// 2.创建红黑树结构epitem 完成epoll监听事件咋注册 向epfd添加、删除、修改要监听的fdfunc epollctl(epfd, op, fd int32, ev *epollevent) int32
//go:noescape// 3.轮询获取就绪的epitem维护到双向链表,传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄func epollwait(epfd int32, ev *epollevent, nev, timeout int32) int32func closeonexec(fd int32)



golang的 epoll 结构最初是通过 hash表来实现,后面改用红黑树来实现,通过方法 epollctl来维持红黑树结构,关于红黑树:

Golang网络IO模型源码分析-goroutinue+epoll

以上的三个函数(具体用汇编实现,不再研究)  :


1、epollcreate:在内核创建一个epoll对象,返回一个epfd句柄,后续根据句柄来操作对象

2、epollctl:创建红黑树结构体,将fd封装为一个epitem加入红黑树结构,并加一个回调函数注册到内核,状态改变时将item加入rdlist就绪队列中,同时维持红黑树的结构

3、epollwait:轮询获取就绪的epitem维护到rdlist双向链表中,传入创建返回的epfd句柄,以及超时时间,返回就绪的fd句柄。如果就绪队列为空就阻塞等待直到超时


整个过程整理流程可以如图所示,上面部分为用户态,下面部分为内核态:

Golang网络IO模型源码分析-goroutinue+epoll

open

随后我们可以看 poll_runtime_pollOpen() 的具体实现:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen// pollDesc open 连接至此 打开func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {   //通过此,分配一个pollDesc,返回*pollDesc类型的变量,这个是runtime的pollDesc,和之前提到的不是完全相同的   pd := pollcache.alloc()   lock(&pd.lock)   wg := pd.wg.Load()   if wg != 0 && wg != pdReady {      throw("runtime: blocked write on free polldesc")   }   rg := pd.rg.Load()   if rg != 0 && rg != pdReady {      throw("runtime: blocked read on free polldesc")   }   pd.fd = fd   pd.closing = false   pd.setEventErr(false)   pd.rseq++   pd.rg.Store(0)   pd.rd = 0   pd.wseq++   pd.wg.Store(0)   pd.wd = 0   pd.self = pd   pd.publishInfo()   unlock(&pd.lock)   // 初始化 netpollopen   errno := netpollopen(fd, pd)   if errno != 0 {      // poll 缓存区      pollcache.free(pd)      return nil, int(errno)   }   // 关键 pollDesc的真正结构   return pd, 0}
// 监听文件描述符上的边缘触发事件,创建事件并加入监听// 这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent // 具体事件 // 注册event事件,这里使用了epoll的ET模式,相对于ET,ET需要每次产生时间是就处理时间,否则容易丢失事件 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET // events记录上pd的指针 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd // 系统调用将该fd加到eventpoll对象中,交由内核监听 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)}

函数 poll_runtime_pollOpen() 最后返回的 pd 是 runtime 包的pollDesc的指针,与之前提到的不是相同的,这里我们可以理解到,之前我们提到的指针类型的 pollDesc 保存的地址就是这个 runtime.Polldesc 的实例地址!


后面实际调用了 netpollopen() 函数,这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联,通过epollctl的调用将fd加入到epoll中,交由内核监听


到这里服务端的 net.Listener 创建过程就结束了,最后就是对服务端调用的Accept()的研究,关于epoll三大函数的 wait 也将在 Accept 中展开应用



Accept()


从前面的一系列过程可以得知,net.Listen()返回了一个 TCPListener,这个结构体实现了 Listener() 接口,直接找到他的 Accept() 方法来看源码,位置在 net/tcpscok.go 中:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen// pollDesc open 连接至此 打开func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {   //通过此,分配一个pollDesc,返回*pollDesc类型的变量,这个是runtime的pollDesc,和之前提到的不是完全相同的   pd := pollcache.alloc()   lock(&pd.lock)   wg := pd.wg.Load()   if wg != 0 && wg != pdReady {      throw("runtime: blocked write on free polldesc")   }   rg := pd.rg.Load()   if rg != 0 && rg != pdReady {      throw("runtime: blocked read on free polldesc")   }   pd.fd = fd   pd.closing = false   pd.setEventErr(false)   pd.rseq++   pd.rg.Store(0)   pd.rd = 0   pd.wseq++   pd.wg.Store(0)   pd.wd = 0   pd.self = pd   pd.publishInfo()   unlock(&pd.lock)   // 初始化 netpollopen   errno := netpollopen(fd, pd)   if errno != 0 {      // poll 缓存区      pollcache.free(pd)      return nil, int(errno)   }   // 关键 pollDesc的真正结构   return pd, 0}
// 监听文件描述符上的边缘触发事件,创建事件并加入监听// 这个函数将用户态协程的pollDesc信息写入到epoll所在的单独线程,从而实现用户态和内核态的关联func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent // 具体事件 // 注册event事件,这里使用了epoll的ET模式,相对于ET,ET需要每次产生时间是就处理时间,否则容易丢失事件 ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET // events记录上pd的指针 *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd // 系统调用将该fd加到eventpoll对象中,交由内核监听 return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)}

可以看到,TCPListener的Accept经过包装,最后调用的函数为 fd.accept(),也就是netFD.Accept(),再次和我们前面的关键结构netFD关联了起来!

func (fd *netFD) accept() (netfd *netFD, err error) {   // 调用了 poll.FD 的 Accept() 方法   d, rsa, errcall, err := fd.pfd.Accept()   if err != nil {      if errcall != "" {         err = wrapSyscallError(errcall, err)      }      return nil, err   }   ...}
// Accept wraps the accept network call.func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { // 加锁 if err := fd.readLock(); err != nil { return -1, nil, "", err } defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { // 尝试直接获取客户端连接,成功直接返回 s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EINTR: continue // 创建的socket非阻塞,没有新连接返回 EAGAIN 而不是阻塞 case syscall.EAGAIN: // 可轮询的调用 epoll wait来等待通知 if fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // 这意味着监听队列中的套接字在我们接受()之前已经关闭 continue } return -1, nil, errcall, err }}


通过代码可以看到一些关键逻辑,其中关键函数有 accept() 与 waitRead(),来看一下 waitRead() 是如何做的:


func (pd *pollDesc) waitRead(isFile bool) error {   return pd.wait('r', isFile)}
func (pd *pollDesc) wait(mode int, isFile bool) error { if pd.runtimeCtx == 0 { return errors.New("waiting for unsupported file type") } res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res, isFile)}

可以看到,像前面的函数一样,调用了 runtime 包的 epollwait,让我们再次回到 runtime/netpoll_epoll.go 来看看具体实现:

func (pd *pollDesc) waitRead(isFile bool) error {   return pd.wait('r', isFile)}
func (pd *pollDesc) wait(mode int, isFile bool) error { if pd.runtimeCtx == 0 { return errors.New("waiting for unsupported file type") } res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res, isFile)}



关键函数为 netpollblock():


func (pd *pollDesc) waitRead(isFile bool) error {   return pd.wait('r', isFile)}
func (pd *pollDesc) wait(mode int, isFile bool) error { if pd.runtimeCtx == 0 { return errors.New("waiting for unsupported file type") } res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res, isFile)}

到这里,Accept() 的执行逻辑也基本清楚了:


1、通过socket执行accept直接获取客户端连接

2、客户端没有连接返回 EAGAIN 信号量

3、调用runtime.poll_runtime_pollWait挂起当前协程并保存到pollDesc中

4、有新客户端连接来的时候恢复阻塞的协程重复执行第一步


但是这里没有关于 epoll wait 的执行逻辑呀,其实epoll wait的相关执行位于golang的协程调度代码中,也就是位置于runtime/proc.go 文件中,调用的是函数 netpoll()

// netpoll 轮询网络并返回可运行的准备就绪的 goroutines 列表,传入的参数会决定他的行为// - 参数 < 0: 无限期阻塞等待文件就绪// - 参数 == 0: 非阻塞轮询// - 参数 > 0: 阻塞定期轮询func netpoll(delay int64) gList {   if epfd == -1 {      return gList{}   }   var waitms int32   if delay < 0 {      waitms = -1   } else if delay == 0 {      waitms = 0   } else if delay < 1e6 {      waitms = 1   } else if delay < 1e15 {      waitms = int32(delay / 1e6)   } else {      // An arbitrary cap on how long to wait for a timer.      // 1e9 ms == ~11.5 days.      waitms = 1e9   }   // 声明一个epollevent事件,在epoll_wait系统调用的时候会给该数组赋值并返回一个索引值   // 之后遍历数组取出就绪的fd事件   var events [128]epolleventretry:   // 陷入系统调用,取出内核eventpoll中的rdlist,返回就绪的事件   n := epollwait(epfd, &events[0], int32(len(events)), waitms)   if n < 0 {      if n != -_EINTR {         println("runtime: epollwait on fd", epfd, "failed with", -n)         throw("runtime: netpoll failed")      }      // If a timed sleep was interrupted, just return to      // recalculate how long we should sleep now.      if waitms > 0 {         return gList{}      }      goto retry   }   var toRun gList   // 遍历evnet事件数组   for i := int32(0); i < n; i++ {      ev := &events[i]      if ev.events == 0 {         continue      }
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { if ev.events != _EPOLLIN { println("runtime: netpoll: break fd ready for", ev.events) throw("runtime: netpoll: break fd ready for something unexpected") } if delay != 0 { // netpollBreak could be picked up by a // nonblocking poll. Only read the byte // if blocking. var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) atomic.Store(&netpollWakeSig, 0) } continue }
var mode int32 // 是否有就绪的读写事件,放入mode标志位 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { // 取出存入的pollDesc指针 pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) pd.setEventErr(ev.events == _EPOLLERR) // 具体实现在 netpoll.go 中 // 取出 pd 中的 rg 或 wg,后面放到运行队列 netpollready(&toRun, pd, mode) } } return toRun}
// 将就绪好的io事件,写入就绪的goroutinue队列func netpollready(toRun *gList, pd *pollDesc, mode int32) { var rg, wg *g if mode == 'r' || mode == 'r'+'w' { rg = netpollunblock(pd, 'r', true) } if mode == 'w' || mode == 'r'+'w' { wg = netpollunblock(pd, 'w', true) } // 将阻塞的goroutinue加入gList队列返回 if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) }}

结合前面的 epoll 流程图可以较快理解上面的代码,可以看到在协程调度过程中会获取准备好的socket并唤醒对应的Goroutinue


epoll的其他方法


close()

关于 close()的核心逻辑 同样位于 runtime/netpoll.go 与 runtime/netpoll_epoll.go 中

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose// 当发生某些情况,如断开连接,fd销毁都会调用到此func poll_runtime_pollClose(pd *pollDesc) {   if !pd.closing {      throw("runtime: close polldesc w/o unblock")   }   wg := pd.wg.Load()   if wg != 0 && wg != pdReady {      throw("runtime: blocked write on closing polldesc")   }   rg := pd.rg.Load()   if rg != 0 && rg != pdReady {      throw("runtime: blocked read on closing polldesc")   }   // 调用epoll_ctl系统调用,删除该fd在eventpoll上对应的epitem   netpollclose(pd.fd)   // 释放对应的pd   pollcache.free(pd)}
// 关闭func netpollclose(fd uintptr) int32 { var ev epollevent return -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)}
// 释放内存 对应的pdfunc (c *pollCache) free(pd *pollDesc) { lock(&c.lock) pd.link = c.first c.first = pd unlock(&c.lock)}

aloc()

pollDesc的空间申请方法

// 关键 缓存pollDesc实现// go runtime调用 poll_runtime_pollOpen时,往epoll实例注册fd 首次调用次方法 初始大小 4kfunc (c *pollCache) alloc() *pollDesc {   lock(&c.lock)   // 首次   if c.first == nil {      const pdSize = unsafe.Sizeof(pollDesc{})      n := pollBlockSize / pdSize      if n == 0 {         n = 1      }      // Must be in non-GC memory because can be referenced      // only from epoll/kqueue internals.      // 分配不会被GC回收的内存,确保这些数据结构只能被 epoll和kqueue的内核空间去引用      // GC操作会在go runtime关闭时调用pollcache.free释放内存      mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)      for i := uintptr(0); i < n; i++ {         pd := (*pollDesc)(add(mem, i*pdSize))         pd.link = c.first         c.first = pd      }   }   // 往后每次会先判断链表头是否分配过值   pd := c.first   // 如果是则直接返回表头的pollDesc   // 常见优化设计 批量初始化数据进行缓存 提高netpoller吞吐量   c.first = pd.link   lockInit(&pd.lock, lockRankPollDesc)   unlock(&c.lock)   return pd}


Golang netpoll 总结

同样用一个流程图总结一下吧,可以与上面的流程图结合理解

Golang网络IO模型源码分析-goroutinue+epoll

来源:https://juejin.cn/post/7230745439125504061




Golang网络IO模型源码分析-goroutinue+epoll
快点击阅读原文报名吧~~

原文始发于微信公众号(GoCN):Golang网络IO模型源码分析-goroutinue+epoll

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年6月5日19:14:36
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Golang网络IO模型源码分析-goroutinue+epollhttp://cn-sec.com/archives/1746767.html

发表评论

匿名网友 填写信息