通道 - channel

通道 - channel #

Golang中Channel是goroutine间重要通信的方式,是并发安全的,通道内的数据First In First Out,我们可以把通道想象成队列。

channel数据结构 #

Channel底层数据结构是一个结构体。

type hchan struct {
	qcount   uint // 队列中元素个数
	dataqsiz uint // 循环队列的大小
	buf      unsafe.Pointer // 指向循环队列
	elemsize uint16 // 通道里面的元素大小
	closed   uint32 // 通道关闭的标志
	elemtype *_type // 通道元素的类型
	sendx    uint   // 待发送的索引,即循环队列中的队尾指针rear
	recvx    uint   // 待读取的索引,即循环队列中的队头指针front
	recvq    waitq  // 接收等待队列
	sendq    waitq  // 发送等待队列
	lock mutex // 互斥锁
}

hchan结构体中的buf指向一个数组,用来实现循环队列,sendx是循环队列的队尾指针,recvx是循环队列的队头指针。dataqsize是缓存型通道的大小,qcount记录着通道内数据个数。

循环队列一般使用空余单元法来解决队空和队满时候都存在font=rear带来的二义性问题,但这样会浪费一个单元。golang的channel中是通过增加qcount字段记录队列长度来解决二义性,一方面不会浪费一个存储单元,另一方面当使用len函数查看通道长度时候,可以直接返回qcount字段,一举两得。

hchan结构体中另一重要部分是recvq,sendq,分别存储了等待从通道中接收数据的goroutine,和等待发送数据到通道的goroutine。两者都是waitq类型。

waitq是一个结构体类型,waitq和sudog构成双向链表,其中sudog是链表元素的类型,waitq中first和last字段分别指向链表头部的sudog,链表尾部的sudog。

type waitq struct {
	first *sudog
	last  *sudog
}

type sudog struct {
	...
	g *g // 当前阻塞的G
	...
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer
	...
}

hchan结构图如下:

channel的创建 #

在分析channel的创建代码之前,我们看下源码文件中最开始定义的两个常量;

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	...
)
  • maxAlgin用来设置内存最大对齐值,对应就是64位系统下cache line的大小。当结构体是8字节对齐时候,能够避免false share,提高读写速度
  • hchanSize用来设置chan大小,unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)),这个复杂公式用来计算离unsafe.Sizeof(hchan{})最近的8的倍数。假设hchan{}大小是13,hchanSize是16。

假设n代表unsafe.Sizeof(hchan{}),a代表maxAlign,c代表hchanSize,则上面hchanSize的计算公式可以抽象为:

c = n + ((-n) & (a - 1))

计算离8最近的倍数,只需将n补足与到8倍数的差值就可,c也可以用下面公式计算

c = n + (a - n%a)

感兴趣的可以证明在a为2的n的次幂时候,上面两个公式是相等的。

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// 通道元素的大小不能超过64K
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}

	// hchanSize大小不是maxAlign倍数,或者通道数据元素的对齐保证大于maxAlign
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	// 判断通道数据是否超过内存限制
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0: // 无缓冲通道
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0: 
		// 当通道数据元素不含指针,hchan和buf内存空间调用mallocgc一次性分配完成
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// hchan和buf内存上布局是紧挨着的
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 当通道数据元素含指针时候,先创建hchan,然后给buf分配内存空间
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	...
	return c
}

发送数据到channel #

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 当通道为nil时候
	if c == nil {
		// 非阻塞模式下,直接返回false
		if !block {
			return false
		}
		// 调用gopark将当前Goroutine休眠,调用gopark时候,将传入unlockf设置为nil,当前Goroutine会一直休眠
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 调试,不必关注
	if debugChan {
		print("chansend: chan=", c, "\n")
	}
	// 竞态检测,不必关注
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// 非阻塞模式下,不使用锁快速检查send操作
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 加锁
	lock(&c.lock)

	// 如果通道已关闭,再发送数据,发生恐慌
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	
	// 从接收者队列recvq中取出一个接收者,接收者不为空情况下,直接将数据传递给该接收者
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 缓冲队列中的元素个数小于队列的大小
	// 说明缓冲队列还有空间
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx) // qp指向循环数组中未使用的位置
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// 将发送的数据写入到qp指向的循环数组中的位置
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++ // 将send加一,相当于循环队列的front指针向前进1
		if c.sendx == c.dataqsiz { //当循环队列最后一个元素已使用,此时循环队列将再次从0开始
			c.sendx = 0
		}
		c.qcount++ // 队列中元素计数加1
		unlock(&c.lock) // 释放锁
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	gp := getg() // 获取当前的G
	mysg := acquireSudog() // 返回一个sudog
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep // 发送的数据
	mysg.waitlink = nil
	mysg.g = gp // 当前G,即发送者
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // 将当前发送者入队sendq中
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // 将当前goroutine放入waiting状态,并释放c.lock锁

	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			// 无缓冲通道
			racesync(c, sg)
		} else {
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			c.recvx++ // 相当于循环队列的rear指针向前进1
			if c.recvx == c.dataqsiz { // 队列数组中最后一个元素已读取,则再次从头开始读取
				c.recvx = 0
			}
			c.sendx = c.recvx
		}
	}
	if sg.elem != nil { // 复制数据到sg中
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // 使goroutine变成runnable状态,唤醒goroutine
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

// 返回缓存槽i位置的对应的指针
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// 将src值复制到dst
// 源码https://github.com/golang/go/blob/2bc8d90fa21e9547aeb0f0ae775107dc8e05dc0a/src/runtime/mbarrier.go#L156
func typedmemmove(typ *_type, dst, src unsafe.Pointer) {
	if dst == src {
		return
	}
	...
	memmove(dst, src, typ.size)
	...
}

从channel中读取数据 #

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 当通道为nil时候
	if c == nil {
		if !block { // 当非阻塞模式直接返回
			return
		}
		// 一直阻塞
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	...
	// 加锁锁
	lock(&c.lock)
	// 当通道已关闭,且通道缓冲没有元素时候,直接返回
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock) // 释放锁
		if ep != nil {
			typedmemclr(c.elemtype, ep) // 清空ep指向的内存
		}
		return true, false
	}
	// 从发送者队列中取出一个发送者,发送者不为空时候,将发送者数据传递给接收者
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 缓冲队列中有数据情况下,从缓存队列取出数据,传递给接收者
	if c.qcount > 0 {
		// qp指向循环队列数组中元素
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			// 直接qp指向的数据复制到ep指向的地址
			typedmemmove(c.elemtype, ep, qp)
		}
		// 清空qp指向内存的数据
		typedmemclr(c.elemtype, qp)
		c.recvx++ // 相当于循环队列中的rear加1
		if c.recvx == c.dataqsiz { // 队列最后一个元素已读取出来,recvx指向0
			c.recvx = 0
		}
		c.qcount-- // 队列中元素个数减1
		unlock(&c.lock) // 释放锁
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
		// 复制队列中数据到接收者
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1) // 唤醒G
}

关闭channel #

func closechan(c *hchan) {
	// 当关闭的通道是nil时候,直接恐慌
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 加锁
	lock(&c.lock)
	// 通道已关闭,再次关闭直接恐慌
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	...
	c.closed = 1 // 关闭标志closed置为1
	var glist gList
	// 将接收者添加到glist中
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// 将发送者添加到glist中
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp) // 
	}
	unlock(&c.lock)

	// 循环glist,调用goready唤醒所有接收者和发送者
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

总结 #

  1. channel规则:
操作 空Channel 已关闭Channel 活跃Channel
close(ch) panic panic 成功关闭
ch <-v 永远阻塞 panic 成功发送或阻塞
v,ok = <-ch 永远阻塞 不阻塞 成功接收或阻塞

注意: 从空通道中写入或读取数据会永远阻塞,这会造成goroutine泄漏。

  1. 发送、接收数据以及关闭通道流程图:

golang通道发送、接收数据以及关闭通道流程图