Understand Channel design in one article

In Go, to understand channels, you first need to understand goroutines.

1. Why goroutines

Modern operating systems provide three basic ways to construct concurrent programs: multiprocessing, I/O multiplexing, and multithreading. The simplest of these is multiprocessing, but because process control and inter-process communication are expensive, multiprocess concurrent programs tend to be slow.

The operating system therefore provides a smaller-grained unit of execution: the thread (kernel thread). A thread is a logical flow that runs in the context of a process and is scheduled by the operating system. Its scheduling model is shown in the figure below.

Multithreaded concurrency is much faster than multiprocessing. However, a thread context switch still inevitably traps into the kernel, so its overhead remains significant. Is there an execution unit that does not need to enter the kernel at all? Yes: user-level threads. Switching between user-level threads is controlled by the user program itself without kernel involvement, which avoids the cost of entering and leaving kernel mode.

These user-level threads are coroutines. Their switching is scheduled and managed entirely by the runtime system; the kernel does not know they exist. A coroutine is an abstraction built on top of a kernel thread, and one kernel thread can host many coroutines, but system calls are still ultimately performed by the kernel thread. Note that thread scheduling is managed by the operating system and is preemptive, whereas coroutines must cooperatively yield control, which is why they are called coroutines (cooperative routines).

Go supports coroutines natively at the language level; they are called goroutines. Go's runtime system implements an M:N scheduling model, usually described in terms of the GMP objects: G is the goroutine, M is the OS thread, and P is the scheduling context (processor). In a Go program, a goroutine is the smallest unit of user code execution, and goroutines are also the smallest unit of concurrency.

2. Where channels fit in

From a memory perspective, there are only two concurrency models: one based on shared memory and one based on message passing (memory copying). Go provides synchronization primitives for both: the sync and atomic packages are based on shared memory, while channels implement message-based communication. Go advocates the latter, which rests on three elements: goroutines, channels, and select.

Do not communicate by sharing memory; instead, share memory by communicating.

In Go, concurrency problems can be solved simply and efficiently with goroutines plus channels; a channel is the data bridge between goroutines.

Concurrency is the key to designing high performance network services. Go's concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution.

The following is a simple example of using a channel.

func goroutineA(ch <-chan int) {
	fmt.Println("[goroutineA] waiting for data")
	val := <-ch
	fmt.Println("[goroutineA] received the data", val)
}

func goroutineB(ch chan<- int) {
	time.Sleep(time.Second)
	ch <- 1
	fmt.Println("[goroutineB] send the data 1")
}

func main() {
	ch := make(chan int, 1)
	go goroutineA(ch)
	go goroutineB(ch)
	time.Sleep(2*time.Second)
}

The process above is illustrated in the diagram below.

3. Channel source code analysis

The channel source code is located in src/runtime/chan.go. This chapter has two parts: the internal structure of a channel, and channel operations.

3.1 Internal structure of a channel

ch := make(chan int, 2)

For the channel declaration above, we can set a breakpoint in the program; the debugger shows the following information for ch.

Good, it looks clear enough. But what does this information mean? Next, let's look at some important structures.

  • hchan

When we create a channel with make(chan T, size), the runtime system allocates an hchan structure. The source is in src/runtime/chan.go.

type hchan struct {
	qcount   uint           // number of elements currently in the circular queue
	dataqsiz uint           // capacity of the circular queue
	buf      unsafe.Pointer // pointer to an array of dataqsiz elements
	elemsize uint16         // size of each element
	closed   uint32         // whether the channel is closed
	elemtype *_type         // element type (_type is Go's runtime type representation)
	sendx    uint           // send index, initially 0
	recvx    uint           // receive index, initially 0
	recvq    waitq          // receive wait queue: blocked goroutines waiting to receive (<-ch)
	sendq    waitq          // send wait queue: blocked goroutines waiting to send (ch<-)

	lock mutex              // protects all fields of hchan, including the sudog objects in waitq
}
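Incidentally, qcount and dataqsiz are exactly what the built-in len and cap report for a channel, which gives a convenient way to observe the buffer state without a debugger:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	fmt.Println(len(ch), cap(ch)) // 0 2 : qcount=0, dataqsiz=2

	ch <- 10
	fmt.Println(len(ch), cap(ch)) // 1 2 : one element buffered

	<-ch
	fmt.Println(len(ch), cap(ch)) // 0 2 : buffer drained again
}
```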
  • waitq

waitq represents a linked list of goroutines in the blocked state: first points to the goroutine at the head of the list and last points to the goroutine at the tail.

type waitq struct {
	first *sudog           
	last  *sudog
}
  • sudog

sudog represents a goroutine waiting in such a list. The source is in src/runtime/runtime2.go.

type sudog struct {
	g    *g
	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to a stack)
	c    *hchan         // the channel this sudog is waiting on
	...
}

To better understand the hchan structure, let's examine the meaning of its fields with the following code.

package main

import "time"

func goroutineA(ch chan int) {
	ch <- 100
}

func goroutineB(ch chan int) {
	ch <- 200
}

func goroutineC(ch chan int) {
	ch <- 300
}

func goroutineD(ch chan int) {
	ch <- 300
}

func main() {
	ch := make(chan int, 4)
	for i := 0; i < 4; i++ {
		ch <- i * 10
	}
	go goroutineA(ch)
	go goroutineB(ch)
	go goroutineC(ch)
	go goroutineD(ch)
	// The first sleep gives all goroutines enough time to start
	time.Sleep(time.Millisecond * 500)
	time.Sleep(time.Second)
}

Enable debugging and run the program to a breakpoint at time.Sleep(time.Second); the chan information at that point is as follows.

The channel was created with make(chan int, 4), so dataqsiz is 4. Since four elements have already been added to the circular queue, qcount is also 4. At this point four goroutines (A through D) want to send to the channel, but because the circular queue is full they can only join the send wait queue, sendq. Also note that both index values are 0: the next receive will take the element at position 0 of the circular queue, and once space frees up the next send will write to position 0.

The hchan structure above is visualized in the diagram below.

3.2 channel operation

The channel operation is divided into four parts: create, send, receive and close.

Create

This article references Go 1.15.2. Channel creation is implemented by the makechan function in src/runtime/chan.go.

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// Limit on the size of a channel element
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	// Alignment check
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// Check whether the buffer allocation would overflow
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Allocate memory for the hchan object
	var c *hchan
	switch {
	// Queue size or element size is 0
	case mem == 0:
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	// Element type does not contain pointers
	case elem.ptrdata == 0:
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	// Element type contains pointers
	default:
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	// Initialize the remaining fields
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

As we can see, makechan mainly validates the element type, allocates memory for hchan, and initializes the relevant fields, including the lock.
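The size check in makechan is observable from user code: a negative buffer size passed at run time triggers the "makechan: size out of range" panic shown above. A minimal sketch (the `makeWithSize` helper is illustrative):

```go
package main

import "fmt"

// makeWithSize tries to create a channel with the given buffer size and
// returns the recovered panic value, or nil if creation succeeded.
func makeWithSize(n int) (err interface{}) {
	defer func() { err = recover() }()
	_ = make(chan int, n) // a variable size defers the check to run time
	return nil
}

func main() {
	fmt.Println(makeWithSize(1))  // <nil> : valid size
	fmt.Println(makeWithSize(-1)) // non-nil: runtime rejects the size
}
```

Note that a constant negative size such as make(chan int, -1) is rejected at compile time instead; only a run-time value reaches the check in makechan.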

Send

Channel sends are implemented by the chansend function in src/runtime/chan.go. There are several cases in the send path.

  1. Sending on a nil channel
if c == nil {
	if !block {
		return false
	}
	gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
	throw("unreachable")
}

When sending to a nil channel with a blocking send, gopark is called to move the current goroutine from the running state to the waiting state; since nothing can ever wake it, it blocks forever.
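The `!block` branch above is what a select statement with a default case exercises: a non-blocking send on a nil channel simply reports that it would block instead of parking the goroutine. A sketch (the `trySend` helper is illustrative):

```go
package main

import "fmt"

// trySend performs a non-blocking send, returning false when the send
// would block, which is always the case for a nil channel.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	var nilCh chan int               // nil channel
	fmt.Println(trySend(nilCh, 1))   // false: a plain send here would block forever

	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1))      // true: the buffer has room
	fmt.Println(trySend(ch, 2))      // false: the buffer is now full
}
```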

  2. Sending on a closed channel
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

Sending data to a closed channel raises a panic.

  3. If there is already a blocked receiving goroutine (i.e. recvq is non-empty), the data is handed directly to that receiver.
if sg := c.recvq.dequeue(); sg != nil {
	// Found a waiting receiver. We pass the value we want to send
	// directly to the receiver, bypassing the channel buffer (if any).
	send(c, sg, ep, func() { unlock(&c.lock) }, 3)
	return true
}

This logic is implemented in the send and sendDirect functions.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ... // The race code is omitted
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

memmove, which we have met several times in this source-code series, copies the contents of src to dst. Also note goready(gp, skip+1): it marks the dequeued receiving goroutine as runnable so that the Go scheduler can run it again.

  4. For a buffered channel, if the buffer hchan.buf still has free space, the data is copied into the buffer.
if c.qcount < c.dataqsiz {
	qp := chanbuf(c, c.sendx)
	if raceenabled {
		raceacquire(qp)
		racerelease(qp)
	}
	typedmemmove(c.elemtype, qp, ep)
	// Advance the send index
	c.sendx++
	// The backing store is a circular queue: when the send index reaches
	// the end of the queue, wrap it back to the head
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	// One more element is now stored in the circular queue
	c.qcount++
	unlock(&c.lock)
	return true
}

Here chanbuf(c, c.sendx) returns a pointer to the corresponding slot in the buffer, and typedmemmove calls memmove to copy the data. Also note that any real operation on hchan requires lock(&c.lock) first; once the copy completes, unlock(&c.lock) releases the lock.
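Because sendx and recvx wrap around the circular buffer, delivery order stays strictly FIFO even after the indices lap the queue. The sketch below (the `drain` helper is illustrative) drives both indices past the end of a 2-slot buffer by interleaving sends and receives:

```go
package main

import "fmt"

// drain pushes the given values through a 2-slot buffered channel,
// popping one whenever the buffer fills so sendx/recvx wrap around,
// and returns the values in the order they came out.
func drain(vals []int) []int {
	ch := make(chan int, 2)
	var out []int
	for _, v := range vals {
		ch <- v
		if len(ch) == cap(ch) { // buffer full: pop one before pushing more
			out = append(out, <-ch)
		}
	}
	for len(ch) > 0 { // drain whatever is left
		out = append(out, <-ch)
	}
	return out
}

func main() {
	fmt.Println(drain([]int{1, 2, 3, 4, 5})) // [1 2 3 4 5] : FIFO preserved across wraparound
}
```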

  5. A buffered channel whose hchan.buf is full, or an unbuffered channel with no goroutine currently waiting to receive
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
	mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

getg returns the currently executing goroutine. acquireSudog first obtains the thread M running the current goroutine, then M's P, and finally takes a sudog from the head of P's sudog cache (see src/runtime/proc.go for details). c.sendq.enqueue adds the sudog to the channel's send wait queue, and gopark moves the current goroutine into the waiting state.

To summarize the send flow:

  • Sending locks hchan.
  • If a goroutine is waiting in recvq, the data element is copied directly to that receiving goroutine.
  • If recvq is empty, hchan.buf is checked; if it has space, the data is copied into hchan.buf.
  • If hchan.buf is full, the sending goroutine is enqueued on sendq and suspended by the runtime.
  • Sending on a closed channel panics.

For an unbuffered channel, hchan.buf is always "full", since its capacity is 0.

package main

import "time"

func main() {
	ch := make(chan int)
	go func(ch chan int) {
		ch <- 100
	}(ch)
	time.Sleep(time.Millisecond * 500)
	time.Sleep(time.Second)
}

In this example, a goroutine sends to an unbuffered channel, but no goroutine receives. Set a breakpoint at time.Sleep(time.Second); the ch structure looks as follows.

As we can see, in an unbuffered channel the buf length of hchan is 0. With no goroutine receiving, the sending goroutine is placed on the sendq send queue.

Receive

Channel receives have two forms: v := <-ch compiles to chanrecv1, and v, ok := <-ch compiles to chanrecv2; both delegate to the chanrecv function in src/runtime/chan.go.

func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

We will not walk through the full code of chanrecv here; it mirrors the chansend logic. The rules are as follows.

  • Receiving locks hchan.
  • If a goroutine is waiting in sendq, then hchan.buf must be full (an unbuffered channel is trivially full). Two cases follow (see the recv function in src/runtime/chan.go): 1. If the channel is buffered, the element at the head of the buffer is first copied to the receiving goroutine, then a sudog is dequeued from sendq and its element is copied into the buffer. 2. If the channel is unbuffered, the dequeued sudog's element is copied directly to the receiving goroutine. In both cases, the sending goroutine on the dequeued sudog is woken at the end.
  • If sendq is empty, hchan.buf is checked; if it holds data, an element is copied from hchan.buf to the receiving goroutine.
  • If hchan.buf holds no data, the receiving goroutine is enqueued on recvq and suspended by the runtime.
  • Unlike sending, a goroutine can still receive from a closed channel. When a channel is closed, any goroutines in the recvq waiting list are woken; if hchan.buf still holds unreceived data, the receiver gets that buffered data, otherwise it gets the zero value of the element type.

The following sample shows a goroutine reading from a channel after it has been closed.

func main() {
	ch := make(chan int, 1)
	ch <- 10
	close(ch)
	a, ok := <-ch
	fmt.Println(a, ok)
	b, ok := <-ch
	fmt.Println(b, ok)
	c := <-ch
	fmt.Println(c)
}

// The output is as follows
10 true
0 false
0
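The comma-ok form above is also what a for-range loop over a channel relies on internally: range keeps receiving until the channel is closed and drained, then exits instead of yielding zero values. A sketch:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	// range keeps receiving the buffered values after close,
	// then stops once the buffer is empty.
	for v := range ch {
		fmt.Println(v)
	}
}
```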

Note: every element transferred through a channel is accompanied by a memory copy.

func main() {
	type Instance struct {
		ID   int
		name string
	}

	var ins = Instance{ID: 1, name: "Golang"}

	ch := make(chan Instance, 3)
	ch <- ins

	fmt.Println("original value of ins:", ins)

	ins.name = "Python"
	go func(ch chan Instance) {
		fmt.Println("value received from channel:", <-ch)
	}(ch)

	time.Sleep(time.Second)
	fmt.Println("final value of ins:", ins)
}

// Output
original value of ins: {1 Golang}
value received from channel: {1 Golang}
final value of ins: {1 Python}

The first half of this process is illustrated below.

The second half is illustrated below.

Note that if the channel's element type is changed to a pointer to Instance, the element stored in buf is still a copy, and it is copied again when taken out of the channel. But because the element type is a pointer, the copies and the original all point to the same memory address; modifying the original object therefore affects the data the receiver observes.

func main() {
	type Instance struct {
		ID   int
		name string
	}

	var ins = &Instance{ID: 1, name: "Golang"}

	ch := make(chan *Instance, 3)
	ch <- ins

	fmt.Println("original value of ins:", ins)

	ins.name = "Python"
	go func(ch chan *Instance) {
		fmt.Println("value received from channel:", <-ch)
	}(ch)

	time.Sleep(time.Second)
	fmt.Println("final value of ins:", ins)
}

// Output
original value of ins: &{1 Golang}
value received from channel: &{1 Python}
final value of ins: &{1 Python}

Therefore, when using channels, prefer passing values. If you must pass pointers, be careful: the sender and receiver share the pointed-to data.

Close

Channel closing is implemented by the closechan function in src/runtime/chan.go; its execution logic is explained through the comments below.

func closechan(c *hchan) {
	// Closing a nil channel panics
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	// Lock hchan
	lock(&c.lock)
	// Closing an already-closed channel also panics
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

	// Set the closed flag
	c.closed = 1

	// gList collects the goroutines (the G of GMP) to be woken
	var glist gList

	// Release all sudogs waiting to receive on recvq
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// Release all sudogs waiting to send on sendq
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// After draining recvq and sendq, release the lock
	unlock(&c.lock)

	// Mark the goroutines collected in glist as runnable and let the
	// scheduler run them.
	// Note: as analyzed above, sending to a closed channel panics, so any
	// goroutine released from sendq here will panic once it resumes.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

A few points about the close operation deserve attention.

  • Closing an already-closed channel raises a panic.
  • After a channel is closed, any goroutines blocked receiving or sending are woken. A woken receiver gets an element still buffered in hchan, or the zero value if there is none; a woken sender panics when it resumes.

Regarding the second point, we can exploit this behavior to control program flow (similar to sync.WaitGroup). The following is a sample program.

func main() {
	ch := make(chan struct{})

	go func() {
		// do some work...
		// when the work is done, call close()
		close(ch)
	}()
	// wait for the work to finish
	<-ch
	// other work continues...
}

4. Summary

Channels are a powerful and useful mechanism in Go. To use them effectively, we must understand how they are implemented, which is the purpose of this article.

  • The hchan structure is protected by a lock and is safe for concurrent access by goroutines
  • Channels receive and send data in FIFO (first in, first out) order
  • Channel data transfer relies on memory copying
  • Channels can block (gopark) and wake (goready) goroutines
  • An unbuffered channel works by copying data directly from the sending goroutine to the receiving goroutine, bypassing hchan.buf
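The direct handoff of an unbuffered channel also carries a synchronization guarantee: everything the sender did before ch <- v is visible to the receiver after <-ch completes, per the Go memory model. A minimal sketch (the flag variable is purely illustrative):

```go
package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered: send and receive rendezvous
	var flag bool

	go func() {
		flag = true // happens before the send below...
		ch <- 42
	}()

	v := <-ch
	// ...and therefore before this read completes, so reading flag here
	// is safe and observes true.
	fmt.Println(v, flag) // 42 true
}
```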

We can also see that Go's channel design trades some performance for simplicity. hchan is a locked structure because a locked queue is easier to understand and implement, at some cost in performance. Given the cost of locking the whole channel for every operation, lock-free channel designs have in fact been proposed (https://github.com/golang/go/issues/8899), but the lock-free implementation is harder to maintain, its benchmark results were not convincing, and it does not fit Go's philosophy of simplicity, so it has not been adopted to date.

On the performance side, keep in mind that a goroutine "blocking" on a channel blocks only within the runtime system, at the user level; the underlying kernel thread is not affected and remains unblocked.

Reference links

https://speakerdeck.com/kavya719/understanding-channels

https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8

https://github.com/talkgo/night/issues/450

Tags: Go Concurrent Programming Channel goroutine

Posted by capslock118 on Fri, 06 May 2022 01:37:04 +0300