操作系统用于在物理处理器(cpu)上调度线程的运行,而协程的调度则是一个逻辑上的概念,在go 1.5的版本,会为每个线程都分配一个逻辑处理器,用于执行创建的go routine。
go routine
go语言通过go关键字来让一段逻辑在一个routine上运行,这段逻辑可以是个匿名函数,可以是lambda表达式,也可以是一个普通的函数:
go worker(...) // worker 是个函数,这样就使得worker非阻塞的开始并发运行
并发的多个方法可能需要等其结束,类似线程的join,于是go语言在sync包中提供了一个名为WaitGroup的类型,该类型实际是一个计数信号量,在go routine结束时,调用Done方法将信号量减一,WaitGroup通过wait方法阻塞,当WaitGroup减为0后,阻塞解除。
import (
"sync" // WaitGroup 包含在此包内
"runtime" // 用于设置并行执行的逻辑处理器数
)
func main(){
runtime.GOMAXPROCS(1) // 说明此程序在一个逻辑处理器上运行
var wg sync.WaitGroup
wg.add(2) // 说明将有两个go routine运行
go func() { // 一个匿名的函数
defer wg.Done // defer 用于让代码在函数结束后运行
..... // 做一些工作
}
go func() { // 另一个匿名函数
defer wg.Done //
...... // 做一些工作
}
wg.Wait() // 等待这俩协程的结束
}
竞争
多个go routine并发运行,如果有使用共享的资源,一样会面临和线程同步一样的竞争问题,因此,go也提供了原子操作,锁等方法去解决竞争的问题,此外,还可以在编译时通过下面的命令检查程序中是否有竞争:
go build -race
原子操作
原子操作的方法位于:
import ( "sync/atomic")
其中包含若干原子方法,包括原子的加法,原子的读写:
var counter int64
atomic.AddInt64(&counter, 1) // 加法
atomic.LoadIn64(&counter) // 原子读
atomic.StoreInt64(&counter, 1) // 原子写
锁
sync.Mutex
锁是位于sync包内的Mutex类型,提供加锁和解锁操作,用来限制一块临界区的访问:
var mutex sync.Mutex
mutex.Lock(){
.....
}
mutex.Unlock()
channel
这样的方式,就会退化成传统的多线程程序并发的样子了,而go更常见的是使用通道来传递数据,而非共享数据,和Scala的actor一样一样的。go语言通过make这个内建函数来创建一个通道,通道可以是有缓冲或无缓冲的,在声明通道时,需要指定其传递的数据的类型:
unbuffered := make(chan int) // 无缓冲,参数为通道数据类型
buffered := make(chan string, 10) // 有缓冲的,第二个参数指定缓冲长度
buffered <- "daren" // 通过 <- 操作符来存取数据,这里将数据写入buffered这个通道
value := <- buffered // 将数据从通道读取,并以此数据构建一个对象
无缓冲的通道只有在双方都准备好,才交换数据,因此一定是同时交换数据,而有缓冲则没有这种保证。
并发模式
runner
runner相当于是一个任务的执行器,将任务交给runner后,runner将在go routine内并发的执行任务。且需要考虑执行任务的超时情况,以及收到系统信号的情况。因此,需要有以下几个对外接口:
- 添加任务到runner,runner应该持有一个任务的队列,在go里面可以使用切片
- 执行任务,通常是在一个循环中执行所有任务,当任务完成时,发送消息告知完成
- 启动go routine执行任务并等待其超时或者结束
type Runner struct{
interrupt chan os.Signal // 一个接收os.Signal消息的通道,用于接收系统中断信号
complete chan error // 接收任务已完成消息
timeout chan time.Time // 报告任务超时
tasks []func(int) // 待执行的任务队列
}
Runner的构造函数需要返回一个新的Runner的指针:
func New(d time.Duration) *Runner{
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error), // 无缓冲,也就是让等待阻塞
timeout: time.After(d),
}
}
下面是执行任务的方法:
func(r *Runner) Start() error{ // 相当于是Runner的成员函数,返回一个error
signal.Notify(r.interrupt, os.Interrupt) // 将关注os.Interrupt,并转给r.interrupt这个通道
go func() {
r.complete <- r.run() // 在一个routine里执行方法,返回值发给通道complete
}()
select{ //select 用于关注所有的通道是否有消息
case err := <-r.complete: // complete通道有消息
return err
case timeout :<-r.timeout:
return ErrTimeout // timeout通道的消息
}
}
在具体的run方法中,用一个循环,顺序的执行切片内所有的任务,在每个任务之前,检查interrupt通道是否有消息。