searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

GO net库的非阻塞支持

2025-05-16 09:29:49
0
0

Go语言的net库通过以下机制实现非阻塞网络I/O,同时保持代码的简洁性:

  1. 异步系统调用封装
    底层对socket(SO_NONBLOCK)自动启用非阻塞模式,但开发者的调用接口仍表现为同步形式。当执行读写操作时,若内核缓冲区未就绪,立即返回EWOULDBLOCK错误。

  2. 事件驱动架构
    内部通过runtime.netpoll组件集成OS特定多路复用器:

    • Linux使用epoll
    • macOS使用kqueue
    • Windows使用IOCP
      该组件持续监听网络事件,形成高效的I/O就绪队列。
  3. 协程调度整合
    当Goroutine发起网络请求时:

  • 运行时将Goroutine挂起,将其与对应socket绑定
  • 当前线程(M)立即解绑,转去执行其他待处理的Goroutine
  • 内核通知I/O就绪后,调度器优先唤醒等待此事件的Goroutine
  1. 零回调的同步编程模型
    尽管底层采用非阻塞实现,开发者仍可编写直观的同步代码:

以Read为例子,函数代码如下:

// 位置: net/net.go
type conn struct {
fd *netFD
}

func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// 位置: net/fd_posix.go
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}

// 位置: internal/poll/fd_unix.go
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc

// ...省略部分代码
}

func (fd *FD) Read(p []byte) (int, error) {
// ...省略部分代码
// 从Sysfd缓冲区读取数据写入goroutine缓冲区p
// 忽略中断信号 EINTER
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

// 位置: internal/poll/fd_poll_runtime.go
type pollDesc struct {
runtimeCtx uintptr
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}

func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...省略部分代码

for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}

/ returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// poller持有的goroutine
// rg 是read goroutine
// wg 是write goroutine
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// 把goroutine置为等待状态
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}

// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}

// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 当前goroutine被唤醒
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
0条评论
作者已关闭评论
a****m
4文章数
0粉丝数
a****m
4 文章 | 0 粉丝
a****m
4文章数
0粉丝数
a****m
4 文章 | 0 粉丝
原创

GO net库的非阻塞支持

2025-05-16 09:29:49
0
0

Go语言的net库通过以下机制实现非阻塞网络I/O,同时保持代码的简洁性:

  1. 异步系统调用封装
    底层对socket(SO_NONBLOCK)自动启用非阻塞模式,但开发者的调用接口仍表现为同步形式。当执行读写操作时,若内核缓冲区未就绪,立即返回EWOULDBLOCK错误。

  2. 事件驱动架构
    内部通过runtime.netpoll组件集成OS特定多路复用器:

    • Linux使用epoll
    • macOS使用kqueue
    • Windows使用IOCP
      该组件持续监听网络事件,形成高效的I/O就绪队列。
  3. 协程调度整合
    当Goroutine发起网络请求时:

  • 运行时将Goroutine挂起,将其与对应socket绑定
  • 当前线程(M)立即解绑,转去执行其他待处理的Goroutine
  • 内核通知I/O就绪后,调度器优先唤醒等待此事件的Goroutine
  1. 零回调的同步编程模型
    尽管底层采用非阻塞实现,开发者仍可编写直观的同步代码:

以Read为例子,函数代码如下:

// 位置: net/net.go
type conn struct {
fd *netFD
}

func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

// 位置: net/fd_posix.go
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}

// 位置: internal/poll/fd_unix.go
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc

// ...省略部分代码
}

func (fd *FD) Read(p []byte) (int, error) {
// ...省略部分代码
// 从Sysfd缓冲区读取数据写入goroutine缓冲区p
// 忽略中断信号 EINTER
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

// 位置: internal/poll/fd_poll_runtime.go
type pollDesc struct {
runtimeCtx uintptr
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}

func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...省略部分代码

for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}

/ returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// poller持有的goroutine
// rg 是read goroutine
// wg 是write goroutine
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// 把goroutine置为等待状态
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}

// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}

// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 当前goroutine被唤醒
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0