searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Golang深入浅出——Channel结构解析

2023-07-12 14:01:59
14
0

简介

channel是golang中的一种数据结构,该数据类型也是一种引用类型。channel常用于协程间通信和数据共享,是golang“不要通过共享内存来通信,而应该通过通信来共享内存”灵魂的基石。

channel是一种协诚安全的数据类型,其底层采用带锁的环形队列存储消息。

 

类型定义

type hchan struct {                                                    
    qcount   uint           // total data in the queue                 
    dataqsiz uint           // size of the circular queue              
    buf      unsafe.Pointer // points to an array of dataqsiz elements 
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}   
    
type waitq struct {
    first *sudog
    last  *sudog
}

类型实现

对象创建

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")                                  
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {                                
        throw("makechan: bad alignment")
    }
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))                           
    if overflow || mem > maxAlloc-hchanSize || size < 0 {                                
        panic(plainError("makechan: size out of range"))                                 
    }
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.                         
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))                                 
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)      
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") 
    }          
    return c   
}

关键说明:

1)hchan是通过mallocgc在堆上分配内存,会被GC回收

2)元素是指针类型:元素包含指针,存在GC可能性,hchan结构和buff分开申请内存;

消息发送

GC假设栈只在一个协程内完成;读写一个无缓冲或空buffer的channel,是唯一一个协程写另外一个运行协程的栈场景,通过写屏障来实现。
1)recvq队列不空,则从recvq队首获取阻塞协程,将消息通过memmove写入该协程空间;调用goready唤醒阻塞协程;
2)环形队列有空间,将消息加入环形对接队尾;
3)环形队列没有空间,则阻塞当前协程,并加入到本channel的sendq队列;
4)向一个关闭的channel发送消息,会触发panic

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 参数、c状态异常检测                
    。。。
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 如果recvq有阻塞协程,则直接发送数据到队首协程
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)                                  
        return true
    }

    // recvq没有阻塞协程,环形队列有空闲空间,将消息加入环形队列
    if c.qcount < c.dataqsiz {      
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }    
        
    if !block {
        unlock(&c.lock)
        return false
    }    
        
    // !!!阻塞当前协程,直到其它读协程消费后,唤醒本协程!!!      
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }    
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // 将新创建的本协程sugog加入c的sendq队尾
    c.sendq.enqueue(mysg)
    
    // 阻塞本协程
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)       

    // 设置keepalive,确保消息活跃直到被消费,原因:sudogs没有作为栈跟踪的“根集”
    KeepAlive(ep)

    // ^_^消息已经别其它协程消费,本协程被唤醒^_^
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))                  
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    // 释放sudog结构资源
    releaseSudog(mysg)
    return true
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {                  
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)              

    memmove(dst, src, t.size)
}

消息接收

从channel读取消息,并写到目标地址ep;
1)sendq队列有阻塞协程,获取sendq队首阻塞协程;如果channel的容量为空,直接将阻塞协程数据memmove到当前协程;否则,从环形队列队尾typedmemmove到当前协程,并将消息写环形队列;
2)循环队列有数据,则从环形队列队尾typedmemmove到当前协程;并清理队尾空间,调整循环队列队尾;调用goread唤醒阻塞协程
3)循环队列为空,则创建sudog结构,阻塞当前协程到本channel的recvq队列;和chansend区别是,没有设置keepalive

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    // sendq有阻塞协程,则从阻塞协程中读取消息
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // !!!阻塞当前协程,直到其它协程写入消息,唤醒本协程!!!
    gp := getg()
    // 创建几个sudog
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
 
    // 在elem赋值和mysg入队列之间没有栈空间分割
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 将新创建的sudog加入recvq队尾
    c.recvq.enqueue(mysg)
    
    // 阻塞当前协程
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // ^_^其它协程向本channel写数据,本协程被唤醒^_^
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    
    // 释放sudog资源
    releaseSudog(mysg)
    return true, !closed
}

 

对象关闭

关闭过程

1)校验channel状态(nil,已关闭则触发panic)
2)清空recvq队列
3)清空sendq队列
4)goready唤醒recvq和sendq阻塞的协程

优雅关闭

1)关闭风险:
1.1)重复关闭,触发panic
1.2)向已关闭channel发送消息,触发panic

2)关闭原则:
2.1)防止重复关闭;
2.2)防止向已关闭channel发送消息

3)优雅关闭方法:
3.1)Context
3.2)done channel:通过增加辅助控制channel,用来传递关闭信号,多生产者和消费者退出协程,使用GC自动回收机制回收channel(辅助线程使用doneOnce保证只关闭一次)。

常用场景

阻塞channel

创建时,未指定size的channel,即为阻塞式的channel,因为其容量为1,所以是阻塞式的channel。

ch1 := make(chan int)

非阻塞channel

创建时,指定size的channel,即为非阻塞式的channel,因为其容量大于1,所以在channel未满时,向channel发送数据是不阻塞的(实际这种非阻塞并非是真正的非阻塞,只是在一定条件下的非阻塞)。

ch := make(chan int, 1024)

 

 

 

 

 

 

 

0条评论
0 / 1000
s****n
5文章数
0粉丝数
s****n
5 文章 | 0 粉丝
原创

Golang深入浅出——Channel结构解析

2023-07-12 14:01:59
14
0

简介

channel是golang中的一种数据结构,该数据类型也是一种引用类型。channel常用于协程间通信和数据共享,是golang“不要通过共享内存来通信,而应该通过通信来共享内存”灵魂的基石。

channel是一种协诚安全的数据类型,其底层采用带锁的环形队列存储消息。

 

类型定义

type hchan struct {                                                    
    qcount   uint           // total data in the queue                 
    dataqsiz uint           // size of the circular queue              
    buf      unsafe.Pointer // points to an array of dataqsiz elements 
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}   
    
type waitq struct {
    first *sudog
    last  *sudog
}

类型实现

对象创建

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")                                  
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {                                
        throw("makechan: bad alignment")
    }
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))                           
    if overflow || mem > maxAlloc-hchanSize || size < 0 {                                
        panic(plainError("makechan: size out of range"))                                 
    }
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.                         
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))                                 
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)      
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") 
    }          
    return c   
}

关键说明:

1)hchan是通过mallocgc在堆上分配内存,会被GC回收

2)元素是指针类型:元素包含指针,存在GC可能性,hchan结构和buff分开申请内存;

消息发送

GC假设栈只在一个协程内完成;读写一个无缓冲或空buffer的channel,是唯一一个协程写另外一个运行协程的栈场景,通过写屏障来实现。
1)recvq队列不空,则从recvq队首获取阻塞协程,将消息通过memmove写入该协程空间;调用goready唤醒阻塞协程;
2)环形队列有空间,将消息加入环形对接队尾;
3)环形队列没有空间,则阻塞当前协程,并加入到本channel的sendq队列;
4)向一个关闭的channel发送消息,会触发panic

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 参数、c状态异常检测                
    。。。
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 如果recvq有阻塞协程,则直接发送数据到队首协程
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)                                  
        return true
    }

    // recvq没有阻塞协程,环形队列有空闲空间,将消息加入环形队列
    if c.qcount < c.dataqsiz {      
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }    
        
    if !block {
        unlock(&c.lock)
        return false
    }    
        
    // !!!阻塞当前协程,直到其它读协程消费后,唤醒本协程!!!      
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }    
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // 将新创建的本协程sugog加入c的sendq队尾
    c.sendq.enqueue(mysg)
    
    // 阻塞本协程
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)       

    // 设置keepalive,确保消息活跃直到被消费,原因:sudogs没有作为栈跟踪的“根集”
    KeepAlive(ep)

    // ^_^消息已经别其它协程消费,本协程被唤醒^_^
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))                  
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    // 释放sudog结构资源
    releaseSudog(mysg)
    return true
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {                  
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)              

    memmove(dst, src, t.size)
}

消息接收

从channel读取消息,并写到目标地址ep;
1)sendq队列有阻塞协程,获取sendq队首阻塞协程;如果channel的容量为空,直接将阻塞协程数据memmove到当前协程;否则,从环形队列队尾typedmemmove到当前协程,并将消息写环形队列;
2)循环队列有数据,则从环形队列队尾typedmemmove到当前协程;并清理队尾空间,调整循环队列队尾;调用goread唤醒阻塞协程
3)循环队列为空,则创建sudog结构,阻塞当前协程到本channel的recvq队列;和chansend区别是,没有设置keepalive

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加锁
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    // sendq有阻塞协程,则从阻塞协程中读取消息
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // !!!阻塞当前协程,直到其它协程写入消息,唤醒本协程!!!
    gp := getg()
    // 创建几个sudog
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
 
    // 在elem赋值和mysg入队列之间没有栈空间分割
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 将新创建的sudog加入recvq队尾
    c.recvq.enqueue(mysg)
    
    // 阻塞当前协程
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // ^_^其它协程向本channel写数据,本协程被唤醒^_^
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    
    // 释放sudog资源
    releaseSudog(mysg)
    return true, !closed
}

 

对象关闭

关闭过程

1)校验channel状态(nil,已关闭则触发panic)
2)清空recvq队列
3)清空sendq队列
4)goready唤醒recvq和sendq阻塞的协程

优雅关闭

1)关闭风险:
1.1)重复关闭,触发panic
1.2)向已关闭channel发送消息,触发panic

2)关闭原则:
2.1)防止重复关闭;
2.2)防止向已关闭channel发送消息

3)优雅关闭方法:
3.1)Context
3.2)done channel:通过增加辅助控制channel,用来传递关闭信号,多生产者和消费者退出协程,使用GC自动回收机制回收channel(辅助线程使用doneOnce保证只关闭一次)。

常用场景

阻塞channel

创建时,未指定size的channel,即为阻塞式的channel,因为其容量为1,所以是阻塞式的channel。

ch1 := make(chan int)

非阻塞channel

创建时,指定size的channel,即为非阻塞式的channel,因为其容量大于1,所以在channel未满时,向channel发送数据是不阻塞的(实际这种非阻塞并非是真正的非阻塞,只是在一定条件下的非阻塞)。

ch := make(chan int, 1024)

 

 

 

 

 

 

 

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0