Thanks to its concurrency features, Go has become one of the most popular languages in recent years. For communication between goroutines, Go provides a convenient data structure: the channel. In this article, we dive into the runtime source code to get a deep understanding of how channels work.
src/runtime/chan.go
First, take a look at the data structure.
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
// waitq is a doubly linked list of sudogs
type waitq struct {
first *sudog
last *sudog
}
// a wrapper of goroutine
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
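To make the wait queue concrete, here is a minimal sketch of a FIFO doubly linked list in the spirit of waitq. It is a simplified model, not the runtime's code: the waiter and queue types and the drain helper are hypothetical names introduced for illustration, and a string label stands in for the goroutine pointer.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for runtime.sudog: it keeps only the
// list links and a label instead of a real goroutine pointer.
type waiter struct {
	name string
	next *waiter
	prev *waiter
}

// queue models waitq: a FIFO doubly linked list with first/last pointers.
type queue struct {
	first *waiter
	last  *waiter
}

// enqueue appends w at the tail.
func (q *queue) enqueue(w *waiter) {
	w.next = nil
	if q.last == nil {
		w.prev = nil
		q.first, q.last = w, w
		return
	}
	w.prev = q.last
	q.last.next = w
	q.last = w
}

// dequeue removes and returns the head, or nil when the queue is empty.
func (q *queue) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	if w.next == nil {
		q.first, q.last = nil, nil
	} else {
		q.first = w.next
		q.first.prev = nil
		w.next = nil
	}
	return w
}

// drain dequeues everything and returns the names in wake-up order.
func drain(q *queue) []string {
	var names []string
	for w := q.dequeue(); w != nil; w = q.dequeue() {
		names = append(names, w.name)
	}
	return names
}

func main() {
	q := &queue{}
	for _, n := range []string{"g1", "g2", "g3"} {
		q.enqueue(&waiter{name: n})
	}
	fmt.Println(drain(q)) // [g1 g2 g3]
}
```

The first goroutine to block is the first one to be woken, which is the ordering the rest of this article relies on.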
We can separate the channel into three parts: a ring buffer, two wait queues, and a lock.
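Two of the ring buffer fields can be observed from ordinary Go code: len(ch) reports the number of queued elements (qcount) and cap(ch) reports the buffer size (dataqsiz). The short example below illustrates this; the snapshot helper is a hypothetical name used only here.

```go
package main

import "fmt"

// snapshot reports the two counters a program can observe from outside:
// len(ch) reflects qcount and cap(ch) reflects dataqsiz.
func snapshot(ch chan int) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	fmt.Println(snapshot(ch)) // 2 3
	<-ch
	fmt.Println(snapshot(ch)) // 1 3
}
```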
A visualization of the Channel data structure is as below.
There are only four operations on a channel:
i. Create a channel
ii. Send data to a channel
iii. Receive data from a channel
iv. Close a channel
We will go through all the operations one by one.
To create a channel, the Go compiler translates a make statement into a call to runtime.makechan.
make(chan interface{}, size) —> runtime.makechan(interface{}, size)
make(chan interface{}) —> runtime.makechan(interface{}, 0)
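Creating an unbuffered channel is the same as creating a channel of size zero. Take a look at the source code. The process breaks into three parts: validating the element type and size, allocating the hchan and its ring buffer, and initializing the fields and the lock.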
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// create channel and allocate ring buffer for buffer channel
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// assign value and create lock
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
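The size check in makechan can be triggered from user code. The sketch below passes a size chosen at run time and recovers the resulting panic; tryMake is a hypothetical helper, and the message shown is the one passed to plainError in the listing above.

```go
package main

import "fmt"

// tryMake calls make with a size chosen at run time and reports the panic
// message, if any. A constant negative size would be rejected at compile time.
func tryMake(size int) (ch chan int, msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch = make(chan int, size)
	return ch, ""
}

func main() {
	ch, msg := tryMake(4)
	fmt.Println(cap(ch), msg == "") // 4 true

	_, msg = tryMake(-1)
	fmt.Println(msg) // makechan: size out of range
}
```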
To send data to a channel, a goroutine first checks whether the channel is nil or closed. If the channel is nil, a blocking send calls gopark and blocks forever, while a non-blocking send (block is false) simply returns false. If the channel is closed, a panic occurs.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ....
lock(&c.lock)
// note: the channel lock must be held to access or update channel data
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// ...
}
Note: if a channel is not nil, a sender must acquire the lock before reading or writing any channel state.
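Both checks are easy to observe from user code. In the sketch below, a select with a default case takes the non-blocking path (block is false), so a send on a nil channel returns false instead of parking, and a send on a closed channel panics. The helper names trySend and sendRecover are hypothetical.

```go
package main

import "fmt"

// trySend attempts a non-blocking send. With a default case the send is
// non-blocking, so a nil channel reports false instead of parking forever.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendRecover sends on ch and returns the panic message, if any.
func sendRecover(ch chan int, v int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch <- v
	return ""
}

func main() {
	var nilCh chan int
	fmt.Println(trySend(nilCh, 1)) // false

	closed := make(chan int, 1)
	close(closed)
	fmt.Println(sendRecover(closed, 1)) // send on closed channel
}
```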
sendq and recvq are two wait queues made up of goroutines. If a channel has no available buffer space (a full buffered channel or an unbuffered channel) and no receiver is waiting when a sender arrives, the sender calls gopark and blocks until a receiver arrives. Conversely, if a channel has no buffered data (an empty buffered channel or an unbuffered channel) and no sender is waiting, a receiver calls gopark and blocks until a sender arrives.
For this reason, a sender always tries to dequeue from recvq first to see whether a receiver is waiting. If there is one, the sender copies the data directly to it and calls goready to wake the receiver.
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
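From the outside, this direct handoff is what makes an unbuffered channel a rendezvous point. The following sketch, with a hypothetical rendezvous helper, shows a value passing from sender to receiver without any buffer slot. Whichever side arrives first waits for the other.

```go
package main

import "fmt"

// rendezvous passes v through an unbuffered channel to a receiver
// goroutine and returns what the receiver observed.
func rendezvous(v string) string {
	ch := make(chan string)        // unbuffered: there is no slot to store the value
	result := make(chan string, 1) // carries the receiver's answer back
	go func() {
		result <- "got " + <-ch
	}()
	ch <- v // completes only once the receiver takes the value
	return <-result
}

func main() {
	fmt.Println(rendezvous("hello")) // got hello
}
```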
Second, a sender will check if there is available space for buffering.
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
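The index arithmetic above is a standard circular queue. Here is a simplified model of it, assuming int elements and ignoring locking; the ring type and its put and get methods are hypothetical names, not runtime code.

```go
package main

import "fmt"

// ring is a hypothetical, simplified model of the buffer fields in hchan.
type ring struct {
	buf    []int
	sendx  int // next slot to write
	recvx  int // next slot to read
	qcount int // number of queued elements
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// put mirrors the buffered-send path: it fails when the buffer is full.
func (r *ring) put(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// get mirrors the buffered-receive path: it fails when the buffer is empty.
func (r *ring) get() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot, as typedmemclr does
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	fmt.Println(r.put(1), r.put(2), r.put(3)) // true true false
	v, _ := r.get()
	ok := r.put(3)
	fmt.Println(v, ok, r.sendx) // 1 true 1
}
```

A ring of size zero is always both full and empty, which matches how an unbuffered channel never takes the buffered path.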
If there is not enough space for buffering (a full buffered channel or an unbuffered channel), the sender calls gopark and blocks until a receiver arrives.
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
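The blocking case can be sketched from user code as follows. wouldBlock uses a non-blocking send to show that a full channel cannot accept a value, and handoff shows a sender that may have to wait for space and is completed by a later receive. Both helper names are hypothetical.

```go
package main

import "fmt"

// wouldBlock reports whether a send on ch cannot proceed right now.
// Note that when the send can proceed, v is actually sent.
func wouldBlock(ch chan int, v int) bool {
	select {
	case ch <- v:
		return false
	default:
		return true
	}
}

// handoff fills a one-slot buffer, starts a second sender, and then
// receives both values in order.
func handoff() (first, second int) {
	ch := make(chan int, 1)
	ch <- 1
	done := make(chan struct{})
	go func() {
		ch <- 2 // waits here if the buffer is still full
		close(done)
	}()
	first = <-ch
	second = <-ch
	<-done
	return first, second
}

func main() {
	full := make(chan int, 1)
	full <- 1
	fmt.Println(wouldBlock(full, 2))           // true: buffer is full
	fmt.Println(wouldBlock(make(chan int), 1)) // true: unbuffered, no receiver waiting
	fmt.Println(handoff())                     // 1 2
}
```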
To receive data from a channel, a receiver first checks whether the channel is nil. If it is, the receiver blocks forever. A closed channel is handled differently: a receive from a closed channel never blocks. While the buffer still holds data, the receiver drains it as usual. Once the channel is closed and empty, the runtime clears the receiver's destination with typedmemclr and returns immediately. This is why a receiver always gets the zero value from a closed, empty channel.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ...
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
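The closed-channel behavior can be checked with the two-value receive form, whose second result corresponds to the received flag returned by chanrecv. In this sketch, drainClosed is a hypothetical helper.

```go
package main

import "fmt"

// drainClosed closes a buffered channel holding vals and then receives
// len(vals)+1 times, recording each value and its ok flag.
func drainClosed(vals ...int) (got []int, oks []bool) {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	for i := 0; i <= len(vals); i++ {
		v, ok := <-ch
		got = append(got, v)
		oks = append(oks, ok)
	}
	return got, oks
}

func main() {
	fmt.Println(drainClosed(7, 8)) // [7 8 0] [true true false]
}
```

The buffered values survive the close and are delivered first. Only the final receive reports ok as false together with the zero value.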
The rest of the process mirrors the sender's. A receiver always tries to dequeue from sendq first to check whether a sender is waiting. If there is one, the receiver takes the data (directly from the sender on an unbuffered channel, or from the head of the buffer otherwise) and calls goready to wake the sender.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
Second, the receiver checks whether there is data in the buffer. If so, it copies the data out and returns.
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
If there is no data in the buffer (an unbuffered channel or an empty buffered channel), the receiver calls gopark and waits for a sender to arrive.
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
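As a rough illustration of when a receiver would park, the sketch below uses a non-blocking receive to probe a channel in several states. tryRecv is a hypothetical helper; its ready result is false exactly when a plain receive would have blocked.

```go
package main

import "fmt"

// tryRecv attempts a non-blocking receive.
// ready is false when a blocking receive would have parked.
func tryRecv(ch chan int) (v int, ok, ready bool) {
	select {
	case v, ok = <-ch:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false false (empty: would block)
	ch <- 5
	fmt.Println(tryRecv(ch)) // 5 true true
	close(ch)
	fmt.Println(tryRecv(ch)) // 0 false true (closed: never blocks)

	var nilCh chan int
	fmt.Println(tryRecv(nilCh)) // 0 false false (nil: would block forever)
}
```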
To summarize the send/receive process:
1. A send to or a receive from a nil channel blocks the goroutine forever.
2. A sender or receiver must acquire the lock before any read or write of channel state.
3. A send to a closed channel panics, while a receive from a closed channel always succeeds and returns the zero value once the channel is empty.
4. If the buffer is full or the channel is unbuffered, and no receiver is waiting, a sender blocks. If the buffer is empty or the channel is unbuffered, and no sender is waiting, a receiver blocks.
5. At least one of the queues (sendq/recvq) is empty at any time.
First, to close a channel, a goroutine checks whether the channel is nil or already closed. A panic occurs in both cases.
Note that the lock is required to read the channel's closed flag.
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
//...
}
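Both panics can be reproduced with a small recover wrapper. closeRecover is a hypothetical helper; the messages are the ones passed to plainError in the listing above.

```go
package main

import "fmt"

// closeRecover closes ch and returns the panic message, if any.
func closeRecover(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch)
	return ""
}

func main() {
	var nilCh chan int
	fmt.Println(closeRecover(nilCh)) // close of nil channel

	ch := make(chan int)
	fmt.Println(closeRecover(ch) == "") // true: first close succeeds
	fmt.Println(closeRecover(ch))       // close of closed channel
}
```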
Second, a temporary list, gList, collects the goroutines remaining in recvq and sendq so that the lock can be released quickly.
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
Once the lock is released, we can call goready for each goroutine in the gList (the temporary storage that lets the lock be released quickly). Woken receivers find the closed channel ready to read, and woken senders panic.
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
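Because close readies every waiting goroutine, it works as a broadcast to all receivers. The sketch below, with a hypothetical broadcast helper, starts several receivers on a channel that never carries a value and counts how many of them observe the close.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast starts n receivers on a channel that never carries a value,
// closes the channel, and counts how many receivers observed the close
// (a receive with ok == false).
func broadcast(n int) int {
	ch := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(ch) // wakes receivers already parked; later ones return at once
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(broadcast(5)) // 5
}
```

The result does not depend on scheduling: a receiver that reaches the channel after the close simply takes the closed-and-empty path instead of being woken from recvq.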
Below is a comparison of the operations.
| | Nil Channel | Closed Channel | Open Channel |
|---|---|---|---|
| Close | Panic | Panic | Success |
| Send | Blocked forever | Panic | Success or blocked |
| Receive | Blocked forever | Always success | Success or blocked |