概述
下面是一个基础的服务器网络程序,主要包含如下功能:
-
监听 TCP 连接,绑定 8888 端口 -
收到新的客户端连接后,启动一个新的 goroutine
进行处理 -
收到客户端的数据后,不做任何处理,原样返回
package main
import (
"log"
"net"
)
func main() {
// 初始化监听
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: []byte("127.0.0.1"),
Port: 8888,
})
if err != nil {
panic(err)
}
for {
// 接收请求
conn, err := listener.Accept()
if err != nil {
panic(err)
}
// 启动 1 个 goroutine 处理请求
go handle(conn)
}
}
// 处理客户端连接请求
func handle(conn net.Conn) {
defer func() {
_ = conn.Close()
}()
buf := make([]byte, 1024)
for {
// 接收数据
n, err := conn.Read(buf[:])
if err != nil {
log.Printf("conn Read %v", err)
break
}
// 如果接收到了数据,原样返回
if n > 0 {
// 发送数据
_, err = conn.Write(buf)
}
}
}
上述代码采用了类似 同步模型 代码的方式实现了功能,但是这种方式真的可以支撑高性能网络编程吗?答案就在隐藏在同步模型后面的底层系统调用和网络轮询器。
前置知识复习
在正式开始研究源代码之前,先来复习两个基础知识点。
1. 多路复用接口
I/O 多路复用于处理同一个事件循环中的多个 I/O 事件,这里的「多路」指多个 IO 事件,「复用」指处理事件的程序 (线程) 是同一个。
Go 网络标准库和一般的 接口
约束形式不同,并没有明确给出具体的 多路复用接口,但是不同平台上面都实现了如下几个方法:
// 初始化网络轮询器
func netpollinit() {}
// 检测网络文件描述符是否被网络轮询器使用
func netpollIsPollDescriptor(fd uintptr) bool {}
// 创建监听事件并监听网络文件描述符
func netpollopen(fd uintptr, pd *pollDesc) int32 {}
// 删除网络文件描述符
func netpollclose(fd uintptr) int32 {}
// 检测网络轮询器并返回已经就绪的 goroutine 列表
func netpoll(delay int64) gList {}
// 唤醒网络轮询器
func netpollBreak() {}
例如 Linux
实现直接复用了底层 epoll
的相关方法, 方法定义在 $GOROOT/src/runtime/netpoll_epoll.go
文件中,MacOS
实现直接复用了底层 kqueue
, 方法定义在 $GOROOT/src/runtime/netpoll_kqueue.go
文件中,其他平台以此类推。
最后,编译器利用条件编译规则,根据不同的平台编译对应的代码,例如 Linux
直接编译 $GOROOT/src/runtime/netpoll_epoll.go
文件。
2. epoll API
epoll 是 Linux 系统提供的一种 I/O 多路复用机制,它可以同时监听多个文件描述符的 I/O 事件,当其中任意一个文件描述符发生 I/O 事件时,就会触发相应的回调函数。与传统的 select 和 poll 模型相比,epoll 的性能更好,具有更高的可扩展性和更好的业务逻辑处理能力。
epoll 的三个核心 API 如下:
-
epoll_create
: 创建一个新的epoll
实例,返回一个epoll
文件描述符,该文件描述符可用于epoll_ctl
和epoll_wait
函数调用 -
epoll_ctl
: 管理epoll
实例中的所有文件描述符 (内部使用红黑树数据结构进行管理),可以注册、修改或删除要监听的文件描述符,设置相应的事件类型和回调函数 -
epoll_wait
: 等待任意文件描述符监听的事件发生,当有事件触发时,函数返回一个非零值,并将所有到达的事件按顺序存入队列 (数组) 中
内部实现
结合文章开头的示例代码,接下来我们一起探究 网络轮询器
的内部实现,相关文件目录为 $GOROOT/src/runtime
,笔者的 Go 版本为 go1.19 linux/amd64
。
本文着重分析一下 netpoll
的数据结构以及 IO 读写流程中涉及到的一些底层方法。
文件描述符数据结构
文件描述符
FD 对象表示最基础的文件描述符抽象,net 和 os 包使用该类型来表示网络连接或操作系统文件。
type FD struct {
// 对 Sysfd 加锁,串行化 Read 和 Write
fdmu fdMutex
// 操作系统的文件描述符
Sysfd int
// 网络轮询 IO 描述符
pd pollDesc
// 描述符关闭信号
csema uint32
// 是否阻塞模式
isBlocking uint32
// 区分当前描述符是一个 stream, 还是一个基于包的描述符 (区分 TCP/UDP)
// 不可变
IsStream bool
// 读取零字节是否表示 EOF
ZeroReadIsEOF bool
// 区分当前描述符是一个文件,还是一个 socket
isFile bool
}
文件描述符初始化方法如下:
func (fd *FD) Init(net string, pollable bool) error {
...
err := fd.pd.init(fd)
...
return err
}
网络文件描述符
netFD 对象表示网络文件描述符。
type netFD struct {
pfd poll.FD // 包装了一个 FD 结构体
// 下列字段在 Close 之前不可变
family int
sotype int
isConnected bool
net string
laddr Addr
raddr Addr
}
newFD 方法实例化一个 netFD 对象,并返回该对象的指针。
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
...
}
return ret, nil
}
网络文件描述符方法如下:
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
网络轮询 IO 文件描述符
pollDesc 对象表示网络轮询 IO 文件描述符,主要用于被 Go 的网络轮询器监听状态变化,是网络底层实现中的核心对象。
这里有一个需要学习的知识点: rg 字段和 wg 字段的数据类型都是 atomic.Uintptr
, 而且可以用来表示 4 种数据:
-
pdReady 信号 -
pdWait 信号 -
goroutine -
nil
// pollDesc 包含两种信号量,rg 和 wg, 可以表示多种状态
// Tips: 通过将字段设置为 atomic.Uintptr 类型 (效果和 guintptr 类似),可以支持多种类型表示
// 几种信号量状态:
// pdReady - IO 准备就绪
// pdWait - goroutine 准备休眠
// G pointer - goroutine 阻塞
const (
pdReady uintptr = 1
pdWait uintptr = 2
)
type pollDesc struct {
link *pollDesc // 链表结构 (后面的元素) 指针
fd uintptr
atomicInfo atomic.Uint32
rg atomic.Uintptr // 表示信号量,可能为 pdReady、pdWait、等待文件描述符可读的 goroutine 或者 nil
wg atomic.Uintptr // 表示信号量,可能为 pdReady、pdWait、等待文件描述符可写的 goroutine 或者 nil
lock mutex // 保护下面的字段
closing bool
rseq uintptr // 表示文件描述符被重用或者计时器被重置
rt timer // 可读截至时间计时器
rd int64 // 等待文件描述符可读截至时间,-1 表示过期 (goroutine 被唤醒)
wseq uintptr // 表示文件描述符被重用或者计时器被重置
wt timer // 可写截至时间计时器
wd int64 // 等待文件描述符可写截至时间,-1 表示过期 (goroutine 被唤醒)
}
轮询文件描述符管理
pollCache 对象用来管理网络 IO 文件描述符,内置了一个互斥锁字段和一个 pollDesc 对象链表。
type pollCache struct {
lock mutex
// 指向一个 pollDesc 链表
first *pollDesc
}
数据结构图
Listen 流程
Listener 接口
type Listener interface {
// 返回一个实现了 Conn 接口的连接实例
Accept() (Conn, error)
Close() error
Addr() Addr
}
TCP 监听对象
type TCPListener struct {
// 包装了一个 netFD 对象
fd *netFD
lc ListenConfig
}
TCP 监听
ListenTCP 方法返回一个 TCP 监听对象的指针。
func ListenTCP(network string, laddr *TCPAddr) (*TCPListener, error) {
...
sl := &sysListener{network: network, address: laddr.String()}
ln, err := sl.listenTCP(context.Background(), laddr)
...
return ln, nil
}
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
...
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
...
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
获取系统配置
listenerBacklog 方法缓存了系统全连接队列配置参数值,内部通过内嵌 sync.Once 的方式,保证了仅调用一次 maxListenerBacklog 方法。
func listenerBacklog() int {
listenerBacklogCache.Do(func() { listenerBacklogCache.val = maxListenerBacklog() })
return listenerBacklogCache.val
}
maxListenerBacklog 方法用于获取系统全连接队列配置参数值。
// Linux 读取配置文件
func maxListenerBacklog() int {
fd, err := open("/proc/sys/net/core/somaxconn")
...
n, _, ok := dtoi(f[0])
return n
}
创建 socket
socket 方法返回一个网络文件描述符,该描述符使用网络轮询器异步接收数据。
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 使用系统调用创建一个 socket 文件描述符
// 并将 socket 文件描述符包装为 netFD 对象
s, err := sysSocket(family, sotype, proto)
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
...
if laddr != nil && raddr == nil {
switch sotype {
// 基于流: TCP
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 1. 获取系统配置
// 2. 绑定并监听端口
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
...
}
// 基于数据报: UDP
case syscall.SOCK_DGRAM:
...
}
}
return fd, nil
}
绑定并监听端口
listenStream 方法内部实现了 TCP 的绑定端口和监听端口,并完成了 epoll
的初始化工作。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
...
// 绑定端口由系统调用实现
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 监听端口由系统调用实现
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 初始化 epoll
if err = fd.init(); err != nil {
return err
}
...
return nil
}
epoll 初始化
serverInit 类型是 sync.Once
的类型别名,保证了 poll_runtime_pollServerInit 方法只会被调用一次 (也就是单个进程全局只有一个 epoll 实例,避免惊群效应)。
func (pd *pollDesc) init(fd *FD) error {
// runtime_pollServerInit 通过链接器指向了 poll_runtime_pollServerInit
// 初始化 epoll
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen 通过链接器指向了 poll_runtime_pollOpen
// 将文件描述符加入 epoll 监听
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
....
return nil
}
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
...
if netpollInited == 0 {
netpollinit()
}
...
}
}
netpollinit 方法实现了 多路复用接口
,主要用于网络轮询器 epoll
具体的初始化工作,和上面的 netpollGenericInit 方法一样,该方法也只会被调用一次。
var (
// epoll 全局对象 (也是一个文件描述符)
// 相当于调用 epoll_create 函数返回的对象
// 后续的 epoll_ctl, epoll_wait 函数都是基于这个对象操作的
epfd int32 = -1
// 数据读写管道
netpollBreakRd, netpollBreakWr uintptr
// 标识变量,避免重复调用 netpollBreak 方法
netpollWakeSig uint32
)
func netpollinit() {
// 创建 epoll 描述符,赋值到全局变量 epfd
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
...
}
// 创建一个通信管道
r, w, errno := nonblockingPipe()
...
// 将用于读取数据的文件描述符转换为 epollevent 结构,进行监听
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
...
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
netpollopen 方法实现了 多路复用接口
,将新的文件描述符和监听事件加入到全局变量 epfd
表示的网络轮询文件描述符。
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
...
errno := netpollopen(fd, pd)
...
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
小结
ListenTCP 方法内部实现了创建 socket,绑定端口,监听端口三个操作,相对于传统的 C 系列语言编程,将初始化过程简化为一个方法 API, 当方法执行完成后,epoll
也已经完成初始化工作,进入轮询状态等待连接到来以及 IO 事件。
想要了解Go更多内容,欢迎扫描下方👇关注公众号,回复关键词 [实战群] ,就有机会进群和我们进行交流
原文始发于微信公众号(GoCN):Go netpoll (上篇)- 数据结构和初始化
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论