go 语言怎么支持的并发请求
Go
中有 goroutine,所以可以采用多协程来解决并发问题。Accept 连接后,将连接丢给 goroutine 处理后续的读写操作。在开发者看到的这个 goroutine 中业务逻辑是同步的,也不用考虑 IO 是否阻塞。
Golang 的协程通信有哪些方式
1)共享内存
- 共享内存是指多个协程直接访问共享变量的方式,这种方式不需要显式地进行通信,但需要考虑并发访问时的竞态问题,需要使用互斥锁等机制来确保同步和一致性。
2)通道
- 通道是 Go 语言中一个重要的并发原语,它是一种线程安全的、带缓冲的 FIFO 队列。通道支持阻塞式读写,可以用来在不同的协程之间传递数据,也可以用来进行同步操作。通道在多个协程之间传递数据时,会自动进行同步,不需要程序员显式地进行加锁和解锁操作。
3)选择器
- 选择器是 Go 语言中的一种控制结构,可以同时监听多个通道的操作,并选择其中一个可以进行操作的通道。选择器可以用来实现非阻塞的通信操作,避免了因等待某个通道操作而导致的阻塞。选择器通常与通道配合使用,用于多个协程之间的协作和同步。
4)条件变量(Cond)
- 条件变量用于在协程之间进行复杂的通信和协调。在 Go 中,可以使用
sync
包中的Cond
类型来实现条件变量。它通常与互斥锁一起使用,以便协程可以在特定条件下等待或被唤醒。
5)原子操作(Atomic Operations)
- Go 语言提供了
sync/atomic
包,用于执行原子操作,这些操作通常用于共享资源的更新,以避免竞态条件。原子操作可以用于对变量的读取、写入、加法等操作,而不需要额外的锁定。
总之,Go 协程之间的通信是非常重要的,不同的应用场景需要选择不同的通信方式,以确保程序的正确性和性能。共享内存通常用于需要高性能的并发场景,但需要注意线程安全和同步问题;通道是一种简单、安全、高效的通信方式,适用于大多数并发场景;选择器则适用于多通道协作和同步的场景。
Go 常用的并发模型?
并发模型说的是系统中的线程如何协作完成并发任务,不同的并发模型,线程以不同的方式进行通信和协作。
线程间通信方式
线程间通信方式有两种:共享内存和消息传递,无论是哪种通信模型,线程或者协程最终都会从内存中获取数据,所以更为准确的说法是直接共享内存、发送消息的方式来同步信息
共享内存
抽象层级:抽象层级低,当我们遇到对资源进行更细粒度的控制或者对性能有极高要求的场景才应该考虑抽象层级更低的方法
耦合:高,线程需要在读取或者写入数据时先获取保护该资源的互斥锁
线程竞争:需要加锁,才能避免线程竞争和数据冲突
发送消息
抽象层级:抽象层级高,提供了更良好的封装和与领域更相关和契合的设计,比如 Go 语言中的 Channel
就提供了 Goroutine 之间用于传递信息的方式,它在内部实现时就广泛用到了共享内存和锁,通过对两者进行的组合提供了更高级的同步机制
耦合:低,生产消费者模型
线程竞争:保证同一时间只有一个活跃的线程能够访问数据,channel 维护所有被该 chanel 阻塞的协程,保证有资源的时候只唤醒一个协程,从而避免竞争
Go 语言中实现了两种并发模型,一种是共享内存并发模型,另一种则是 CSP 模型。
共享内存并发模型
通过直接共享内存 + 锁的方式同步信息,传统多线程并发
CSP 并发模型
通过发送消息的方式来同步信息,Go 语言推荐使用的_通信顺序进程_(communicating sequential processes)并发模型,通过 goroutine 和 channel 来实现
goroutine
是 Go 语言中并发的执行单位,可以理解为”线程“channel
是 Go 语言中各个并发结构体 (goroutine
)之前的通信机制。通俗的讲,就是各个goroutine
之间通信的”管道“,类似于 Linux 中的管道
Go 为啥使用 CSP 模型来实现并发?
Go 语言使用 CSP(Communicating Sequential Processes,通信顺序进程)模型来实现并发,这是由 Go 语言设计者选择的一种并发模型,有以下几个重要的原因:
- 简单性和清晰性:CSP 模型提供了一种清晰且直观的方式来表达并发程序。它基于协程之间的通信来进行协作,通过通道(channel)进行消息传递,使得并发程序的结构和逻辑更加简单和可读。
- 避免共享状态:CSP 模型强调避免共享状态,而是通过通信共享数据。共享状态是许多并发程序中的错误和难点来源之一,而 CSP 模型可以减少竞态条件(race condition)等问题的出现。
- 安全性:Go 的 CSP 模型通过通道提供了一种安全的并发机制。通道的发送和接收操作都是原子的,不需要额外的锁定,因此减少了程序中出现的锁定问题,如死锁和竞态条件。
- 可扩展性:CSP 模型可以轻松扩展到大量的协程,因为通道和协程的创建成本相对较低。这使得 Go 非常适合构建高并发的系统,如 Web 服务器、分布式系统和网络服务。
- 编译器和运行时支持:Go 编译器和运行时系统针对 CSP 模型进行了优化。Go 的并发原语在语言级别得到支持,而不是通过库的方式实现,这使得并发编程更加容易。
总之,Go 选择 CSP 模型是为了提供一种简单、安全、高效和可扩展的并发编程模型,以便开发者能够更轻松地构建并发程序,同时避免共享状态和典型的并发问题。这使得 Go 成为了一个流行的选择,特别适用于需要高度并发性能的应用程序和系统。
有没有什么线程安全的办法?
在 Go 语言中,线程安全一般指协程安全,因为 Go 一般使用协程进行调度;而 Go 中为了保证其协程安全,有以下几种机制:
1、互斥锁:在 Go 的标准库中有 sync 包,sync. Mutex 就是解决并发冲突导致的安全性问题的一种方式。
2、读写锁:是在互斥锁上的进一步升级版本,主要为了解决并发多写少读、少写多读两种高并发的情况
3、如果不是需要强制使用同一个对象,那么也可以采用创建对象副本的方式,每个协程独占一个对象,相互之间不关联,但是这显然不符合我们的要求。
综上,使用互斥锁或者读写锁就能很好的解决问题。
select 可以用于什么
Go 的通道有两种操作方式,一种是带 range 子句的 for 语句,另一种则是 select 语句,它是专门为了操作通道而存在的。这里主要介绍 select 的用法。
Select 的语法如下:
select {
case <-ch1 :
statement(s)
case ch2 <- 1 :
statement(s)
…
default : /* 可选 */
statement(s)
}
这里要注意:
- 每个 case 都必须是一个通信。由于 select 语句是专为通道设计的,所以每个 case 表达式中都只能包含操作通道的表达式,比如接收表达式。
- 如果有多个 case 都可以运行,select 会随机公平地选出一个执行,其他不会执行。
- 如果多个 case 都不能运行,若有 default 子句,则执行该语句,反之,select 将阻塞,直到某个 case 可以运行。
- 所有 channel 表达式都会被求值。
- Select 机制⽤来处理异步 IO 问题。
- Select 机制最⼤的⼀条限制就是每个 case 语句⾥必须是⼀个 IO 操作。
实例
package main
import (
"fmt"
"math/rand"
)
func main() {
// 准备好几个通道。
intChannels := [5]chan int{
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
make(chan int, 1),
}
// 随机选择一个通道,并向它发送元素值。
index := rand.Intn(5)
fmt.Printf("The index: %d", index)
intChannels[index] <- index
// 哪一个通道中有可取的元素值,哪个对应的分支就会被执行。
select {
case <-intChannels[0]:
fmt.Println("The first candidate case is selected.")
case <-intChannels[1]:
fmt.Println("The second candidate case is selected.")
case elem := <-intChannels[2]:
fmt.Printf("The third candidate case is selected. The element is %d.", elem)
default:
fmt.Println("No candidate case is selected!")
}
}
select 死锁
Select 使用不当会发生死锁。如果通道没有数据发送,但 select 中有存在接收通道数据的语句,将发生死锁。
func main() {
ch := make(chan string)
select {
case <-ch:
}
}
/*
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/workspace/src/test.go:5 +0x52
exit status 2
*/
//可以添加 default 语句来避免产生死锁。
空 select{}
对于空的 select 语句,程序会被阻塞,确切的说是当前协程被阻塞,同时 Go 自带死锁检测机制,当发现当前协程再也没有机会被唤醒时,则会发生 panic。所以上述程序会 panic。
func main() {
select {}
}
/*
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
/workspace/src/test.go:3 +0x20
exit status 2
*/
select 和 for 结合使用
Select 语句只能对其中的每一个 case 表达式各求值一次。所以,如果想连续或定时地操作其中的通道的话,就需要通过在 for 语句中嵌入 select 语句的方式实现。
func main() {
tick := time.Tick(time.Second)
for {
select {
case t := <-tick:
fmt.Println(t)
break
}
}
fmt.Println("end")
}
你会发现 break 只跳出了 select,无法跳出 for。解决办法有两种:
使用 goto 跳出循环
func main() {
tick := time.Tick(time.Second)
for {
select {
case t := <-tick:
fmt.Println(t)
//跳到指定位置
goto END
}
}
END:
fmt.Println("end")
}
使用标签
func main() {
tick := time.Tick(time.Second)
//这是标签
FOREND:
for {
select {
case t := <-tick:
fmt.Println(t)
//跳出FOREND标签
break ForEnd
}
}
END:
fmt.Println("end")
}
select 实现超时机制
主要使用的 time. After 实现超时控制。
func main() {
ch := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case num := <-ch: //如果有数据,下面打印。但是有可能ch一直没数据
fmt.Println("num = ", num)
case <-time.After(3 * time.Second): //上面的ch如果一直没数据会阻塞,那么select也会检测其他case条件,检测到后3秒超时
fmt.Println("超时")
quit <- true //写入
}
}
}()
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
<-quit //这里暂时阻塞,直到可读
fmt.Println("程序结束")
}
执行后,可以观察到:依次打印出 0-4,几秒过后打印出“超时”和“程序结束”,打印结果如下:
num = 0
num = 1
num = 2
num = 3
num = 4
超时
程序结束
Select 底层原理
select 的底层原理:Go select使用与底层原理
- 每一个 case 对应的 channl 都会被封装到一个结构体中;
- 当第一次执行到 select 时,会锁住所有的 channl 并且,打乱 case 结构体的顺序;
- 按照打乱的顺序遍历,如果有就绪的信号,就直接走对应 case 的代码段,之后跳出 select;
- 如果没有就绪的代码段,但是有 default 字段,那就走 default 的代码段,之后跳出 select;
- 如果没有 default,那就将当前 goroutine 加入所有 channl 的对应等待队列;
- 当某一个等待队列就绪时,再次锁住所有的 channl,遍历一遍,将所有等待队列中的 goroutine 取出,之后执行就绪的代码段,跳出select。
数据结构
每一个 case 对应的数据结构如下:
type scase struct {
c *hchan // chan
elem unsafe.Pointer // 读或者写的缓冲区地址
kind uint16 //case语句的类型,是default、传值写数据(channel <-) 还是 取值读数据(<- channel)
pc uintptr // race pc (for race detector / msan)
releasetime int64
}
Go 有哪些并发同步原语?
Go 是一门以并发编程见长的语言,它提供了一系列的同步原语方便开发者使用
原子操作
Mutex、RWMutex 等并发原语的底层实现是通过 atomic 包中的一些原子操作来实现的,原子操作是最基础的并发原语. Go atomic 包是最轻量级的锁(也称无锁结构),可以在不形成临界区和创建互斥量的情况下完成并发安全的值替换操作,不过这个包只支持 int 32/int 64/uint 32/uint 64/uintptr 这几种数据类型的一些基础操作(增减、交换、载入、存储等)
概念
原子操作仅会由一个独立的 CPU 指令代表和完成。原子操作是无锁的,常常直接通过 CPU 指令直接实现。事实上,其它同步技术的实现常常依赖于原子操作。
使用场景
当我们想要对某个变量并发安全的修改,除了使用官方提供的 mutex
,还可以使用 sync/atomic 包的原子操作,它能够保证对变量的读取或修改期间不被其他的协程所影响。
Atomic 包提供的原子操作能够确保任一时刻只有一个 goroutine 对变量进行操作,善用 atomic 能够避免程序中出现大量的锁操作。
package main
import (
"fmt"
"sync/atomic"
)
var opts int64 = 0
func main() {
add(&opts, 3)
load(&opts)
compareAndSwap(&opts, 3, 4)
swap(&opts, 5)
store(&opts, 6)
}
func add(addr *int64, delta int64) {
atomic.AddInt64(addr, delta) //加操作
fmt.Println("add opts: ", *addr)
}
func load(addr *int64) {
fmt.Println("load opts: ", atomic.LoadInt64(&opts))
}
func compareAndSwap(addr *int64, oldValue int64, newValue int64) {
if atomic.CompareAndSwapInt64(addr, oldValue, newValue) {
fmt.Println("cas opts: ", *addr)
return
}
}
func swap(addr *int64, newValue int64) {
atomic.SwapInt64(addr, newValue)
fmt.Println("swap opts: ", *addr)
}
func store(addr *int64, newValue int64) {
atomic.StoreInt64(addr, newValue)
fmt.Println("store opts: ", *addr)
}
常见操作
- 增减 Add
- 载入 Load
- 比较并交换 CompareAndSwap
- 交换 Swap
- 存储 Store
Atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法
下面将分别介绍这些操作:
增减操作
此类操作的前缀为 Add
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
需要注意的是,第一个参数必须是指针类型的值,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的 CPU 指令,确保同一时间只有一个 goroutine 能够进行操作。
使用举例:
func add(addr *int64, delta int64) {
atomic.AddInt64(addr, delta) //加操作
fmt.Println("add opts: ", *addr)
}
载入操作
此类操作的前缀为 Load
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
// 特殊类型: Value类型,常用于配置变更
func (v *Value) Load() (x interface{}) {}
载入操作能够保证原子的读变量的值,当读取的时候,任何其他 CPU 操作都无法对该变量进行读写,其实现机制受到底层硬件的支持。
使用示例:
func load(addr *int64) {
fmt.Println("load opts: ", atomic.LoadInt64(&opts))
}
比较并交换
此类操作的前缀为 CompareAndSwap
, 该操作简称 CAS,可以用来实现乐观锁
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
该操作在进行交换前首先确保变量的值未被更改,即仍然保持参数 old
所记录的值,满足此前提下才进行交换操作。CAS 的做法类似操作数据库时常见的乐观锁机制。
需要注意的是,当有大量的 goroutine 对变量进行读写操作时,可能导致 CAS 操作无法成功,这时可以利用 for 循环多次尝试。
使用示例:
func compareAndSwap(addr *int64, oldValue int64, newValue int64) {
if atomic.CompareAndSwapInt64(addr, oldValue, newValue) {
fmt.Println("cas opts: ", *addr)
return
}
}
交换
此类操作的前缀为 Swap
:
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
相对于 CAS,明显此类操作更为暴力直接,并不管变量的旧值是否被改变,直接赋予新值然后返回背替换的值。
func swap(addr *int64, newValue int64) {
atomic.SwapInt64(addr, newValue)
fmt.Println("swap opts: ", *addr)
}
存储
此类操作的前缀为 Store
:
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
// 特殊类型: Value类型,常用于配置变更
func (v *Value) Store(x interface{})
此类操作确保了写变量的原子性,避免其他操作读到了修改变量过程中的脏数据。
func store(addr *int64, newValue int64) {
atomic.StoreInt64(addr, newValue)
fmt.Println("store opts: ", *addr)
}
Go 原子操作和锁的区别?
- 原子操作由底层硬件支持,而锁是基于原子操作+信号量完成的。若实现相同的功能,前者通常会更有效率
- 原子操作是单个指令的互斥操作;互斥锁/读写锁是一种数据结构,可以完成临界区(多个指令)的互斥操作,扩大原子操作的范围
- 原子操作是无锁操作,属于乐观锁;说起锁的时候,一般属于悲观锁
- 原子操作存在于各个指令/语言层级,比如“机器指令层级的原子操作”,“汇编指令层级的原子操作”,“Go 语言层级的原子操作”等。
- 锁也存在于各个指令/语言层级中,比如“机器指令层级的锁”,“汇编指令层级的锁”,“Go 语言层级的锁”等
Channel
channel
管道,高级同步原语,goroutine 之间通信的桥梁
使用场景:消息队列、数据传递、信号通知、任务编排、锁
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan struct{}, 1)
for i := 0; i < 10; i++ {
go func() {
c <- struct{}{}
time.Sleep(1 * time.Second)
fmt.Println("通过ch访问临界区")
<-c
}()
}
for {
}
}
基本并发原语
Go 语言在 sync
包中提供了用于同步的一些基本原语,这些基本原语提供了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级更高的 Channel 实现同步。
常见的并发原语如下:sync.Mutex
、sync.RWMutex
、sync.WaitGroup
、sync.Cond
、sync.Once
、sync.Pool
、sync.Context
sync. Mutex
sync.Mutex
(互斥锁) 可以限制对临界资源的访问,保证只有一个 goroutine 访问共享资源
使用场景:大量读写,比如多个 goroutine 并发更新同一个资源,像计数器
package main
import (
"fmt"
"sync"
)
func main() {
// 封装好的计数器
var counter Counter
var wg sync.WaitGroup
var gNum = 1000
wg.Add(gNum)
// 启动10个goroutine
for i := 0; i < gNum; i++ {
go func() {
defer wg.Done()
counter.Incr() // 受到锁保护的方法
}()
}
wg.Wait()
fmt.Println(counter.Count())
}
// 线程安全的计数器类型
type Counter struct {
mu sync.Mutex
count uint64
}
// 加1的方法,内部使用互斥锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 得到计数器的值,也需要锁保护
func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
sync. RWMutex
sync.RWMutex
(读写锁) 可以限制对临界资源的访问,保证只有一个 goroutine 写共享资源,可以有多个 goroutine 读共享资源
使用场景:大量并发读,少量并发写,有强烈的性能要求
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// 封装好的计数器
var counter Counter
var gNum = 1000
// 启动10个goroutine
for i := 0; i < gNum; i++ {
go func() {
counter.Count() // 受到锁保护的方法
}()
}
for { // 一个writer
counter.Incr() // 计数器写操作
fmt.Println("incr")
time.Sleep(time.Second)
}
}
// 线程安全的计数器类型
type Counter struct {
mu sync.RWMutex
count uint64
}
// 加1的方法,内部使用互斥锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
// 得到计数器的值,也需要锁保护
func (c *Counter) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}
sync. WaitGroup
sync.WaitGroup
可以等待一组 Goroutine 的返回
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
底层数据结构
其中 noCopy
是 golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用 go vet
检查程序时,就会发现有报错。但需要注意的是,noCopy 不会影响程序正常的编译和运行。
state 1
主要是存储着状态和信号量,状态维护了 2 个计数器,一个是请求计数器 counter ,另外一个是等待计数器 waiter(已调用 WaitGroup. Wait
的 goroutine 的个数)
当数组的首地址是处于一个 8
字节对齐的位置上时,那么就将这个数组的前 8
个字节作为 64
位值使用表示状态,后 4
个字节作为 32
位值表示信号量 (semaphore
);同理如果首地址没有处于 8
字节对齐的位置上时,那么就将前 4
个字节作为 semaphore
,后 8
个字节作为 64
位数值。
使用场景
并发等待,任务编排,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求
使用方法
在 WaitGroup 里主要有 3 个方法:
WaitGroup.Add ()
:可以添加或减少请求的 goroutine 数量,Add (n)
将会导致counter += n
WaitGroup.Done ()
:相当于 Add (-1),Done ()
将导致counter -=1
,请求计数器 counter 为 0 时通过信号量调用runtime_Semrelease
唤醒 waiter 线程WaitGroup.Wait ()
:会将waiter++
,同时通过信号量调用runtime_Semacquire (semap)
阻塞当前 goroutine
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
println("hello")
}()
}
wg.Wait()
}
WaitGroup 的坑
- Add 一个负数
如果计数器的值小于 0 会直接 panic
- Add 在 Wait 之后调用
比如一些子协程开头调用 Add 结束调用 Wait,这些 Wait 无法阻塞子协程。正确做法是在开启子协程之前先 Add 特定的值。
- 未置为 0 就重用
WaitGroup 可以完成一次编排任务,计数值降为 0 后可以继续被其他任务所用,但是不要在还没使用完的时候就用于其他任务,这样由于带着计数值,很可能出问题。
- 复制 waitgroup
WaitGroup 有 nocopy 字段,不能被复制。也意味着 WaitGroup 不能作为函数的参数。
深入理解 sync. Waitgroup
https://juejin.cn/post/7181812988461252667
WaitGroup
内部通过一个计数器来统计有多少协程被等待。
- 这个计数器的值在我们启动 goroutine 之前先写入(使用
Add
方法), - 然后在 goroutine 结束的时候,将这个计数器减 1(使用
Done
方法)。 - 除此之外,在启动这些 goroutine 的协程中,会调用
Wait
来进行等待,在Wait
调用的地方会阻塞,直到WaitGroup
内部的计数器减到 0。 也就实现了等待一组 goroutine 的目的
sync. Cond
sync.Cond
可以让一组的 Goroutine 都在满足特定条件时被唤醒, Go
标准库提供了 Cond
原语
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
底层数据结构
主要有 4
个字段:
nocopy
: golang 源码中检测禁止拷贝的技术。如果程序中有 WaitGroup 的赋值行为,使用go vet
检查程序时,就会发现有报错,但需要注意的是,noCopy 不会影响程序正常的编译和运行checker
:用于禁止运行期间发生拷贝,双重检查 (Double check
)L
:可以传入一个读写锁或互斥锁,当修改条件或者调用Wait
方法时需要加锁notify
:通知链表,调用Wait ()
方法的Goroutine
会放到这个链表中,从这里获取需被唤醒的 Goroutine 列表
使用场景
利用等待 / 通知机制实现阻塞或者唤醒
使用方法
在 Cond 里主要有 3 个方法:
sync.NewCond (l Locker)
: 新建一个 sync. Cond 变量,注意该函数需要一个 Locker 作为必填参数,这是因为在cond.Wait ()
中底层会涉及到 Locker 的锁操作Cond.Wait ()
: 阻塞等待被唤醒,调用 Wait 函数前需要先加锁;并且由于 Wait 函数被唤醒时存在虚假唤醒等情况,导致唤醒后发现,条件依旧不成立,因此需要使用 for 语句来循环地进行等待,直到条件成立为止Cond.Signal ()
: 只唤醒一个最先 Wait 的 goroutine,可以不用加锁Cond.Broadcast ()
: 唤醒所有 Wait 的 goroutine,可以不用加锁
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var status int64
func TestCond(t *testing.T) {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
time.Sleep(1 * time.Second)
}
func broadcast(c *sync.Cond) {
// 原子操作
atomic.StoreInt64(&status, 1)
c.Broadcast()
}
func listen(c *sync.Cond) {
c.L.Lock()
fmt.Println("wait")
c.Wait()
// Wait 内部会先调用 c.L.Unlock(),来先释放锁,如果调用方不先加锁的话,会报错
fmt.Println("listen")
c.L.Unlock()
}
sync. Once
什么是 sync. Once
Once 可以用来执行且仅仅执行一次动作,常常用于单例对象的初始化场景。
Once 常常用来初始化单例资源,或者并发访问只需初始化⼀次的共享资源,或者在测试的时候初始化⼀次测试资源。
源码
type Once struct {
m Mutex
done uint32
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
sync.Once
可以保证在 Go 程序运行期间的某段代码只会执行一次
使用场景:常常用于单例对象的初始化场景
package main
import (
"fmt"
"sync"
)
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
sync. Pool
对于很多需要重复分配、回收内存的地方,sync. Pool 是一个很好的选择。频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺,而sync. Pool 可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能。
sync.Pool
是 sync 包下的一个组件,可以作为保存临时取还对象的一个“池子”。个人觉得它的名字有一定的误导性,因为 Pool 里装的对象可以被无通知地被回收,可能 sync.Cache
是一个更合适的名字。
使用场景
对于很多需要重复分配、回收内存的地方,sync.Pool
是一个很好的选择。频繁地分配、回收内存会给 GC 带来一定的负担,严重的时候会引起 CPU 的毛刺,而 sync.Pool
可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配,复用对象的内存,减轻 GC 的压力,提升系统的性能。
对象池化, TCP 连接池、数据库连接池、Worker Pool
使用方法
首先,sync.Pool
是协程安全的,这对于使用者来说是极其方便的。使用前,设置好对象的 New
函数,用于在 Pool
里没有缓存的对象时,创建一个。之后,在程序的任何地方、任何时候仅通过 Get()
、Put()
方法就可以取、还对象了。
首先来看一个简单的例子:
运行结果:
sync. Map
sync.Map
线程安全的 map
使用场景:map 并发读写
package main
import (
"fmt"
"sync"
)
func main() {
var scene sync.Map
// 将键值对保存到sync.Map
scene.Store("1", 1)
scene.Store("2", 2)
scene.Store("3", 3)
// 从sync.Map中根据键取值
fmt.Println(scene.Load("1"))
// 根据键删除对应的键值对
scene.Delete("1")
// 遍历所有sync.Map中的键值对
scene.Range(func(k, v interface{}) bool {
fmt.Println("iterate:", k, v)
return true
})
}
sync. Context
sync.Context
可以进行上下文信息传递、提供超时和取消机制、控制子 goroutine 的执行
使用场景:取消一个 goroutine 的执行
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer func() {
fmt.Println("goroutine exit")
}()
for {
select {
case <-ctx.Done():
fmt.Println("receive cancel signal!")
return
default:
fmt.Println("default")
time.Sleep(time.Second)
}
}
}()
time.Sleep(time.Second)
cancel()
time.Sleep(2 * time.Second)
}
扩展并发原语
ErrGroup
errgroup
可以在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能
如果协程中 panic 依然会
使用场景:只要一个 goroutine 出错我们就不再等其他 goroutine 了,减少资源浪费,并且返回错误
package main
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
var g errgroup.Group
var urls = []string{
"http://www.baidu.com/",
"https://www.sina.com.cn/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
err := g.Wait()
if err == nil {
fmt.Println("Successfully fetched all URLs.")
} else {
fmt.Println("fetched error:", err.Error())
}
}
Semaphore
Semaphore
带权重的信号量,控制多个 goroutine 同时访问资源
使用场景:控制 goroutine 的阻塞和唤醒
package main
import (
"context"
"fmt"
"log"
"runtime"
"time"
"golang.org/x/sync/semaphore"
)
var (
maxWorkers = runtime.GOMAXPROCS(0)
sema = semaphore.NewWeighted(int64(maxWorkers)) //信号量
task = make([]int, maxWorkers*4)
// 任务数,是worker的四
)
func main() {
ctx := context.Background()
for i := range task {
// 如果没有worker可用,会阻塞在这里,直到某个worker被释放
if err := sema.Acquire(ctx, 1); err != nil {
break
}
// 启动worker goroutine
go func(i int) {
defer sema.Release(1)
time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作
task[i] = i + 1
}(i)
}
// 请求所有的worker,这样能确保前面的worker都执行完
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("获取所有的worker失败: %v", err)
}
fmt.Println(maxWorkers, task)
}
SingleFlight
用于抑制对下游的重复请求
使用场景:访问缓存、数据库等场景,缓存过期时只有一个请求去更新数据库
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/singleflight"
)
// 模拟从数据库读取
func getArticle(id int) (article string, err error) {
// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(count) * time.Millisecond)
return fmt.Sprintf("article: %d", id), nil
}
// 模拟优先读缓存,缓存不存在读取数据库,并且只有一个请求读取数据库,其它请求等待
func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
return getArticle(id)
})
return v.(string), err
}
var count int32
func main() {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 1000
sg = &singleflight.Group{}
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
res, _ := singleflightGetArticle(sg, 1)
// res, _ := getArticle(1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
Go 有哪些方式安全读写共享变量?
方法 | 并发原语 | 备注 |
---|---|---|
不要修改变量 | sync. Once | 不要去写变量,变量只初始化一次 |
只允许一个 goroutine 访问变量 | Channel | 不要通过共享变量来通信,通过通信 (channel)来共享变量 |
允许多个 goroutine 访问变量,但是同一时间只允许一个 goroutine 访问 | sync. Mutex、sync. RWMutex、原子操作 | 实现锁机制,同时只有一个线程能拿到 |
Go 如何排查数据竞争问题?
概念
只要有两个以上的 goroutine 并发访问同一变量,且至少其中的一个是写操作的时候就会发生数据竞争;全是读的情况下是不存在数据竞争的。
排查方式
package main
import "fmt"
func main() {
i := 0
go func() {
i++ // write i
}()
fmt.Println(i) // read i
}
go 命令行
有个参数race
可以帮助检测代码中的数据竞争
$ go run -race main.go
WARNING: DATA RACE
Write at 0x00c0000ba008 by goroutine 7:
exit status 66
Go 语言怎么做的连接复用
Go 的 netpoll 是怎么实现的像阻塞 read 一样去使用底层的非阻塞 read
Go 语言中 IO 多路复用使用 netpoll 模型 Netpoll 本质上是对 I/O 多路复用技术的封装,所以自然也是和 epoll 一样脱离不了下面几步:
- Netpoll 创建及其初始化;
- 向 netpoll 中加入待监控的任务;
- 从 netpoll 获取触发的事件; 在 go 中对 epoll 提供的三个函数进行了封装
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList
Netpollinit 函数负责初始化 netpoll; Netpollopen 负责监听文件描述符上的事件; Netpoll 会阻塞等待返回一组已经准备就绪的 Goroutine;
Data Race 问题怎么解决?能不能不加锁解决这个问题?
runtime 提供常见的方法
-
Gosched ():让当前线程让出 cpu 以让其它线程运行,它不会挂起当前线程,因此当前线程未来会继续执行。
-
NumCPU ():返回当前系统的 CPU 核数量。
-
GOMAXPROCS ():设置最大的可同时使用的 CPU 核数。
- 通过 runtime. GOMAXPROCS 函数,应用程序可以设置运行时系统中的 P 最大数量。注意,如果在运行期间设置该值的话,会引起“Stop the World”。所以,应在应用程序最早期调用,并且最好是在运行 Go 程序之前设置好操作程序的环境变量 GOMAXPROCS,而不是在程序中调用 runtime. GOMAXPROCS 不能作为函数的参数。
- 无论我们传递给函数的整数值是什么值,运行时系统的 P 最大值总会在 1~256 之间。
- Go 1.8 后,默认让程序运行在多个核上,可以不用设置了。
- Go 1.8 前,还是要设置一下,可以更高效的利用 cpu。
-
Goexit ():退出当前 goroutine(但是 defer 语句会照常执行)。
-
NumGoroutine:返回正在执行和排队的任务总数。
- Runtime. NumGoroutine 函数在被调用后,会返回系统中的处于特定状态的 Goroutine 的数量。这里的特定状态是指 GrunnableGruningGsyscallGwaition。处于这些状态的 Goroutine 即被看做是活跃的或者说正在被调度。
- 注意:垃圾回收所在 Goroutine 的状态也处于这个范围内的话,也会被纳入该计数器。
-
GOOS:查看目标操作系统。很多时候,我们会根据平台的不同实现不同的操作,就可以用 GOOS 来查看自己所在的操作系统。
-
runtime. GC:会让运行时系统进行一次强制性的垃圾收集。
强制的垃圾回收:不管怎样,都要进行的垃圾回收。非强制的垃圾回收:只会在一定条件下进行的垃圾回收(即运行时,系统自上次垃圾回收之后新申请的堆内存的单元(也成为单元增量)达到指定的数值)。 -
GOROOT ():获取 goroot 目录。
-
runtime. LockOSThread 和 runtime. UnlockOSThread 函数:前者调用会使调用他的 Goroutine 与当前运行它的 M 锁定到一起,后者调用会解除这样的锁定。