Go Channel Source Code Review

0. Intro

Thanks to its concurrency features, Go has become one of the most popular languages in recent years. For communication between goroutines, Go provides a convenient data structure: the channel. In this article, we dive into the runtime source code to get a deeper understanding of how channels work.

  • source code: src/runtime/chan.go

1. Data Structure

First, take a look at the data structure.

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

// a doubly linked list
type waitq struct {
	first *sudog
	last  *sudog
}

// a wrapper around a goroutine waiting on a channel
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

We can separate the channel into three parts: a ring buffer, two wait queues, and a lock.

  1. The ring buffer (buf) stores pending elements for a buffered channel. For an unbuffered channel no element storage is allocated, so buf does not point to any usable buffer.
  2. sendq and recvq are two doubly linked lists of waiting senders and receivers. Each node is a sudog, a wrapper around a blocked goroutine.
  3. lock enforces mutual exclusion on the channel's fields to prevent race conditions.
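
To make the roles of qcount, sendx and recvx concrete, here is a minimal sketch of a ring buffer. The ring type and its push and pop methods are hypothetical names invented for illustration. They only imitate the index arithmetic and leave out locking and the wait queues, so this is not the runtime's implementation.

```go
package main

import "fmt"

// ring is a hypothetical, simplified stand-in for the buffer fields of hchan.
// It has no lock and no wait queues.
type ring struct {
	buf    []int // plays the role of hchan.buf
	qcount int   // number of elements currently stored
	sendx  int   // index where the next send writes
	recvx  int   // index where the next receive reads
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push stores v and reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around to the start
	}
	r.qcount++
	return true
}

// pop removes the oldest element and reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0 // wrap around to the start
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the buffer is full
	v, _ := r.pop()
	fmt.Println(v)                // 1: oldest element comes out first
	fmt.Println(r.push(3))        // true: slot 0 is reused
	fmt.Println(r.sendx, r.recvx) // 1 1
}
```

When the buffer is full, sendx and recvx point at the same slot, which is why the runtime needs qcount to tell a full buffer from an empty one.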

A visualization of the Channel data structure is as below.

2. Operations

A channel supports only four operations.

    i. Create a channel
    ii. Send data to a channel
    iii. Receive data from a channel
    iv. Close a channel

We will go through all the operations one by one.

i. Create a channel

To create a channel, the Go compiler translates a make expression into a call to runtime.makechan:

make(chan interface{}, size) -> runtime.makechan(interface{}, size)
make(chan interface{}) -> runtime.makechan(interface{}, 0)
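
From user code, the effect of this translation can be observed through cap. The sketch below uses a hypothetical helper named capacities and checks that omitting the size behaves the same as passing zero.

```go
package main

import "fmt"

// capacities reports the capacity of a channel made without a size,
// with an explicit size of zero, and with the given size.
func capacities(size int) (noSize, zeroSize, sized int) {
	a := make(chan int)       // no size argument
	b := make(chan int, 0)    // explicit zero size
	c := make(chan int, size) // buffered channel
	return cap(a), cap(b), cap(c)
}

func main() {
	fmt.Println(capacities(4)) // 0 0 4
}
```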

Creating an unbuffered channel is the same as creating a channel of size zero. Looking at the source code, the process breaks into three parts.

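  1. Validate the parameters (check the element size and alignment, and that the total buffer size does not overflow).
  2. Allocate the hchan and, if the size is greater than zero (a buffered channel), its ring buffer.
  3. Assign the remaining fields and initialize the lock.
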
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
    
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	
	// create channel and allocate ring buffer for buffer channel
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	// assign the fields and initialize the lock
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
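
The size check in makechan is visible from ordinary code when the size is only known at run time. The following sketch, with a hypothetical helper tryMake, recovers from the panic and returns its message. A negative constant size would be rejected by the compiler instead, so the size is passed as a variable.

```go
package main

import "fmt"

// tryMake creates a channel with a size known only at run time and
// returns the panic message, or "" if make succeeded.
func tryMake(size int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, size)
	_ = cap(ch)
	return ""
}

func main() {
	fmt.Printf("%q\n", tryMake(2))  // no panic for a valid size
	fmt.Printf("%q\n", tryMake(-1)) // the "size out of range" panic from makechan
}
```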

ii. Send/Receive data to/from channel

Send data to channel

First, the sender checks whether the channel is nil. A blocking send on a nil channel calls gopark and blocks forever, while a non-blocking send simply returns false. If the channel is closed, the sender panics.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	// ....

	lock(&c.lock) 
	// the channel lock must always be held to read or update channel state

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// ...
}
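
Both checks can be exercised from user code. In the sketch below, trySend and sendPanics are hypothetical helpers. A select with a default case is a non-blocking send, which, as far as we understand the compiler's lowering, reaches chansend with block set to false.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether a blocking send on ch panics.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	var nilCh chan int
	fmt.Println(trySend(nilCh, 1)) // false: a nil channel is never ready

	closed := make(chan int, 1)
	close(closed)
	fmt.Println(sendPanics(closed)) // true: send on closed channel
}
```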

Note: if the channel is not nil, a sender must acquire the lock before it reads or updates the channel's state.

sendq and recvq are two wait queues made up of goroutines. If a channel has no space available for buffering (a full buffered channel or an unbuffered channel) when a sender arrives, the sending goroutine calls gopark and blocks until a receiver arrives. Conversely, if a channel has no buffered data (an empty buffered channel or an unbuffered channel), a receiver calls gopark and blocks until a sender arrives.
For this reason, a sender always tries to dequeue from recvq first to see whether a receiver is waiting. If there is a receiver in recvq, the sender copies its value directly to that receiver and calls goready to wake it.

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

Second, if no receiver is waiting, the sender checks whether the buffer has space available.

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
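
The effect of this branch is easy to observe: sends succeed without blocking until qcount reaches dataqsiz, which user code sees as len(ch) reaching cap(ch). A small sketch, using a hypothetical helper named fill:

```go
package main

import "fmt"

// fill sends 0, 1, 2, ... into ch without blocking until the buffer
// is full and returns how many sends succeeded.
func fill(ch chan int) int {
	n := 0
	for {
		select {
		case ch <- n:
			n++
		default:
			return n
		}
	}
}

func main() {
	ch := make(chan int, 3)
	fmt.Println(fill(ch), len(ch), cap(ch)) // 3 3 3
	fmt.Println(<-ch, <-ch, <-ch)           // 0 1 2: FIFO order
}
```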

If there is not enough space for buffering (a full buffered channel or an unbuffered channel), the sender enqueues itself on sendq, calls gopark, and blocks until a receiver arrives.

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
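
A sketch of this blocking path from the user's side: the hypothetical function handoff fills a one-slot channel, starts a second sender that cannot complete until the main goroutine receives, and returns the values in the order they arrive.

```go
package main

import "fmt"

// handoff fills a one-slot channel, starts a second sender that has to
// wait for room, and returns the values in the order a receiver sees them.
func handoff() []int {
	ch := make(chan int, 1)
	done := make(chan struct{})
	ch <- 1 // the buffer is now full

	go func() {
		ch <- 2 // cannot complete until the receiver below makes room
		close(done)
	}()

	first := <-ch // takes the buffered value and lets the second send finish
	<-done        // the second send has completed
	second := <-ch
	return []int{first, second}
}

func main() {
	fmt.Println(handoff()) // [1 2]
}
```

The order does not depend on scheduling: the buffered value always comes out first, whether or not the second sender has already parked.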

Receive data from channel

First, a receiver always checks whether the channel is nil. Receiving from a nil channel blocks forever. A closed channel is handled differently: a receiver can always read from it. Any values still in the buffer are received normally, and once the buffer is empty (c.closed != 0 && c.qcount == 0) chanrecv clears the destination with typedmemclr and returns received = false. This is why a receive from a closed, empty channel always yields the zero value.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	
	// ...

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	// ...
}
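
The closed-channel branch corresponds to the two-value receive form in user code. The sketch below uses a hypothetical helper named drainClosed to show that buffered values survive close and that only afterwards does the receiver get the zero value with ok set to false.

```go
package main

import "fmt"

// drainClosed buffers two values, closes the channel, and performs
// three receives, recording each value and its ok flag.
func drainClosed() (values []int, oks []bool) {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)
	for i := 0; i < 3; i++ {
		v, ok := <-ch
		values = append(values, v)
		oks = append(oks, ok)
	}
	return values, oks
}

func main() {
	values, oks := drainClosed()
	fmt.Println(values) // [10 20 0]
	fmt.Println(oks)    // [true true false]
}
```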

The rest of the process mirrors the sender. A receiver always tries to dequeue from sendq to check for a waiting sender. If there is one, the receiver takes its value (directly from the sender for an unbuffered channel, or from the head of the buffer while the sender's value moves to the tail) and calls goready to wake the sender.

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
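
The comment about both operations mapping to the same buffer slot can be illustrated with a small model. The function recvFromFull below is a hypothetical simplification based on our reading of the runtime's recv helper, which this article does not list. It works on a plain slice and ignores locking and goroutine wake-up.

```go
package main

import "fmt"

// recvFromFull models a receive on a full buffer while a sender is parked.
// Because the buffer is full, the head (recvx) and the tail (sendx) are the
// same slot: the receiver takes the head, and the parked sender's value is
// written into the slot that was just freed.
func recvFromFull(buf []int, recvx, senderVal int) (got, next int) {
	got = buf[recvx]       // receiver takes the oldest element
	buf[recvx] = senderVal // sender's value becomes the newest element
	next = recvx + 1
	if next == len(buf) {
		next = 0 // wrap around
	}
	return got, next // the caller would set both recvx and sendx to next
}

func main() {
	buf := []int{1, 2, 3} // full buffer, head at index 0
	got, next := recvFromFull(buf, 0, 4)
	fmt.Println(got, next, buf) // 1 1 [4 2 3]
}
```

Reading on from index 1 yields 2, 3 and then 4, so FIFO order is preserved even though the new value was written at the old head position.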

Second, the receiver checks whether there is data in the buffer. If so, it copies the element out and returns.

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

If there is no data in the buffer (an unbuffered channel or an empty buffered channel), the receiver enqueues itself on recvq, calls gopark, and waits for a sender to arrive.

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
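
On an unbuffered channel, this path and the sender's direct hand-off meet. In the sketch below (rendezvous is a hypothetical name) the receiver goroutine may park on recvq before the send happens, but the result is the same whichever side arrives first.

```go
package main

import "fmt"

// rendezvous passes a value through an unbuffered channel to a receiver
// goroutine, which doubles it and reports the result.
func rendezvous() int {
	ch := make(chan int) // unbuffered: no ring buffer to fall back on
	result := make(chan int)

	go func() {
		v := <-ch // waits here if no sender has arrived yet
		result <- v * 2
	}()

	ch <- 21 // completes only once the receiver takes the value
	return <-result
}

func main() {
	fmt.Println(rendezvous()) // 42
}
```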

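To summarize the send/receive process:
1. A goroutine that sends to or receives from a nil channel blocks forever.
2. A send or receive must acquire the lock before reading or writing channel state.
3. Sending to a closed channel panics, while a receiver always gets a value from a closed channel: the remaining buffered values first, then the zero value.
4. If the buffer is full or absent (an unbuffered channel) and no receiver is waiting, a sender blocks. If the buffer is empty or absent and no sender is waiting, a receiver blocks.
5. In general, at least one of the queues (sendq/recvq) is empty, because an arriving sender or receiver pairs with a waiter on the opposite queue instead of parking.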

iii. Close channel

First, to close a channel, the goroutine checks whether the channel is nil or already closed. Both cases panic.

Note that the lock is required to read the channel's closed flag.

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	//...
}
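
Both panics can be triggered and recovered from user code. The helper closePanics below is a hypothetical name. It returns the panic message, or an empty string when close succeeds.

```go
package main

import "fmt"

// closePanics closes ch and returns the panic message, or "" on success.
func closePanics(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch)
	return ""
}

func main() {
	open := make(chan int)
	fmt.Printf("%q\n", closePanics(open)) // "": the first close succeeds
	fmt.Printf("%q\n", closePanics(open)) // "close of closed channel"
	fmt.Printf("%q\n", closePanics(nil))  // "close of nil channel"
}
```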

Second, a temporary list, gList, collects the goroutines still waiting in recvq and sendq so that the channel lock can be released quickly.

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

Once the lock is released, we can call goready for each goroutine in the gList. Woken receivers observe the closed channel and receive the zero value, while woken senders panic.

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
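
This wake-up is what makes close usable as a broadcast. In the sketch below, wakeAll is a hypothetical helper that starts n receivers on an unbuffered channel, closes it, and counts how many of them saw ok == false.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll starts n receivers on an unbuffered channel, closes the channel,
// and returns how many receivers observed ok == false.
func wakeAll(n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}

	// Receivers already parked are woken by close. Any receiver that
	// arrives later sees the closed channel and returns immediately.
	close(ch)
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(wakeAll(5)) // 5
}
```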

Conclusion

Below is a comparison of the operations.

Operation | Nil channel     | Closed channel  | Open channel
Close     | Panic           | Panic           | Success
Send      | Blocked forever | Panic           | Success or blocked
Receive   | Blocked forever | Always succeeds | Success or blocked