简介
channel是golang中的一种数据结构,该数据类型也是一种引用类型。channel常用于协程间通信和数据共享,是golang“不要通过共享内存来通信,而应该通过通信来共享内存”灵魂的基石。
channel是一种协诚安全的数据类型,其底层采用带锁的环形队列存储消息。
类型定义
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
类型实现
对象创建
func makechan(t *chantype, size int) *hchan {
elem := t.elem
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
关键说明:
1)hchan是通过mallocgc在堆上分配内存,会被GC回收
2)元素是指针类型:元素包含指针,存在GC可能性,hchan结构和buff分开申请内存;
消息发送
GC假设栈只在一个协程内完成;读写一个无缓冲或空buffer的channel,是唯一一个协程写另外一个运行协程的栈场景,通过写屏障来实现。
1)recvq队列不空,则从recvq队首获取阻塞协程,将消息通过memmove写入该协程空间;调用goready唤醒阻塞协程;
2)环形队列有空间,将消息加入环形对接队尾;
3)环形队列没有空间,则阻塞当前协程,并加入到本channel的sendq队列;
4)向一个关闭的channel发送消息,会触发panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 参数、c状态异常检测
。。。
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果recvq有阻塞协程,则直接发送数据到队首协程
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// recvq没有阻塞协程,环形队列有空闲空间,将消息加入环形队列
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// !!!阻塞当前协程,直到其它读协程消费后,唤醒本协程!!!
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将新创建的本协程sugog加入c的sendq队尾
c.sendq.enqueue(mysg)
// 阻塞本协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 设置keepalive,确保消息活跃直到被消费,原因:sudogs没有作为栈跟踪的“根集”
KeepAlive(ep)
// ^_^消息已经别其它协程消费,本协程被唤醒^_^
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// 释放sudog结构资源
releaseSudog(mysg)
return true
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
消息接收
从channel读取消息,并写到目标地址ep;
1)sendq队列有阻塞协程,获取sendq队首阻塞协程;如果channel的容量为空,直接将阻塞协程数据memmove到当前协程;否则,从环形队列队尾typedmemmove到当前协程,并将消息写环形队列;
2)循环队列有数据,则从环形队列队尾typedmemmove到当前协程;并清理队尾空间,调整循环队列队尾;调用goread唤醒阻塞协程
3)循环队列为空,则创建sudog结构,阻塞当前协程到本channel的recvq队列;和chansend区别是,没有设置keepalive
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// sendq有阻塞协程,则从阻塞协程中读取消息
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// !!!阻塞当前协程,直到其它协程写入消息,唤醒本协程!!!
gp := getg()
// 创建几个sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 在elem赋值和mysg入队列之间没有栈空间分割
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 将新创建的sudog加入recvq队尾
c.recvq.enqueue(mysg)
// 阻塞当前协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// ^_^其它协程向本channel写数据,本协程被唤醒^_^
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
// 释放sudog资源
releaseSudog(mysg)
return true, !closed
}
对象关闭
关闭过程
1)校验channel状态(nil,已关闭则触发panic)
2)清空recvq队列
3)清空sendq队列
4)goready唤醒recvq和sendq阻塞的协程
优雅关闭
1)关闭风险:
1.1)重复关闭,触发panic
1.2)向已关闭channel发送消息,触发panic
2)关闭原则:
2.1)防止重复关闭;
2.2)防止向已关闭channel发送消息
3)优雅关闭方法:
3.1)Context
3.2)done channel:通过增加辅助控制channel,用来传递关闭信号,多生产者和消费者退出协程,使用GC自动回收机制回收channel(辅助线程使用doneOnce保证只关闭一次)。
常用场景
阻塞channel
创建时,未指定size的channel,即为阻塞式的channel,因为其容量为1,所以是阻塞式的channel。
ch1 := make(chan int)
非阻塞channel
创建时,指定size的channel,即为非阻塞式的channel,因为其容量大于1,所以在channel未满时,向channel发送数据是不阻塞的(实际这种非阻塞并非是真正的非阻塞,只是在一定条件下的非阻塞)。
ch := make(chan int, 1024)