searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

go routine

2023-12-19 02:10:21
10
0

操作系统用于在物理处理器(cpu)上调度线程的运行,而协程的调度则是一个逻辑上的概念,在go 1.5的版本,会为每个线程都分配一个逻辑处理器,用于执行创建的go routine。

go routine

go语言通过go关键字来让一段逻辑在一个routine上运行,这段逻辑可以是个匿名函数,可以是lambda表达式,也可以是一个普通的函数:

go worker(...) // worker 是个函数,这样就使得worker非阻塞的开始并发运行

并发的多个方法可能需要等其结束,类似线程的join,于是go语言在sync包中提供了一个名为WaitGroup的类型,该类型实际是一个计数信号量,在go routine结束时,调用Done方法将信号量减一,WaitGroup通过wait方法阻塞,当WaitGroup减为0后,阻塞解除。

import (
    "sync"  // WaitGroup 包含在此包内
    "runtime"  // 用于设置并行执行的逻辑处理器数
)

func main(){
    runtime.GOMAXPROCS(1)  // 说明此程序在一个逻辑处理器上运行
    
    var wg sync.WaitGroup
    wg.add(2)  // 说明将有两个go routine运行
    
    go func() { // 一个匿名的函数
        defer wg.Done // defer 用于让代码在函数结束后运行
        ..... // 做一些工作
    }
    
    go func() { // 另一个匿名函数
        defer wg.Done //
        ...... // 做一些工作
    }
    
    wg.Wait() // 等待这俩协程的结束
}

竞争

多个go routine并发运行,如果有使用共享的资源,一样会面临和线程同步一样的竞争问题,因此,go也提供了原子操作,锁等方法去解决竞争的问题,此外,还可以在编译时通过下面的命令检查程序中是否有竞争:

go build -race
原子操作

原子操作的方法位于:

import ( "sync/atomic")

其中包含若干原子方法,包括原子的加法,原子的读写:

var counter int64
atomic.AddInt64(&counter, 1) // 加法

atomic.LoadIn64(&counter)  // 原子读
atomic.StoreInt64(&counter, 1)  // 原子写
sync.Mutex

锁是位于sync包内的Mutex类型,提供加锁和解锁操作,用来限制一块临界区的访问:

var mutex sync.Mutex
mutex.Lock(){
    .....
}
mutex.Unlock()
channel

这样的方式,就会退化成传统的多线程程序并发的样子了,而go更常见的是使用通道来传递数据,而非共享数据,和Scala的actor一样一样的。go语言通过make这个内建函数来创建一个通道,通道可以是有缓冲或无缓冲的,在声明通道时,需要指定其传递的数据的类型:

unbuffered := make(chan int) // 无缓冲,参数为通道数据类型

buffered := make(chan string, 10) // 有缓冲的,第二个参数指定缓冲长度

buffered <- "daren" // 通过 <- 操作符来存取数据,这里将数据写入buffered这个通道

value := <- buffered // 将数据从通道读取,并以此数据构建一个对象

无缓冲的通道只有在双方都准备好,才交换数据,因此一定是同时交换数据,而有缓冲则没有这种保证。


并发模式

runner

runner相当于是一个任务的执行器,将任务交给runner后,runner将在go routine内并发的执行任务。且需要考虑执行任务的超时情况,以及收到系统信号的情况。因此,需要有以下几个对外接口:

  • 添加任务到runner,runner应该持有一个任务的队列,在go里面可以使用切片
  • 执行任务,通常是在一个循环中执行所有任务,当任务完成时,发送消息告知完成
  • 启动go routine执行任务并等待其超时或者结束
type Runner struct{
    interrupt chan os.Signal  // 一个接收os.Signal消息的通道,用于接收系统中断信号
    complete chan error // 接收任务已完成消息
    timeout chan time.Time // 报告任务超时
    tasks []func(int)  // 待执行的任务队列
}

Runner的构造函数需要返回一个新的Runner的指针:

func New(d time.Duration) *Runner{
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete: make(chan error),  // 无缓冲,也就是让等待阻塞
        timeout: time.After(d),
    }
}

下面是执行任务的方法:

func(r *Runner) Start() error{ // 相当于是Runner的成员函数,返回一个error
    signal.Notify(r.interrupt, os.Interrupt) // 将关注os.Interrupt,并转给r.interrupt这个通道
    go func() {
       r.complete <- r.run() // 在一个routine里执行方法,返回值发给通道complete 
    }()
    
    select{ //select 用于关注所有的通道是否有消息
        case err := <-r.complete: // complete通道有消息
            return err 
        case timeout :<-r.timeout:
            return ErrTimeout // timeout通道的消息
    }   
}

在具体的run方法中,用一个循环,顺序的执行切片内所有的任务,在每个任务之前,检查interrupt通道是否有消息。

0条评论
作者已关闭评论
l****n
2文章数
0粉丝数
l****n
2 文章 | 0 粉丝
l****n
2文章数
0粉丝数
l****n
2 文章 | 0 粉丝
原创

go routine

2023-12-19 02:10:21
10
0

操作系统用于在物理处理器(cpu)上调度线程的运行,而协程的调度则是一个逻辑上的概念,在go 1.5的版本,会为每个线程都分配一个逻辑处理器,用于执行创建的go routine。

go routine

go语言通过go关键字来让一段逻辑在一个routine上运行,这段逻辑可以是个匿名函数,可以是lambda表达式,也可以是一个普通的函数:

go worker(...) // worker 是个函数,这样就使得worker非阻塞的开始并发运行

并发的多个方法可能需要等其结束,类似线程的join,于是go语言在sync包中提供了一个名为WaitGroup的类型,该类型实际是一个计数信号量,在go routine结束时,调用Done方法将信号量减一,WaitGroup通过wait方法阻塞,当WaitGroup减为0后,阻塞解除。

import (
    "sync"  // WaitGroup 包含在此包内
    "runtime"  // 用于设置并行执行的逻辑处理器数
)

func main(){
    runtime.GOMAXPROCS(1)  // 说明此程序在一个逻辑处理器上运行
    
    var wg sync.WaitGroup
    wg.add(2)  // 说明将有两个go routine运行
    
    go func() { // 一个匿名的函数
        defer wg.Done // defer 用于让代码在函数结束后运行
        ..... // 做一些工作
    }
    
    go func() { // 另一个匿名函数
        defer wg.Done //
        ...... // 做一些工作
    }
    
    wg.Wait() // 等待这俩协程的结束
}

竞争

多个go routine并发运行,如果有使用共享的资源,一样会面临和线程同步一样的竞争问题,因此,go也提供了原子操作,锁等方法去解决竞争的问题,此外,还可以在编译时通过下面的命令检查程序中是否有竞争:

go build -race
原子操作

原子操作的方法位于:

import ( "sync/atomic")

其中包含若干原子方法,包括原子的加法,原子的读写:

var counter int64
atomic.AddInt64(&counter, 1) // 加法

atomic.LoadIn64(&counter)  // 原子读
atomic.StoreInt64(&counter, 1)  // 原子写
sync.Mutex

锁是位于sync包内的Mutex类型,提供加锁和解锁操作,用来限制一块临界区的访问:

var mutex sync.Mutex
mutex.Lock(){
    .....
}
mutex.Unlock()
channel

这样的方式,就会退化成传统的多线程程序并发的样子了,而go更常见的是使用通道来传递数据,而非共享数据,和Scala的actor一样一样的。go语言通过make这个内建函数来创建一个通道,通道可以是有缓冲或无缓冲的,在声明通道时,需要指定其传递的数据的类型:

unbuffered := make(chan int) // 无缓冲,参数为通道数据类型

buffered := make(chan string, 10) // 有缓冲的,第二个参数指定缓冲长度

buffered <- "daren" // 通过 <- 操作符来存取数据,这里将数据写入buffered这个通道

value := <- buffered // 将数据从通道读取,并以此数据构建一个对象

无缓冲的通道只有在双方都准备好,才交换数据,因此一定是同时交换数据,而有缓冲则没有这种保证。


并发模式

runner

runner相当于是一个任务的执行器,将任务交给runner后,runner将在go routine内并发的执行任务。且需要考虑执行任务的超时情况,以及收到系统信号的情况。因此,需要有以下几个对外接口:

  • 添加任务到runner,runner应该持有一个任务的队列,在go里面可以使用切片
  • 执行任务,通常是在一个循环中执行所有任务,当任务完成时,发送消息告知完成
  • 启动go routine执行任务并等待其超时或者结束
type Runner struct{
    interrupt chan os.Signal  // 一个接收os.Signal消息的通道,用于接收系统中断信号
    complete chan error // 接收任务已完成消息
    timeout chan time.Time // 报告任务超时
    tasks []func(int)  // 待执行的任务队列
}

Runner的构造函数需要返回一个新的Runner的指针:

func New(d time.Duration) *Runner{
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete: make(chan error),  // 无缓冲,也就是让等待阻塞
        timeout: time.After(d),
    }
}

下面是执行任务的方法:

func(r *Runner) Start() error{ // 相当于是Runner的成员函数,返回一个error
    signal.Notify(r.interrupt, os.Interrupt) // 将关注os.Interrupt,并转给r.interrupt这个通道
    go func() {
       r.complete <- r.run() // 在一个routine里执行方法,返回值发给通道complete 
    }()
    
    select{ //select 用于关注所有的通道是否有消息
        case err := <-r.complete: // complete通道有消息
            return err 
        case timeout :<-r.timeout:
            return ErrTimeout // timeout通道的消息
    }   
}

在具体的run方法中,用一个循环,顺序的执行切片内所有的任务,在每个任务之前,检查interrupt通道是否有消息。

文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0