并发编程
支持并发编程是Go语言的一大特性和一大优势。
概述
并行和并发
并行(parallel) :指在同一时刻,有多条指令在多个处理器上同时执行。
并发(concurrency):同一时刻只能有一条指令执行,但是多个进程指令被快速轮换执行。
示例:
- 并行是两个队列同时使用两台咖啡机。
- 并发是两个队列交替使用一台咖啡机。
Go语言的并发优势
Go语言被称为21世纪的C语言,第一是因为Go语言设计简单,第二,21世纪最重要的就是并发程序设计,而Go语言从语言层面就支持了并发。同时,并发程序的内存管理有时候是非常复杂的,而Go语言提供了自动垃圾回收机制。
Go语言为并发编程而内置的上层API是基于CSP(communicating sequential processes 顺序通讯进程)模型。这就意味着显示锁是可以避免的,因为Go语言通过安全的通道发送和接收数据实现同步,大大地简化了并发程序的编写。
goroutine
goroutine是什么
goroutine 是Go并发设计的核心。goroutine说到底其实就是协程,但是它比线程更小,十几个goroutine可能体现在底层就是五六个线程,Go语言内部实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4-5KB),当然会根据相应的数据伸缩,也正因如此,可以同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更便捷。
创建goroutine
只需再函数调用句前添加go关键字,就可创建并发执行单元。
main() -> 主goroutine协程
package main
import (
"fmt"
"time"
)
func newTask() {
for {
fmt.Println("this is newTask")
time.Sleep(time.Second)
}
}
func main() {
go newTask() // 新建一个协程,新建一个任务
for {
fmt.Println("this is main goroutine")
time.Sleep(time.Second)
}
}
主 goroutine 先退出
主协程退出后,其他子协程也会跟着退出。
若主协程退出过快,可能导致子协程没来得及调用。
package main
import (
"fmt"
"time"
)
func newTask() {
for {
fmt.Println("this is newTask")
time.Sleep(time.Second)
}
}
// 主协程退出后,其他子协程也会跟着退出
func main() {
go func(){
i := 0
for {
i++
fmt.Println("子协程 i = ", i)
time.Sleep(time.Second)
}
}()
i := 0
for {
i++
fmt.Println("mian i = ", i)
time.Sleep(time.Second)
if i == 2 {
break
}
}
}
runtime包
Gosched:让出时间片
让出本轮时间片,先让别的协程执行。
package main
import (
"fmt"
"runtime"
)
func main() {
go func(){
for i := 0; i < 5; i++ {
fmt.Println("go")
}
}()
for i := 0;i<2; i++ {
// 让出时间片,先让别的协程执行,它执行完,再回来执行此协程
runtime.Gosched()
fmt.Println("hello")
}
}
Goexit:退出当前协程
package main
import (
"fmt"
"runtime"
)
func test(){
defer fmt.Println("ccccc")
// return // 终止此函数
runtime.Goexit() // 终止所在的协程
fmt.Println("dddddd")
}
func main() {
// 创建一个协程
go func(){
fmt.Println("aaaaa")
// 调用别的函数
test()
fmt.Println("bbbb")
}()
// 写一个死循环,保持主协程不结束
for {
}
}
GOMAXPROCS:设置可以并行计算的CPU核数的最大值,并返回之前的值
package main
import (
"fmt"
"runtime"
)
func main() {
n := runtime.GOMAXPROCS(1) // 指定以单核运算
fmt.Println(n)
for {
go fmt.Print(1)
fmt.Print(0)
}
}
多任务资源竞争问题
package main
import (
"fmt"
"time"
)
// 定义一个打印机,参数为字符串,按每个字符打印
// 打印机属于公共资源
func Printer(str string) {
for _, data :=range str{
fmt.Printf("%c",data)
time.Sleep(time.Second)
}
fmt.Printf("\n")
}
func person1(){
Printer("hello")
}
func person2(){
Printer("world")
}
func main() {
// 创建两个协程,同时使用打印机
go person1()
go person2()
// 死循环,阻止主协程结束
for{}
}
channel
goroutine运行在相同的地址空间,因此访问共享内存必须做好同步。goroutine奉行通过通信来共享内存,而不是共享内存来通信。
引入channel是CSP(communicating sequential processes 顺序通讯进程)模型的具体实现,用于多个goroutine通讯,其内部实现了同步,确保并发安全。
channel类型
和map类似,channel也对应一个make创建的底层数据结构的引用。
make(chan Type, capacity) // 定义一个channel
channel <- value // 发送value到channel
<-channel // 接收并将其丢弃
x := <-channel // 从channel中接收数据,并赋值给x
x, ok := <=channel // 功能同上,同时检查通道是否已关闭或是否为空
默认情况下,channel发送和接收数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine变得更加的简单,而不需要显式的lock。
通过channel实现同步
package main
import (
"fmt"
"time"
)
// 全局变量,创建一个channel
var ch = make(chan int)
// 定义一个打印机,参数为字符串,按每个字符打印
// 打印机属于公共资源
func Printer(str string) {
for _, data :=range str{
fmt.Printf("%c",data)
time.Sleep(time.Second)
}
fmt.Printf("\n")
}
// person1执行完后,再执行person2
func person1(){
Printer("hello")
ch <- 666 // 给管道写数据,发送
}
func person2(){
<-ch // 从管道取数据,接收,如果通道没有数据,会阻塞
Printer("world")
}
func main() {
// 创建两个协程,同时使用打印机
go person1()
go person2()
// 死循环,阻止主协程结束
for{}
}
通过channel实现同步和数据交互
package main
import (
"fmt"
"time"
)
func main() {
// 创建channel
ch := make(chan string)
defer fmt.Println("主协程已结束1")
go func() {
defer fmt.Println("子协程调用完毕")
for i := 0; i < 2; i++ {
fmt.Println("子协程i = ", i)
time.Sleep(time.Second)
}
ch <- "我是子协程,我工作完毕" // 发送数据到channel
}()
str := <-ch // 没有数据前,阻塞
fmt.Println("str = ", str)
}
无缓存channel
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。
package main
import (
"fmt"
"time"
)
func main(){
// 创建一个无缓冲的channel
ch := make(chan int)
// len(ch) 缓冲区剩余数据个数,cap(ch)缓冲区大小
fmt.Printf("len(ch) = %d, cap(ch)= %d\n", len(ch), cap(ch))
// 新建协程
go func(){
for i := 0; i < 3; i++ {
fmt.Println("子协程 i = ", i)
ch <- i // 往channel写内容
// fmt.Printf("len(ch) = %d, cap(ch)= %d\n", len(ch), cap(ch))
}
}()
// 延时
time.Sleep(2*time.Second)
for i := 0; i < 3; i++ {
num := <- ch // 读管道中的内容,没有内容前,阻塞
fmt.Println("num = ", num)
}
}
有缓存channel
有缓冲通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。
缓冲区满了之后会阻塞,等待有空位后,继续写入。
package main
import (
"fmt"
"time"
)
func main(){
// 创建一个有缓冲的channel
ch := make(chan int, 3)
// len(ch) 缓冲区剩余数据个数,cap(ch)缓冲区大小
fmt.Printf("len(ch) = %d, cap(ch)= %d\n", len(ch), cap(ch))
// 新建协程
go func(){
for i := 0; i < 10; i++ {
ch <- i // 往channel写内容
fmt.Printf("子协程[%d]:en(ch) = %d, cap(ch)= %d\n", i, len(ch), cap(ch))
}
}()
// 延时
time.Sleep(2*time.Second)
for i := 0; i < 10; i++ {
num := <- ch // 读管道中的内容,没有内容前,阻塞
fmt.Println("num = ", num)
}
}
关闭channel
使用close()方法关闭channel。
- channel不像文件一样需要经常关闭,只有当你确实没有任何需要发送的数据了,或者你想显示的结束range之类的循环,才去关闭channel;
- 关闭channel后,无法向channel再发送数据(会引发panic错误后导致接收立即停止返回零值);
- 关闭channel后,可以继续向channel接收数据;
- 对于nil channel ,无论收发都会被阻塞。
package main
import (
"fmt"
// "time"
)
func main(){
// 定义一个无缓存channel
ch := make(chan int)
// 新建一个goroutine
go func(){
for i := 0; i < 5; i++ {
ch <- i // 往通道写数据
}
// 不需要再写数据时关闭channel
close(ch)
// ch <- 666 // 关闭channel后,无法再发送数据
}()
for{
// 如果ok为true,说明管道没有关闭
if num, ok := <-ch; ok== true{
fmt.Println("num = ", num)
}else{
// 管道关闭
break
}
}
}
通过range遍历channel内容
package main
import (
"fmt"
// "time"
)
func main(){
// 定义一个无缓存channel
ch := make(chan int)
// 新建一个goroutine
go func(){
for i := 0; i < 5; i++ {
ch <- i // 往通道写数据
}
// 不需要再写数据时关闭channel
close(ch)
// ch <- 666 // 关闭channel后,无法再发送数据
}()
for num := range ch {
fmt.Println("num = ", num)
}
}
单向channel
限定只读或只写。
var ch1 chan int // ch1是一个正常的channel,不是单向的
var ch2 chan<- float64 // ch2是单向的channel,只用于写float64数据
var ch3 <-chan int // ch3是单向的channel,只用于读取int数据
可以把channel隐式转换为单向队列,只收或只发,不能将单向队列转化为channel。
package main
import (
"fmt"
)
func main(){
// 创建一个channel,双向
ch := make(chan int)
// 双向channel能隐式转化为单向channel
var writeCh chan<- int = ch // 只能写,不能读
var readCh chan<- int = ch // 只能读,不能写
writeCh<- 666 // 写
<-writeCh
// 单向无法转换为双向
var ch2 chan int = writeCh // err
}
单向channel应用
package main
import (
"fmt"
)
// 此管道通道只能写,不能读
func producer(out chan<- int){
for i := 0; i < 10; i++ {
out <- i*i
}
close(out)
}
// 此channel只能读,不能写
func consumer(in <-chan int){
for num := range in {
fmt.Println("num = ", num)
}
}
func main(){
// 创建一个channel,双向
ch := make(chan int)
// 生产者,生产数字写入channel
go producer(ch) // 新开一个协程,channel传参,引用传递
// 消费者,从channel中读取内容,打印
consumer(ch)
}
定时器
Timer
Timer的使用
package main
import (
"fmt"
"time"
)
// 验证time.NewTimer(),时间到了,只会响应一次
func main(){
timer := time.NewTimer(1 * time.Second)
for {
<-timer.C // fatal error: all goroutines are asleep - deadlock!
// 第二次循环时,造成死锁,说明timer只会响应一次
fmt.Println("时间到")
}
}
// Timer使用
func main01(){
// 创建一个定时器,设置时间为2s,2s后往time通道写内容(当前时间)
timer := time.NewTimer(2*time.Second)
fmt.Println("当前时间:", time.Now())
// 2s后,往timer.C写数据,有数据后,就可以读取
t := <-timer.C // channel没有数据前会阻塞
fmt.Println("t = ", t)
}
通过Timer实现延时功能
package main
import (
"fmt"
"time"
)
// 方法3
func main(){
<-time.After(2 * time.Second) // 定时2s,阻塞2s,2s后产生一个时间,往channel写内容
fmt.Println("时间到")
}
// 方法2
func main02(){
time.Sleep(2 * time.Second)
fmt.Println("时间到")
}
// 延时两秒后,打印一句话
func main01(){
timer := time.NewTimer(2 * time.Second)
<-timer.C
fmt.Println("时间到")
}
停止和重置定时器
package main
import (
"fmt"
"time"
)
// 定时器重置
func main(){
timer := time.NewTimer(10 * time.Second)
timer.Reset(1 * time.Second) // 重置为1s
<-timer.C
fmt.Println("时间到")
}
// 定时器停止
func main01(){
timer := time.NewTimer(3 * time.Second)
go func(){
<-timer.C
fmt.Println("定时器时间到,子协程打印")
}()
timer.Stop() // 停止定时器
for{}
}
Ticker
Ticker是一个定时触发的计时器,它会以一个间隔(interval)往channel发送一个事件(当时时间),而channel的接收者可以以固定的时间间隔从channel中读取事件。
package main
import (
"fmt"
"time"
)
func main(){
ticker := time.NewTicker(1 * time.Second)
i:=0
for {
<-ticker.C
i++
fmt.Println("i = ", i)
if i==5 {
ticker.Stop()
break
}
}
}
select
select可以监听channel上的数据流动。
select的用法与switch非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。
select有比较多的限制,其中最大的条限制就是每个case语句里必须是一个IO操作。
通过select实现斐波那契数列
// 斐波那契数列 1 1 2 3 5 8 13
package main
import (
"fmt"
)
// ch只写,quit只读
func fibonacci(ch chan<- int, quit <-chan bool) {
x, y := 1, 1
for{
// 监听channel数据流动
select {
case ch <- x:
x, y = y, x+y
case flag := <-quit:
fmt.Println("flag = ", flag)
return
}
}
// x, y = y, x+y
// x, y
// 1, 1
// 1, 2
// 2, 3
// 3, 5
// 5, 8
}
func main(){
ch := make(chan int) // 数字通信
quit := make(chan bool) // 程序是否结束
// 消费者,从channel读取内容
// 新建协程
go func(){
for i:= 0;i<8; i++ {
num := <-ch
fmt.Println(num)
}
// 可以停止
quit <- true
}()
// 生成者,产生数字写入channel
fibonacci(ch, quit)
}
select超时
检测超时,执行操作。
package main
import (
"fmt"
"time"
)
func main(){
ch := make(chan int)
quit := make(chan bool)
// 新开协程,监测数据流动
go func(){
for{
select{
case num := <-ch:
fmt.Println("num = ", num)
case <-time.After(3 * time.Second):
fmt.Println("超时")
quit<-true
}
}
}() // 别忘了()
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
<-quit
fmt.Println("程序结束")
}