# Go语言 Channel

## 基础

### 为什么需要Channel

在介绍goroutine的时候有提到过多个goroutine会产生竞争关系，如果使用不得当，轻则无法正确更新变量，重则挂掉程序。我们可以通过原子函数和互斥锁来解决竞争关系，但是还有一种更有趣的方法，那就是使用Channel。

当一个资源需要在goroutine之间共享时，Channel在goroutine之间架起了一个管道，并提供了 确保同步交换数据的机制。

### **定义Channel**

```go
//无缓冲的整型通道
unbuffered := make(chan int)
//有缓冲的字符串通道
buffered := make(chan string, 10)
```

### 从Channel里收发数据

```go
//通过通道发送一个字符串 
buffered <- 1
//从通道接收一个字符串 
value := <- buffered
```

### 无缓冲的Channel

无缓冲的Channel必须有接受者，如果没有接受者（worker）就会报错。

```go
package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for  {
		fmt.Printf("Worker %d received %d\n",
			id, <-c)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	//Channel的接受者
	go worker(id, c)
	return c
}

func chanDemo() {
	//建立Channel的slice （Channel是一等公民）
	var channels [10]chan<- int
	//建立10个Channel和接受者
	for i := 0; i < 10; i++ {
		channels[i] = createWorker(i)
	}
	//传入值
	for i := 0; i < 10; i++ {
		channels[i] <- 'a' + i
	}
	//传入值
	for i := 0; i < 10; i++ {
		channels[i] <- 'A' + i
	}
	//程序持续1 Millisecond
	time.Sleep(time.Millisecond)
}

func main() {
	fmt.Println("Channel as first-class citizen")
	chanDemo()
}
```

注意在把Channel当做函数参数使用的时候，可以定义Channel的数据走向`chan<- int`,这样Channel就只能接受数据。

### 有缓冲的Channel

有缓冲的Channel就算没有接受者，只要没有超过预先设置的缓冲，就能保存传入的值。

```go
package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for {
		fmt.Printf("Worker %d received %c\n",
			id, <-c)
	}
}

func bufferedChannel() {
	c := make(chan int, 3)
	go worker(0, c)
	c <- 'a'
	c <- 'b'
	c <- 'c'
	c <- 'd'
	time.Sleep(time.Millisecond)
}

func main() {
	fmt.Println("Buffered channel")
	bufferedChannel()
}
```

如果不加worker，程序会报错，但是如果把`c <- 'd'`也注释掉，程序时不会报错的。

### 关闭Channel

```go
package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for {
		fmt.Printf("Worker %d received %c\n",
			id, <-c)
	}
}

func channelClose() {
	c := make(chan int)
	go worker(0, c)
	c <- 'a'
	c <- 'b'
	c <- 'c'
	c <- 'd'
	close(c)
	time.Sleep(time.Millisecond)
}

func main() {
	fmt.Println("Channel close and range")
	channelClose()
}
```

在传完4个值之后关闭了Channel，但是worker还在无限循环并且往外传Channel。但是由于已经没有值了，在1 毫秒的时间内，会生成下面的结果。

```
Channel close and range
Worker 0 received a
Worker 0 received b
Worker 0 received c
Worker 0 received d
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
Worker 0 received 
```

如何对应这个问题可以有2种写法。

**写法1**

```go
func worker2(id int, c chan int) {
	for {
		n, ok := <-c
		if !ok {
			break
		}
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}
```

**写法2**

```go
func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}
```

### 如何等待Goroutine

在之前的代码里面都有使用`time.Sleep(time.Millisecond)`来等待Channel的结束。但是这种方法的危险肉眼可见。所以我们需要修改一下。

修改点：

1. 在执行打印的时候，不仅接受一个Channel的int，同时在打印完成后，对完传出一个done
2. 由于有一对chan，将这一对chan提出来做一个struct
3. 在Createworker的时候用worker构造体传值
4. 同时在创建10个worker的时候也用构造体创建构造体的slice
5. 在向内部传值后，同时加入接受done的处理

```go
package main

import (
	"fmt"
)

func doworker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		done <- true
	}
}

type worker struct {
	in   chan int
	done chan bool
}

func createWorker(id int) worker {
	w := worker{
		in:   make(chan int),
		done: make(chan bool),
	}
	go doworker(id, w.in, w.done)
	return w
}

func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'a' + i
		<-workers[i].done
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'A' + i
		<-workers[i].done
	}
}

func main() {
	fmt.Println("Channel as first-class citizen")
	chanDemo()
}
```

这个程序跑出来的结果如下:

```
orker 0 received a
Worker 1 received b
Worker 2 received c
Worker 3 received d
Worker 4 received e
Worker 5 received f
Worker 6 received g
Worker 7 received h
Worker 8 received i
Worker 9 received j
Worker 0 received A
Worker 1 received B
Worker 2 received C
Worker 3 received D
Worker 4 received E
Worker 5 received F
Worker 6 received G
Worker 7 received H
Worker 8 received I
Worker 9 received J
```

可以看到程序并没有并发，而是传进去一条，然后等待done，然后再传下一条。如果并发效果，使用goroutine就没有了意义。所以这里需要继续改：

```go
func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i, worker := range workers {
		worker.in <- 'a' + i
	}

	for i, worker := range workers {
		worker.in <- 'A' + i
	}
	for _, worker := range workers {
		<-worker.done
		<-worker.done
	}
}
```

把<-worker.done也单独拿出来接收，但是这样其实是会报错的。程序只会打印出第一段小写的for，原因是所有done的接受都放在了最后，**而Channel的是传值是阻塞式的**，第一段for的Channel接受完值，但是没有收到done，而第二段done却已经开始执行要传值了，导致报错。这里有一种非常tricky的改法，就是把doworker函数里的done改为goroutine。

```go
func doworker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		go func() {
			done <- true
		}()
	}
}
```

最后自己管理goroutine的结束实在太累了，所以go语言提供了一种叫做`sync.WaitGroup`的语法糖。

WaitGroup的用法：

1. 定义WaitGroup本身：`var wg sync.WaitGroup`
2. 定义需要等待的goroutine数量：`wg.Add(20)`
3. 等待：`wg.Wait()`
4. 在goroutine执行完成后done：`w.done()`

在这个程序里还有一些需要说的点：

1. WaitGroup作为参数的时候一定要用指针
2. 这里利用函数式编程重构了一下worker构造，把done设置为函数，createworker的时候定义函数同时传入dowork里。

```go
package main

import (
	"fmt"
	"sync"
)

func doWork(id int,
	w worker) {
	for n := range w.in {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		w.done()
	}
}

type worker struct {
	in   chan int
	done func()
}

func createWorker(
	id int, wg *sync.WaitGroup) worker {
	w := worker{
		in: make(chan int),
		done: func() {
			wg.Done()
		},
	}
	go doWork(id, w)
	return w
}

func chanDemo() {
	var wg sync.WaitGroup

	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i, &wg)
	}

	wg.Add(20)
	for i, worker := range workers {
		worker.in <- 'a' + i
	}
	for i, worker := range workers {
		worker.in <- 'A' + i
	}

	wg.Wait()
}

func main() {
	chanDemo()
}

```

## 应用

Go语言[有句名言](https://blog.golang.org/share-memory-by-communicating)：

> Don't communicate by sharing memory, share memory by communicating.

比如我们判断前后程序是否处理是否完成的时候经常设置一个flag变量，然后通过检测这个变量来进行判断，这就是所谓**通过共享内存进行通信**。但是go语言更提倡通过使用Channel，**通过通信来共享内存**。

其实说说直接一点就是作者是推荐使用Channel而不是Mutex的。但是还是要分场景，详细可以参照[这篇文章](https://segmentfault.com/a/1190000017890174)。

### 利用Select

假如你需要从多个Channel接受值，但是你不知道哪一个Channel会先发数据给你。那你可以使用Select语句来完成此类需求。

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func main() {
	var c1, c2 = generator(), generator()
	for {
		select {
		case n := <-c1:
			fmt.Println("Received from c1: ", n)
		case n := <-c2:
			fmt.Println("Received from c1: ", n)
		}
	}
}
```

执行结果：

```
Received from c1:  0
Received from c2:  0
Received from c2:  1
Received from c1:  1
Received from c1:  2
Received from c2:  2
Received from c1:  3
^Csignal: interrupt
```

假如我想把c1和c2的值传给之前写的worker。可以这样写。

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	w := createWorker(0)
	var c1, c2 = generator(), generator()
	for {
		n := 0
		select {
		case n = <-c1:
			fmt.Println("Received from c1: ", n)
		case n = <-c2:
			fmt.Println("Received from c2: ", n)
		case w <- n:
		}
	}
}
```

但是这样写有个问题，由于generator需要等待，所以在一开始的时候，会走很多遍`case w <- n`同时输出0。所以我们在传给worker之前，需要检查n是否为空。

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	worker := createWorker(0)
	var c1, c2 = generator(), generator()
	n := 0
	hasValue := false
	for {
		// nil chan永远不会被select到
		var activeworker chan<- int 
		if hasValue {
			activeworker = worker
		}
		select {
		case n = <-c1:
			fmt.Println("Received from c1: ", n)
			hasValue = true
		case n = <-c2:
			fmt.Println("Received from c2: ", n)
			hasValue = true
		case activeworker <- n:
			hasValue = false
		}
	}
}
```

这样就对了，这里有个知识点是nil chan永远不会被select到。用`var activeworker chan<- int`定义的chan是nil，所以在最后的`case activeworker <- n`的时候可以用来作为判断条件。

这里还有一个点需要考虑，现在我们的chan是从c1或者c2传进worker，但是中途经过int变量n，如果worker处理的速度过慢，而c1,c2传入过快，会导致n被覆盖从而无法传入全部的值到worker之中。

> 个人还想过为什么不直接把c1,c2传给worker，但是worker是个Channel类型，c1,c2也是Channel类型，所以不能直接`worker<-c1`

这个时候我们需要把c1,c2传进来的数据储存起来。

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func generator() chan int {
	out := make(chan int)
	go func() {
		i := 0
		for {
			time.Sleep(
				time.Duration(rand.Intn(1500)) *
					time.Millisecond)
			out <- i
			i++
		}
	}()
	return out
}

func worker(id int, c chan int) {
	for n := range c {
		// 让worker故意慢一点
		time.Sleep(time.Second)
		fmt.Printf("Worker %d received %d\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func main() {
	var c1, c2 = generator(), generator()
	var worker = createWorker(0)
	// 建立一个切片来储存数据
	var values []int
	// time.After在指定时间后往返回Channel你传入时间
	tm := time.After(10 * time.Second)
	// time.Tick在指定间隔往返回Channel你传入时间
	tick := time.Tick(time.Second)
	for {
		var activeWorker chan<- int
		// 定义有效的值
		var activeValue int
		if len(values) > 0 {
			activeWorker = worker
			// 永远拿队列第一个值
			activeValue = values[0]
		}
		select {
		case n := <-c1:
			values = append(values, n)
		case n := <-c2:
			values = append(values, n)
		case activeWorker <- activeValue:
			// 将队列第二个值往前移动一位
			values = values[1:]
		case <-time.After(800 * time.Millisecond):
			fmt.Println("timeout")
		case <-tick:
			fmt.Println(
				// 每隔一秒查看队列长度
				"queue len =", len(values))
		case <-tm:
			fmt.Println("bye")
			return
		}
	}
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://dingyj.gitbook.io/blog/golang/basic/go-yu-yan-xue-xi-bi-ji/go-yu-yan-channel.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
