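Go Channels

Basics

Why Channels Are Needed

When introducing goroutines, we mentioned that multiple goroutines can race with each other. Used carelessly, this at best leaves variables updated incorrectly and at worst crashes the program. Atomic functions and mutexes can resolve such races, but there is a more interesting approach: channels.

When a resource needs to be shared between goroutines, a channel acts as a pipe between them and provides a mechanism that guarantees the data is exchanged synchronously.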

Defining a Channel

// unbuffered channel of int
unbuffered := make(chan int)
// buffered channel of string
buffered := make(chan string, 10)

Sending and Receiving on a Channel

// send a string through the channel
buffered <- "Gopher"
// receive a string from the channel
value := <-buffered
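The two operations above can be tried end to end in a small program. This is only a sketch: roundTrip is a hypothetical helper, and it assumes the buffer is large enough to hold every message so that no send blocks.

```go
package main

import "fmt"

// roundTrip sends every message into a buffered channel and then
// receives them back. Values come out in the order they went in.
func roundTrip(msgs []string) []string {
	// Capacity equals the number of messages, so no send blocks.
	buffered := make(chan string, len(msgs))
	for _, m := range msgs {
		buffered <- m
	}
	out := make([]string, 0, len(msgs))
	for range msgs {
		out = append(out, <-buffered)
	}
	return out
}

func main() {
	fmt.Println(roundTrip([]string{"a", "b", "c"})) // [a b c]
}
```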

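Unbuffered Channels

A send on an unbuffered channel blocks until a receiver is ready. If there is no receiver (the worker below) and every goroutine ends up blocked, the runtime aborts with "fatal error: all goroutines are asleep - deadlock!".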

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for  {
		fmt.Printf("Worker %d received %d\n",
			id, <-c)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	// start the receiver for this channel
	go worker(id, c)
	return c
}

func chanDemo() {
	// an array of channels (channels are first-class values)
	var channels [10]chan<- int
	// create 10 channels, each with its own receiver
	for i := 0; i < 10; i++ {
		channels[i] = createWorker(i)
	}
	// send the first batch of values
	for i := 0; i < 10; i++ {
		channels[i] <- 'a' + i
	}
	// send the second batch of values
	for i := 0; i < 10; i++ {
		channels[i] <- 'A' + i
	}
	// keep the program alive for 1 millisecond so the workers can print
	time.Sleep(time.Millisecond)
}

func main() {
	fmt.Println("Channel as first-class citizen")
	chanDemo()
}

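Note that when a channel is used as a function parameter or return value, you can declare its direction. With chan<- int the channel is send-only: code holding it can send values into it but cannot receive from it.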
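Both directions can be seen side by side in a short sketch. The names feed, sum and sumTo are hypothetical and not part of the program above; the point is only that a bidirectional channel converts implicitly to either restricted form.

```go
package main

import "fmt"

// feed may only send on out; receiving from it would not compile.
func feed(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out) // closing a send-only channel is allowed
}

// sum may only receive from in; sending to it would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// sumTo wires the two together through one bidirectional channel.
func sumTo(n int) int {
	c := make(chan int)
	go feed(c, n)
	return sum(c)
}

func main() {
	fmt.Println(sumTo(5)) // 0+1+2+3+4 = 10
}
```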

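Buffered Channels

A buffered channel can hold sent values even when there is no receiver, as long as the number of pending values does not exceed the configured capacity.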

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for {
		fmt.Printf("Worker %d received %c\n",
			id, <-c)
	}
}

func bufferedChannel() {
	c := make(chan int, 3)
	go worker(0, c)
	c <- 'a'
	c <- 'b'
	c <- 'c'
	c <- 'd'
	time.Sleep(time.Millisecond)
}

func main() {
	fmt.Println("Buffered channel")
	bufferedChannel()
}

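Without the worker the program deadlocks, because the fourth send exceeds the buffer capacity of 3. If c <- 'd' is commented out as well, the program runs without error.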
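The capacity limit can be observed without triggering a deadlock by making the send non-blocking. The sketch below uses select with a default branch for that; trySend and fill are hypothetical helpers, not something the program above uses.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether the
// channel accepted the value.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		// Buffer is full and no receiver is waiting.
		return false
	}
}

// fill keeps sending until a send would block and returns how many
// values the channel accepted.
func fill(c chan int) int {
	n := 0
	for trySend(c, n) {
		n++
	}
	return n
}

func main() {
	c := make(chan int, 3)
	fmt.Println(fill(c), len(c), cap(c)) // 3 3 3
}
```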

Closing a Channel

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for {
		fmt.Printf("Worker %d received %c\n",
			id, <-c)
	}
}

func channelClose() {
	c := make(chan int)
	go worker(0, c)
	c <- 'a'
	c <- 'b'
	c <- 'c'
	c <- 'd'
	close(c)
	time.Sleep(time.Millisecond)
}

func main() {
	fmt.Println("Channel close and range")
	channelClose()
}

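The channel is closed after four values have been sent, but the worker keeps looping and receiving from it. A receive on a closed channel returns the zero value of the element type immediately, so during the 1 millisecond sleep the worker prints output like the following.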

Channel close and range
Worker 0 received a
Worker 0 received b
Worker 0 received c
Worker 0 received d
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
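The empty lines come from the zero value that a closed channel yields. A minimal sketch, assuming a buffered channel so that everything runs in one goroutine (recvAfterClose is a hypothetical name):

```go
package main

import "fmt"

// recvAfterClose closes a channel that still holds one value and
// receives twice: once for the pending value, once after the
// channel has been drained.
func recvAfterClose() (first int, ok1 bool, second int, ok2 bool) {
	c := make(chan int, 1)
	c <- 'a'
	close(c)
	first, ok1 = <-c   // pending value is still delivered
	second, ok2 = <-c  // closed and empty: zero value, ok is false
	return
}

func main() {
	v1, ok1, v2, ok2 := recvAfterClose()
	fmt.Println(v1, ok1, v2, ok2) // 97 true 0 false
}
```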

There are two ways to handle this.

Approach 1

func worker2(id int, c chan int) {
	for {
		n, ok := <-c
		if !ok {
			break
		}
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}

Approach 2

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}

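How to Wait for Goroutines

All of the earlier code uses time.Sleep(time.Millisecond) to wait for the channel work to finish. The danger is plain to see: nothing guarantees the workers are done when the sleep ends. So we need to change it.

Changes:

  1. When printing, the worker not only receives an int from a channel but also sends a done signal out once printing has finished

  2. Since there is now a pair of channels, pull the pair out into a struct

  3. createWorker builds and returns a worker struct

  4. When creating the 10 workers, store them in an array of worker structs

  5. After sending a value in, also receive from done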

package main

import (
	"fmt"
)

func doworker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		done <- true
	}
}

type worker struct {
	in   chan int
	done chan bool
}

func createWorker(id int) worker {
	w := worker{
		in:   make(chan int),
		done: make(chan bool),
	}
	go doworker(id, w.in, w.done)
	return w
}

func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'a' + i
		<-workers[i].done
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'A' + i
		<-workers[i].done
	}
}

func main() {
	fmt.Println("Channel as first-class citizen")
	chanDemo()
}

The program produces the following output:

Worker 0 received a
Worker 1 received b
Worker 2 received c
Worker 3 received d
Worker 4 received e
Worker 5 received f
Worker 6 received g
Worker 7 received h
Worker 8 received i
Worker 9 received j
Worker 0 received A
Worker 1 received B
Worker 2 received C
Worker 3 received D
Worker 4 received E
Worker 5 received F
Worker 6 received G
Worker 7 received H
Worker 8 received I
Worker 9 received J

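As you can see, the program does not run concurrently at all: it sends one value, waits for done, and only then sends the next. Without concurrency there is no point in using goroutines, so we need to keep changing it: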

func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i, worker := range workers {
		worker.in <- 'a' + i
	}

	for i, worker := range workers {
		worker.in <- 'A' + i
	}
	for _, worker := range workers {
		<-worker.done
		<-worker.done
	}
}

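Here the receives from worker.done are moved into their own loop, but this version actually deadlocks. The program prints only the lowercase letters from the first loop. The reason is that all the done receives sit at the end, and a send on an unbuffered channel blocks. After the first loop, every worker has printed its value and is blocked on done <- true, because nobody is receiving from done yet. The second loop then tries to send to worker.in, but the worker is not receiving, so main blocks too and the runtime reports a deadlock. There is a rather tricky fix: send the done signal from a new goroutine inside doworker.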

func doworker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		go func() {
			done <- true
		}()
	}
}
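A different way around the same deadlock, which the text above does not use, is to give done a buffer so that the signal never blocks the worker. The sketch below assumes the number of values per worker (rounds) is known up front; doWorker and runBatch are hypothetical names and the printing is left out.

```go
package main

import "fmt"

// doWorker signals done after handling each value. Because done is
// buffered, the signal does not stop the worker from taking the next value.
func doWorker(in <-chan int, done chan<- bool) {
	for range in {
		done <- true
	}
}

// runBatch sends rounds values to each worker, then collects every
// done signal and returns how many were received.
func runBatch(workers, rounds int) int {
	ins := make([]chan int, workers)
	dones := make([]chan bool, workers)
	for i := range ins {
		ins[i] = make(chan int)
		dones[i] = make(chan bool, rounds) // room for every signal
		go doWorker(ins[i], dones[i])
	}
	for r := 0; r < rounds; r++ {
		for i := range ins {
			ins[i] <- r
		}
	}
	count := 0
	for i := range dones {
		for r := 0; r < rounds; r++ {
			<-dones[i]
			count++
		}
		close(ins[i]) // lets the worker's range loop end
	}
	return count
}

func main() {
	fmt.Println(runBatch(10, 2)) // 20
}
```

The trade-off is that the buffer size has to match the number of signals; the goroutine-per-signal version does not need that knowledge.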

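Managing the end of goroutines by hand is tiring, so Go's standard library provides a helper type for it: sync.WaitGroup.

Using WaitGroup:

  1. Declare the WaitGroup itself: var wg sync.WaitGroup

  2. Set the number of tasks to wait for: wg.Add(20)

  3. Wait: wg.Wait()

  4. Signal completion when a task has finished: wg.Done()

A few more points about this program:

  1. A WaitGroup must be passed by pointer when used as an argument; passing it by value copies the counter, so Done calls on the copy never reach the original

  2. The worker struct is refactored in a functional style: done becomes a function, defined in createWorker and passed into doWork as part of the worker.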

package main

import (
	"fmt"
	"sync"
)

func doWork(id int,
	w worker) {
	for n := range w.in {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		w.done()
	}
}

type worker struct {
	in   chan int
	done func()
}

func createWorker(
	id int, wg *sync.WaitGroup) worker {
	w := worker{
		in: make(chan int),
		done: func() {
			wg.Done()
		},
	}
	go doWork(id, w)
	return w
}

func chanDemo() {
	var wg sync.WaitGroup

	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i, &wg)
	}

	wg.Add(20)
	for i, worker := range workers {
		worker.in <- 'a' + i
	}
	for i, worker := range workers {
		worker.in <- 'A' + i
	}

	wg.Wait()
}

func main() {
	chanDemo()
}
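The four WaitGroup steps listed earlier can also be seen in isolation in a smaller sketch. The square and squares functions are hypothetical; each goroutine writes to its own slice element, so no further synchronization is assumed to be needed.

```go
package main

import (
	"fmt"
	"sync"
)

// square stores n*n in its own slot and then reports completion.
// wg is a pointer so that every goroutine shares the same counter.
func square(n int, out *int, wg *sync.WaitGroup) {
	defer wg.Done()
	*out = n * n
}

// squares computes 0*0 .. (n-1)*(n-1), one goroutine per element.
func squares(n int) []int {
	var wg sync.WaitGroup
	results := make([]int, n)
	wg.Add(n) // register all tasks before starting them
	for i := 0; i < n; i++ {
		go square(i, &results[i], &wg)
	}
	wg.Wait() // blocks until Done has been called n times
	return results
}

func main() {
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```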

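Applications

Go has a famous proverb:

Don't communicate by sharing memory, share memory by communicating.

For example, to tell whether an earlier stage of a program has finished, we often set a flag variable and keep checking it. That is communicating by sharing memory. Go instead encourages using channels, that is, sharing memory by communicating.

Put bluntly, the authors recommend channels over mutexes. It still depends on the scenario, though; see this article for details.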
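As a rough illustration of the flag example, the sketch below replaces the flag with a channel that is closed when the stage finishes. runStage is a hypothetical helper; the waiting side simply blocks on the channel instead of polling a variable.

```go
package main

import "fmt"

// runStage runs work in a goroutine and returns a channel that is
// closed once work has returned.
func runStage(work func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		work()
	}()
	return done
}

func main() {
	result := 0
	done := runStage(func() { result = 42 })
	<-done              // blocks until the stage has finished
	fmt.Println(result) // 42
}
```

Reading result after <-done is safe because closing the channel happens before the receive that observes the close.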

Using select

Suppose you need to receive from several channels but do not know which one will send first. A select statement handles exactly this.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func main() {
	var c1, c2 = generator(), generator()
	for {
		select {
		case n := <-c1:
			fmt.Println("Received from c1: ", n)
		case n := <-c2:
			fmt.Println("Received from c2: ", n)
		}
	}
}

Output:

Received from c1:  0
Received from c2:  0
Received from c2:  1
Received from c1:  1
Received from c1:  2
Received from c2:  2
Received from c1:  3
^Csignal: interrupt

Suppose I want to pass the values from c1 and c2 on to the worker written earlier. It could be written like this.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %d\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	w := createWorker(0)
	var c1, c2 = generator(), generator()
	for {
		n := 0
		select {
		case n = <-c1:
			fmt.Println("Received from c1: ", n)
		case n = <-c2:
			fmt.Println("Received from c2: ", n)
		case w <- n:
		}
	}
}

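There is a problem with this version. Because the generators sleep before producing anything, at the start the select takes the case w <- n branch over and over and the worker prints 0 many times. So before sending to the worker we need to check whether n actually holds a value.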

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %d\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	worker := createWorker(0)
	var c1, c2 = generator(), generator()
	n := 0
	hasValue := false
	for {
		// a case on a nil channel is never selected
		var activeworker chan<- int
		if hasValue {
			activeworker = worker
		}
		select {
		case n = <-c1:
			fmt.Println("Received from c1: ", n)
			hasValue = true
		case n = <-c2:
			fmt.Println("Received from c2: ", n)
			hasValue = true
		case activeworker <- n:
			hasValue = false
		}
	}
}

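Now it works. The key point here is that a case on a nil channel is never selected. A channel declared with var activeworker chan<- int is nil, so the final case activeworker <- n stays disabled until hasValue is true, which makes it usable as a condition.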
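The same nil-channel behavior can also switch off a receive case. The following sketch, built on hypothetical merge and emit helpers, drains two channels and sets each one to nil once it is closed, so select stops considering it.

```go
package main

import "fmt"

// merge collects every value from a and b. When a channel is closed,
// its variable is set to nil, which disables that case in the select.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // this case is never selected again
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

// emit sends vals on a new channel and then closes it.
func emit(vals ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

func main() {
	got := merge(emit(1, 2, 3), emit(10, 20))
	fmt.Println(len(got)) // 5; the interleaving varies between runs
}
```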

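There is one more thing to consider. Values travel from c1 or c2 to the worker by way of the int variable n. If the worker is too slow and c1 and c2 produce too quickly, n is overwritten before it has been sent, so not every value reaches the worker.

I also wondered why c1 and c2 are not handed to the worker directly. But worker is a channel of int, and c1 and c2 are channels themselves, so worker <- c1 does not type-check.

So at this point we need to store the data coming in from c1 and c2.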

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		// deliberately slow the worker down
		time.Sleep(time.Second)
		fmt.Printf("Worker %d received %d\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	var c1, c2 = generator(), generator()
	var worker = createWorker(0)
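	// a slice that queues the received values
	var values []int
	// time.After returns a channel that receives the current time once the given duration has elapsed
	tm := time.After(10 * time.Second)
	// time.Tick returns a channel that receives the current time at every interval
	tick := time.Tick(time.Second)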
	for {
		var activeWorker chan<- int
		// the value to send next
		var activeValue int
		if len(values) > 0 {
			activeWorker = worker
			// always take the first value in the queue
			activeValue = values[0]
		}
		select {
		case n := <-c1:
			values = append(values, n)
		case n := <-c2:
			values = append(values, n)
		case activeWorker <- activeValue:
			// drop the head of the queue so the next value moves to the front
			values = values[1:]
		case <-time.After(800 * time.Millisecond):
			fmt.Println("timeout")
		case <-tick:
			fmt.Println(
				// report the queue length once per second
				"queue len =", len(values))
		case <-tm:
			fmt.Println("bye")
			return
		}
	}
}
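The timeout case in the program above can be isolated into a small helper. This is a sketch only: recvTimeout is a hypothetical function that waits for one value and gives up after a given duration, using the same time.After pattern.

```go
package main

import (
	"fmt"
	"time"
)

// recvTimeout waits up to d for a value from c. The bool result
// reports whether a value arrived before the timeout.
func recvTimeout(c <-chan int, d time.Duration) (int, bool) {
	select {
	case v := <-c:
		return v, true
	case <-time.After(d):
		return 0, false
	}
}

func main() {
	ready := make(chan int, 1)
	ready <- 7
	fmt.Println(recvTimeout(ready, 50*time.Millisecond)) // 7 true

	empty := make(chan int)
	fmt.Println(recvTimeout(empty, 50*time.Millisecond)) // 0 false
}
```

Note that a time.After placed inside a loop, as in the program above, creates a fresh timer on every iteration, so the timeout measures the gap since the last event rather than the total running time.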
