16 并发

[toc]

Go 语言 在语言层面天生支持并发

并发 与 并行

  • 并发:在同一时间段内执行多个任务
    • 用微信和两个人聊天
  • 并行同一时刻执行多个任务
    • 我和朋友都在用微信别人聊天
  • Go 的并发通过goroutine 实现
    • goroutin 类似于线程,属于用户态的线程,可以创建许多个goroutine 并发工作
    • goroutine 是 由Go 语言运行时,调度完成,而线程是由操作系统调度完成
  • Go 还提供channel 在多个goroutine 间进行通信
  • goroutinechannel 是Go 语言秉承的 CSP(Communicating Sequential Process) 并发模式的重要实现基础

goroutine

  • java/c++ 中实现并发编程:耗费大量心智

    • 需要自己维护一个线程池,
    • 需要自己去包装一个又一个任务
    • 需要自己去调度线程执行任务并维护上下文切换
  • Go 中goroutine 机制:程序员自己定义多个任务,让系统帮助我们将任务分配到 CPU 上实现并发

  • goroutine :类似于线程,由Go 在 runtime 时调度和管理

  • Go 程序会智能地将 goroutine 中的任务合理地分配给每个CPU

  • 现代化的编程语言:在语言层面已经内置了调度和上下文切换的机制

  • Go 中不需要自己写进程、线程、协程,只需要goroutine

  • 当需要让某个任务并发执行的时候,只需要将这个任务 包装成一个函数,开启一个goroutine 去执行这个函数就可以

使用 goroutine

  • Go 中使用goroutine
    • 再调用函数的时候前面加关键字go ,即可为函数创建一个goroutine
  • 一个goroutine 必定对应一个函数,可以创建多个goroutine 去执行相同的函数

启动单个 goroutine

  • 在匿名函数或者普通函数前go

  • 示例

    1
    2
    3
    4
    5
    6
    7
    func hello() {
    fmt.Println("Hello Goroutine!")
    }
    func main() {
    hello()
    fmt.Println("main goroutine done!")
    }
  • 示例中,hello( ) 和 main() 是串行执行的
    结果

    1
    2
    Hello Goroutine!
    main goroutine done!
  • 在函数hello前加关键字go ,启动一个goroutine去执行hello函数

    1
    2
    3
    4
    func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    }
  • 执行结果纸打印了main goroutine done!

    • 因为程序启动当时,Go 程序会为main()函数创建一个默认的goroutine
    • 当main() 返回时,该goroutine 就结束了,所有在main() 中启动的goroutine 会一同结束,main 所在的goroutine 是夜王,而其他goroutine 是异鬼
  • 让main函数等hello函数:time.Sleep

    • ```go
      func main() {
      go hello() // 启动另外一个goroutine去执行hello函数
      fmt.Println("main goroutine done!")
      time.Sleep(time.Second)
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25

      * 执行结果:先打印`main goroutine done!` , 在打印`Hello Goroutine!`
      * 在创建`goroutine` 时需要花费时间,此时main函数所在的`goroutine` 是继续执行的



      ## 启动多个 goroutine

      * 使用`sysnc.WaitGroup` 实现`goroutine` 的同步

      * ```go
      var wg sync.WaitGroup

      func hello(i int) {
      defer wg.Done() // goroutine结束就登记-1
      fmt.Println("Hello Goroutine!", i)
      }
      func main() {

      for i := 0; i < 10; i++ {
      wg.Add(1) // 启动一个goroutine就登记+1
      go hello(i)
      }
      wg.Wait() // 等待所有登记的goroutine都结束
      }
  • 多次执行,打印的顺序不一致,

    • 因为10goroutine 是并发执行的,而goroutine 的调度是随机的

goroutine 与 线程

可增长的栈

  • OS 线程一般都有固定的栈内存(通常为2 MB)
  • 一个goroutine 的栈在其生命周期开始时只有很小的栈(一般为2 KB),goroutine 的栈不是固定的,按需增大和缩小,goroutine 的栈大小最大为1 GB
  • 因此一次创建十万groutine 也可以

goroutine 调度

  • GPM 是Go运行时(runtime) 层面的实现,是Go自己实现的一套调度系统,区别于操作系统调度OS线程

    • G: 一个goroutine , 存放着本共routine信息,和与所在P的绑定
    • P: 管理着一组 goroutine 队列
      • P 中存储着当前 goroutine 运行的上下文环境(函数指针、堆栈地址、地址边界)
      • P 对自己管理的 goroutine 队列做出一些调度(例如将占用CPU时间长的 goroutine 暂停、运行后续的goroutine等)
      • 当自己的队列消费完后,就去全局队列里取,若全局队列也消费完了,去其他P的队列里抢任务
    • M(machine): Go 运行时(runtime)对 操作系统内核线程的虚拟M 与内核线程一般是一一映射的关系,一个 goroutine 最终是要放到 M 上执行
  • PM 也是一一对应:

    • P 管理着一组 G 挂载 在M上运行
    • 当一个 G 长久阻塞在一个 M 上时,runtime 会新建一个 M,阻塞G 所在的P会把其他的 G 挂载在新建的 M 上
    • 当旧 的 G阻塞完成或者认为其已经死掉时,回收旧的 M
  • P 的个数通过runtime.GOMAXPROCS设定,最大256,Go1.5默认物理线程数

    • 并发量大时,会增加一些 P 和 M,不会太多
  • ==注意==

    • 从线程调度看,Go语言与其它语言相比的优势
      • OS线程是由OS内核来调度的,goroutine 则是由Go运行时(runtime)自己的调度器调度的,调度器使用一个称为**m:n调度的技术(复用/调度 m 个goroutine到 n 个OS线程**)
        • 一大特点: goroutine 的调度在用户态完成,不涉及内核态与用户台的频繁切换,包括内存的分配与释放,都是在用户态维护着一块内存池,不直接调用系统的 malloc 函数,成本比调度OS线程低很多
        • 另一方面,充分利用了多核的硬件资源,近似地把若干 goroutine 均分在物理线程上,再加上goroutine的超轻量,都能保证go调度方面的性能

GOMAXPROCS

  • Go 运行时的调度器使用GOMAXPROCS参数,确定使用多少个 OS 线程 来同时执行 Go 代码

    • 默认值是机器上的 CPU 核心数
    • 例如:一个8 核心的机器上,调度器会把 Go 代码同时调度到 8 个OS线程上
    • 8 即为 m:n 调度中的n
  • Go 中通过runtime.GOMAXPROCS()函数 设置当前程序 并发时 占用的CPU逻辑核心数

  • Go 1.5之前,默认使用 单核心 执行,Go 1.5 之后,默认使用全部的CPU逻辑核心数

  • 可以通过 将任务分配到不同的CPU逻辑核心 上实现 并行 的效果:

    • ```go
      func a() {
      for i := 1; i < 10; i++ {
          fmt.Println("A:", i)
      }
      
      }func b() {
      for i := 1; i < 10; i++ {
          fmt.Println("B:", i)
      }
      
      }func main() {
      runtime.GOMAXPROCS(1)
      go a()
      go b()
      time.Sleep(time.Second)
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24

      * 两个任务只有一个逻辑核心,此时是做完一个任务再做另一个任务。

      * 将逻辑核心数设为 2,此时两个任务并行执行:

      * ```go
      func a() {
      for i := 1; i < 10; i++ {
      fmt.Println("A:", i)
      }
      }

      func b() {
      for i := 1; i < 10; i++ {
      fmt.Println("B:", i)
      }
      }

      func main() {
      runtime.GOMAXPROCS(2)
      go a()
      go b()
      time.Sleep(time.Second)
      }
  • Go 中的 操作系统线程goroutine 的关系

    • 一个 OS线程 对应 用户态多个goroutine
    • go 程序可以同时使用多个 OS线程
    • goroutineOS线程 是多对多的关系即**m:n**

channel

  • 单纯的将函数并发执行无意义,函数与函数间 交换数据才能体现并发执行函数的意义

  • 可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞争

    • 为了 数据交换 的正确性,必须使用 互斥量 对内存进行加锁,这种做法会有性能问题
  • Go 的并发模型:**CSP(Communicating Sequential Process)**, 提倡通过 通信共享内存 而不是 通过共享内存而实现通信

  • goroutine 是Go 并发执行的执行体channel 是他们之间的连接

    • channel 可以让一个goroutine 发送特定值到另一个goroutine的通信机制
  • channel 是特殊的类型,像一个传送带或者队列,总是遵循先入先出的规则,以保证收发数据的顺序

    • 每一个通道是一个具体类型的导管,即在声明channel时需要指定其元素类型

channel 类型

  • channel 是一种引用类型

  • 声明 channel 类型的格式:var 变量 chan 元素类型

  • ```go
    var ch1 chan int // 声明一个传递整型的通道
    var ch2 chan bool // 声明一个传递布尔型的通道
    var ch3 chan []int // 声明一个传递int切片的通道

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17



    ## 创建 channel

    * channel 是**引用类型**,**空值**是`nil`

    * 声明的通道需要使用`make` 初始化才能使用

    * 创建 channel 的格式:`make(chan 元素类型,缓冲区大小)`

    * 缓冲区大小可选

    * ```go
    ch4 := make(chan int)
    ch5 := make(chan bool)
    ch6 := make(chan []int)

channel 操作

  • channel操作:
    • 发送:send
    • 接收:receive
    • 关闭:close
  • 发送 和 接收:使用<- 符号
  • 定义一个通道ch := make(chan int)

发送

  • 将一个值发送到通道中
  • ch <- 10

接收

  • 从一个通道中接收值

  • ```go
    x := <- ch //从ch通道中接收值并赋值给变量x
    <- ch //从ch中接收值,忽略结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    ### 关闭

    * 调用内置函数`close` 关闭通道
    * `close(ch)`
    * ==注意:==
    * 只有在通知接收方 goroutine **所有的数据都发送完毕**时,才需要关闭通道
    * **通道可以被垃圾回收机制回收**的,和关闭文件不一样(必须做),关闭通道不一定要做
    * 关闭后的通道的==特点==
    * 对一个关闭的通道 再发送值就会导致 panic
    * 对一个关闭的通道进行接收会一直获取值,直到通道为空
    * 对一个关闭且没有值的通道执行接受操作,会得到对应类型的零值
    * 关闭一个已经关闭的通道会导致 panic



    ## 无缓冲的通道

    * 无缓冲的通道(阻塞的通道)

    * ```go
    func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
    }
  • 编译能通过,但执行会报错:

    • ```
      fatal error: all goroutines are asleep - deadlock!

      goroutine 1 [chan send]:
      main.main()

      e:/LearningNotes/Go/goroutine/zuse.go:7 +0x5f
      

      exit status 2

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20

      * `deadlock`:

      * 因为`ch:=make(chan int)` 创建的是**无缓冲区的通道**,无缓冲区的通道只有在**有人 接受值 的时候**才能发送值

      * 上面代码阻塞于`ch <- 10`,形成死锁

      * **一种方法**:启用一个`goroutine`接收值

      * ```go
      func recv(c chan int) {
      ret := <-c
      fmt.Println("接收成功", ret)
      }
      func main() {
      ch := make(chan int)
      go recv(ch) // 启用goroutine从通道接收值
      ch <- 10
      fmt.Println("发送成功")
      }
    • 无缓冲通道 上的 发送操作 会阻塞,直到另一个goroutine 在该通道上执行 接收操作,才能发送成功,两个goroutine 会继续执行

    • 相反,若 接受操作 先执行,接收方的goroutine 将阻塞,直到另一个goroutine 在该通道上发送一个值

  • 使用 无缓冲通道 进行通信将导致发送和接收的goroutine 同步化

  • 因此 无缓冲通道 被称为同步通道

有缓冲的通道

  • 解决上面的问题,另一种方法: 使用有缓冲的通道

  • 有缓冲的通道:使用 make初始化 的时候为其指定容量:

    • ```go
      func main() {
      ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
      ch <- 10
      fmt.Println("发送成功")
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43

      * 只要通道容量大于零,该通道就是**有缓冲的通道**,容量表示通道中能存放元素的数量
      * 若满则阻塞
      * 可以使用内置函数`len`获取通道内元素的数量,使用`cap`获取通道的容量



      ## for range 从通道循环取值

      * 向通道发送完数据后,可以通过 `close` 关闭通道

      * 通道被关闭时,在向通道发送值 会引发`panic`,从该通道取值的操作会先取完通道内的值,然后一直取得都是**对应类型的零值**

      * 判断通道是否被关闭:

      * ```go
      // channel 练习
      func main() {
      ch1 := make(chan int)
      ch2 := make(chan int)
      // 开启goroutine将0~100的数发送到ch1中
      go func() {
      for i := 0; i < 100; i++ {
      ch1 <- i
      }
      close(ch1)
      }()
      // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
      go func() {
      for {
      i, ok := <-ch1 // 通道关闭后再取值ok=false
      if !ok {
      break
      }
      ch2 <- i * i
      }
      close(ch2)
      }()
      // 在主goroutine中从ch2中接收值打印
      for i := range ch2 { // 通道关闭后会退出for range循环
      fmt.Println(i)
      }
      }
  • 有两种方式在接收值时,判断该通道是否被关闭

    • ```go
      for {
              i, ok := <-ch1 // 通道关闭后再取值ok=false
              if !ok {
                  break
              }
              ch2 <- i * i
          }
      
      1
      2
      3
      4
      5
      6
      7



      * ```go
      for i := range ch2 { // 通道关闭后会退出for range循环
      fmt.Println(i)
      }

单向通道

  • 可以将通道作为 参数多个任务函数间 传递

  • 可以在不同的任务函数中,对使用的通道进行限制只能发送只能接收

  • Go 提供了 单向通道

    • ```go
      func counter(out chan<- int) {
      for i := 0; i < 100; i++ {
          out <- i
      }
      close(out)
      
      }func squarer(out chan<- int, in <-chan int) {
      for i := range in {
          out <- i * i
      }
      close(out)
      
      }
      func printer(in <-chan int) {
      for i := range in {
          fmt.Println(i)
      }
      
      }func main() {
      ch1 := make(chan int)
      ch2 := make(chan int)
      go counter(ch1)
      go squarer(ch2, ch1)
      printer(ch2)
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49

      * `chan<- int` 是一个**只写单向通道**(只能对其写入int类型),可以对其**发送操作**,但不能执行接收操作
      * `<-chan int` 是一个**只读单向通道**(只能对其读取int类型),可以对其执行**接收操作**,但不能执行发送操作

      * 在函数传参,以及任何赋值操作中,可以**将双向通道转换成单向通道**,不能反之;

      ## 通道总结

      * channel **异常总结**:
      * ![channel异常总结](https://www.liwenzhou.com/images/Go/concurrence/channel01.png)

      * 关闭已关闭 的`channel` 会引发`panic`



      # worker pool(goroutine 池)

      * 通常会使用 可以指定启动的 `goroutine` 数量- `worker pool` 模式,以控制`goroutine` 的数量,防止`goroutine` 的泄露和暴涨

      * 简易 `worker pool` 示例:

      * ```go
      func worker(id int, jobs <-chan int, results chan<- int) {
      for j := range jobs {
      fmt.Printf("worker:%d start job:%d\n", id, j)
      time.Sleep(time.Second)
      fmt.Printf("worker:%d end job:%d\n", id, j)
      results <- j * 2
      }
      }


      func main() {
      jobs := make(chan int, 100)
      results := make(chan int, 100)
      // 开启3个goroutine
      for w := 1; w <= 3; w++ {
      go worker(w, jobs, results)
      }
      // 5个任务
      for j := 1; j <= 5; j++ {
      jobs <- j
      }
      close(jobs)
      // 输出结果
      for a := 1; a <= 5; a++ {
      <-results
      }
      }

select 多路复用

  • 通道在 接收数据 时,若没有数据可以接收会发生 阻塞

  • 若需要同时从多个通道接收数据,则更有可能发生上述阻塞

  • 例如:

    • ```go
      for{
      // 尝试从ch1接收值
      data, ok := <-ch1
      // 尝试从ch2接收值
      data, ok := <-ch2
      …
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23

      * 这种遍历方式,可以实现从多个通道接收值,但运行性能很差

      * 为了解决上述问题:Go 内置了`select` 关键字,可以同时响应多个通道的操作

      * `select` 的使用类似于`switch` 语句,有一系列`case` 分支和一个默认分支

      * 每个`case` 会对应一个通道的通信过程(send or receive)
      * `select` 会一直等待,知道某个`case` 的通信操作完成,会执行`case` 分支对应的语句

      * 格式:

      * ```go
      select{
      case <-ch1:
      ...
      case data := <-ch2:
      ...
      case ch3<-data:
      ...
      default:
      默认操作
      }
  • 示例:

    • ```go
      func main() {
      ch := make(chan int, 1)
      for i := 0; i < 10; i++ {
          select {
          case x := <-ch:
              fmt.Println(x)
          case ch <- i:
          }
      }
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31

      * 使用`select` 语句增加代码可读性:
      * 可处理一个或多个 channel 的发送/接受操作
      * 如果多个`case` 同时满足,`select` 会随机选择一个
      * 对于没有`case` 的`select{}` 会一直等待,可用于阻塞`main` 函数



      # 并发安全和锁

      * Go 中,可能会存在多个`goroutine` 同时操作一个资源(临界区),这种情况会发生`竞态问题` 数据竞争

      * 示例:

      * ```go
      var x int64
      var wg sync.WaitGroup

      func add() {
      for i := 0; i < 5000; i++ {
      x = x + 1
      }
      wg.Done()
      }
      func main() {
      wg.Add(2)
      go add()
      go add()
      wg.Wait()
      fmt.Println(x)
      }
  • 开启两个goroutine 去累加 x 的值,这两个goroutine 在访问和修改x 时会存在数据竞争,导致结果与期待不符

  • package time

    • time.Sleep(time.Second) : 主线程等到所有的 goroutine 都运行完毕,使得主线程睡眠一会,等待其他线程充分运行
    • sync.WaitGroup : 内部有个计时器,最初从零开始。有三个方法控制计数器的数量
      • Add() : 将计数器设置为n
      • Done() : 每次将计数器 -1
      • Wait() : 会阻塞代码运行,直到计数器的值减少为0

互斥锁

  • 互斥锁:常用的控制共享资源访问的方法,能保证同时只有一个goroutine 可以访问共享资源

  • Go 中使用syncMutex 包实现互斥锁

    • ```go
      var x int64
      var wg sync.WaitGroup
      var lock sync.Mutexfunc add() {
      for i := 0; i < 5000; i++ {
          lock.Lock() // 加锁
          x = x + 1
          lock.Unlock() // 解锁
      }
      wg.Done()
      
      }
      func main() {
      wg.Add(2)
      go add()
      go add()
      wg.Wait()
      fmt.Println(x)
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66

      * 使用 **互斥锁** 能保证同一时间有且只有一个`goroutine` 进入 **临界区**,其他的`goroutine` 则在等待锁
      * 当互斥锁释放后,等待的`goroutine` 才可以获取锁进入临界区

      * 当多个`goroutine` 同时等待一个锁时,唤醒的策略是随机的



      ## 读写互斥锁

      * 互斥锁 是 **完全互斥** 的

      * 并发的读取一个资源时,不涉及修改资源不需要加锁,这时使用读写锁更好

      * **读写锁** 在 Go 中:`sync` 包 中的`RWMutex`

      * 读写锁:读锁 和 写锁

      * 当一个 `goroutine ` 获取读锁后,其他的`goroutine` 若获取读锁会继续获得锁,若获取写锁会等待
      * 当一个`goroutine` 获取写锁之后,其他的`goroutine` 无论是获取读锁还是写锁都会等待

      * 示例

      * ```go
      var (
      x int64
      wg sync.WaitGroup
      lock sync.Mutex
      rwlock sync.RWMutex
      )

      func write() {
      // lock.Lock() // 加互斥锁
      rwlock.Lock() // 加写锁
      x = x + 1
      time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
      rwlock.Unlock() // 解写锁
      // lock.Unlock() // 解互斥锁
      wg.Done()
      }

      func read() {
      // lock.Lock() // 加互斥锁
      rwlock.RLock() // 加读锁
      time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
      rwlock.RUnlock() // 解读锁
      // lock.Unlock() // 解互斥锁
      wg.Done()
      }

      func main() {
      start := time.Now()
      for i := 0; i < 10; i++ {
      wg.Add(1)
      go write()
      }

      for i := 0; i < 1000; i++ {
      wg.Add(1)
      go read()
      }

      wg.Wait()
      end := time.Now()
      fmt.Println(end.Sub(start))
      }
  • 读写锁 适合 读多写少的场景

sync.WaitGroup

  • Go 中 使用sync.WaitGroup 来实现并发任务的同步。sync.WaitGroup 的方法:

    • 方法名 功能
      (wg *WaitGroup) Add(delta int) 计数器+delta
      (wg *WaitGroup) Done() 计数器-1
      (wg *WaitGroup) Wait() 阻塞 直到计数器变为0
  • sync.WaitGroup 内部维护这一个计数器,计数器的值可以增加和减少

    • 当启动了 N 个并发任务时,就将计数器值增加N
    • 每个任务完成时,通过调用Done() 将计数器减 1
    • 通过调用 Wait() 来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成
  • sync.WaitGroup示例:

    • ```go
      var wg sync.WaitGroupfunc hello() {
      defer wg.Done()
      fmt.Println("Hello Goroutine!")
      
      }
      func main() {
      wg.Add(1)
      go hello() // 启动另外一个goroutine去执行hello函数
      fmt.Println("main goroutine done!")
      wg.Wait()
      
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40

      * 注意:`sync.WaitGroup` 是一个结构体,传递的时候需要传递指针

      ## sync.Once

      * 在许多场景,确保某些操作在高并发的场景下只执行一次
      * 例如 只加载一次配置文件,只i关闭一次通道

      ### 加载配置文件示例

      ### 并发安全的单例模式

      ## sync.Map

      * Go 中的内置map 不是并发安全:

      * ```go
      var m = make(map[string]int)

      func get(key string) int {
      return m[key]
      }

      func set(key string, value int) {
      m[key] = value
      }

      func main() {
      wg := sync.WaitGroup{}
      for i := 0; i < 20; i++ {
      wg.Add(1)
      go func(n int) {
      key := strconv.Itoa(n)
      set(key, n)
      fmt.Printf("k=:%v,v:=%v\n", key, get(key))
      wg.Done()
      }(i)
      }
      wg.Wait()
      }
  • 当并发多了之后,会报fatal error: concurrent map writes 错误

  • 需要为 map 加锁来保证并发的安全性,Go 的sync 包中提供了一个开箱即用的并发安全版 map sync.Map

    • 开箱即用 不需要像内置 map 一样使用make初始化才能使用
    • sync.Map 内置了Store, Load, LoadOrStore, Delete, Range 等方法
  • ```go
    var m = sync.Map{}

    func main() {

    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
    

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105



    # 原子操作

    * 加锁操作:涉及内核态的上下文切换,会比较耗时、代价比较高
    * 针对**基本数据类型**,使用**原子操作**保证并发安全
    * 原子操作 是Go提供的方法,在用户态完成,性能比加锁更好
    * Go 中原子操作由内置标准库`sync/atomic` 提供

    ## atomic包

    * **方法:**
    * 读取操作:`LoadInt`
    * 写入操作: `StoreInt`
    * 修改操作 : `AddInt`
    * 交换操作 : `SwapInt`
    * 比较并交换操作 : `CompareAndSwapPointer`

    ## 示例

    * 比较互斥锁和原子操作的性能

    * ```go
    package main

    import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
    )

    type Counter interface {
    Inc()
    Load() int64
    }

    // 普通版
    type CommonCounter struct {
    counter int64
    }

    func (c CommonCounter) Inc() {
    c.counter++
    }

    func (c CommonCounter) Load() int64 {
    return c.counter
    }

    // 互斥锁版
    type MutexCounter struct {
    counter int64
    lock sync.Mutex
    }

    func (m *MutexCounter) Inc() {
    m.lock.Lock()
    defer m.lock.Unlock()
    m.counter++
    }

    func (m *MutexCounter) Load() int64 {
    m.lock.Lock()
    defer m.lock.Unlock()
    return m.counter
    }

    // 原子操作版
    type AtomicCounter struct {
    counter int64
    }

    func (a *AtomicCounter) Inc() {
    atomic.AddInt64(&a.counter, 1)
    }

    func (a *AtomicCounter) Load() int64 {
    return atomic.LoadInt64(&a.counter)
    }

    func test(c Counter) {
    var wg sync.WaitGroup
    start := time.Now()
    for i := 0; i < 1000; i++ {
    wg.Add(1)
    go func() {
    c.Inc()
    wg.Done()
    }()
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(c.Load(), end.Sub(start))
    }

    func main() {
    c1 := CommonCounter{} // 非并发安全
    test(c1)
    c2 := MutexCounter{} // 使用互斥锁实现并发安全
    test(&c2)
    c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
    test(&c3)
    }
  • atomic 包提供了底层的原子级内存操作,对于同步算法的实现很有用

练习题

  1. 使用

    1
    goroutine

    1
    channel

    实现一个计算int64随机数各位数和的程序。

    1. 开启一个goroutine循环生成int64类型的随机数,发送到jobChan
    2. 开启24个goroutinejobChan中取出随机数计算各位数的和,将结果发送到resultChan
    3. goroutineresultChan取出结果并打印到终端输出
  2. 为了保证业务代码的执行性能将之前写的日志库改写为异步记录日志方式。