Go资深工程师讲解(慕课) 004

004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i:=0;i<10;i++{
        go func(i int) {
            fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制权,手动操作交出控制权runtime.Gosched()
        }(i)
    }
    // main直接退出了,还没来得急执行协程
  time.Sleep(time.Millisecond)
}
  • 轻量级“线程”
  • 非抢占式多任务处理,由协程主动交出控制权(线程是抢占式任务处理)
  • 编译器/解释器/虚拟机层面的多任务
  • 多个协程可能在一个或多个线程上运行
  • 手动交出控制权 runtime.Gosched()

子程序是协程的一个特例,协程是双向的

线程与协程
goroutine 协程
谁和谁放在一个线程,这个不用管由调度器来管理

  • 在函数前加上 go 关键字,就能送给调度器运行
  • 不需要 在定义时区分是否是异步函数
  • 调度器在合适的点进行切换
  • 使用 race 来检测数据访问冲突

goroutine 可能的切换点

  • I/O,select
  • channel
  • 等待锁
  • 函数调用
  • runtime.Gosched() 手动

channel

goroutine 和 goroutine 之间可以开出很多个双向通道,它是 goroutine 和 goroutine 之间的交互。有发,就得有收,要不就会卡在那(deadlock)。

package main

import (
    "fmt"
    "time"
)

func worker(c chan int)  {
    for{
        n:=<-c
        fmt.Println(n)
    }
}

func chanDemo()  {
    c:=make(chan int) //var c chan int 这只是定义了一个c,这是一个chan类型内部值是int的变量,并没有生成一个chan 要生成,需要用make
    //用go关键词
    /*go func() {
        for{
            n:=<-c //收数据
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c<-1 //有发就得有收
    c<-2
    time.Sleep(time.Millisecond) //简单发一个sleep这样这两个值都打印出来,要不c还没打印,main函数已经结束,收的2还来不及打印
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %d\n",id,<-c)
    }
}

func chanDemo()  {
    c:=make(chan int)
    go worker(0,c)
    c<-1 //有发就得有收
    c<-2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %c\n",id,<-c)
    }
}

func chanDemo()  {
    //c:=make(chan int)
    //go worker(0,c)
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=make(chan int)
        go worker(i,channels[i])
    }
    //c<-1 //有发就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

channel 做为返回值,返回一个 channel

package main

import (
    "fmt"
    "time"
)

//
func createWorker(id int) chan int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func chanDemo()  {
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=createWorker(i)
    }
    //c<-1 //有发就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

有时候我们要明确收发功能,所以我们可以在返回类型上加箭头,(可能收,可能发,注意位置)chan<-收数据 <-chan 发数据,所以func createWorker(id int) chan<- int {var channels [10]chan<- int 改成有箭头

bufferedChan

我们建立 channel 有发就得有收,要不程序就会 deadlock,如下

func bufferedChannel(){
    c:=make(chan int)
    c <-1
}
func main(){
    bufferedChannel() //程序挂掉,因为发了没人收
}

加入缓存区

func bufferedChannel(){
    c:=make(chan int,3)
    c <-1
    c <-2
    c <-3
    c <-4 //发4会报错,没人收,缓存大小是3
}
func main(){
    bufferedChannel() //程序挂掉,因为发了没人收
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func bufferedChannel(){
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    time.Sleep(time.Millisecond)
}

func main() {
    bufferedChannel()
}

channel 是可以 close 掉的,可以通知发完了(由发送方来完成 close)

func channelClose()  {
//    数据有明确的结尾的话,发送方可以通知close
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    close(c) //关闭 一旦close 接收方就会收到0值 空串或0
    time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我们接收方也要做一个判断,如果有值就收没值就退出呗
func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            n,ok:=<-c
            if !ok {
                break
            }
            fmt.Printf("Workder %d received %c\n",id,n)

        }
    }()
    return c
}
// 还有一种方法,我们可以通过range来判断是否结束了
func createWorker(id int) chan<- int {
    c := make(chan int)
    //真正的worker
    go func() {
        for n := range c {
            fmt.Printf("Workder %d received %c\n", id, n)
        }
    }()
    return c
}

channel buffered channel 发送方 close() 接收方两种判断结束的方式n,ok:=<-crange CSP 模型
不要能过共享内存来通信,通过通信来共享内存

等待任务

package main

import (
    "fmt"
)

//创建结构
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        done <- true //输出一个完成true
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done //读状态出来
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
    }
}

func main() {
    chanDemo()
}

希望是把所有任务都发出去后,再来等 20 done

package main

import (
    "fmt"
)

//创建结构
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        go func() {
            done <- true //输出一个完成true
        }()
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        //<-workers[i].done //读状态出来
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        //<-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
    }
     //等所有发送都完成后再收done的状态
    for _,workder:=range workers { //这样会出错,看小写的打印完,打写的没有打印,这是因为所有的channel发都是block的,小写的完了有10个done,没人收又发10个大写的,就循环等待了,有个简单快速的解决方法就是,将done<-true放到另goroutine中去做 ,或者将大小写分开,小写发完,done完再去搞大写
        <-workder.done
        <-workder.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

我们再改写一下上个例子

package main

import (
    "fmt"
    "sync"
)

//创建结构
type workder struct {
    in chan int
    wg *sync.WaitGroup //共享一个
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我们有20个任务就开20个
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

再抽象一下 将 wg 封闭到结构的函数中

package main

import (
    "fmt"
    "sync"
)

//创建结构
type workder struct {
    in chan int
    done func()
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我们有20个任务就开20个
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

channel 来实现树的遍历

// main.go
func main(){
    c:=root.TraverseWithChannel()
    maxNode:=0
    for node:=range c{
        if node.Value>maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree

import "fmt"

//中序遍历 先左在中在右
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    //只能作打印,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// 返回一个channel
func (node *Node) TraverseWithChannel() chan *Node{
    out:=make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) //注意关闭
    }()
    return out
}

select 的使用

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    //var c1, c2 chan int
    //    从c1或c2里面搜 谁快搜谁 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received")

        }
    }

}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //随机sleep1500毫秒 使得c1,c2输出的速度不一样
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    hasValue := false
    for {
        var activeWorker chan<- int
        if hasValue {
            activeWorker = workder
        }
        select {
        case n = <-c1:
            hasValue = true

        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }

    }

}

生成数据和消耗数据速度不一样,会导致有些数据丢失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //随机sleep1500毫秒 使得c1,c2输出的速度不一样
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) //这样有些数据没打印出来 新收到的数据会把原来的数据冲掉,我们可以做个数据缓存(排队)然后去消耗它
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    //hasValue := false
    var values []int //缓存数据
    //10秒退出
    tm := time.After(20 * time.Second)
    //每秒看一下队列的长度
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = workder
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            //hasValue = true
            values = append(values, n)
        case n = <-c2:
            //hasValue = true
            values = append(values, n)
        case activeWorker <- activeValue: //activeValue有值才去送数据
            //hasValue = false
            values = values[1:]
        case <-tick:
            fmt.Println("queue len = ", len(values))
            //fmt.Printf("values= %v\n",values)
        case <-time.After(800 * time.Millisecond):
            //数据生的太慢 800毫秒没有数据提示超时
            fmt.Println("timeout")
        case <-tm:
            //10秒后退出,timer
            fmt.Println("bye")
            return

        }

    }

}

传统同步方式

互斥量的使用

package main

import (
    "fmt"
    "time"
)

type atomicInt int //线程安全的

func (a *atomicInt) increment()  {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

我们可以在命令行下去查看运行结果,实际上会提示资源DATA Race

 go run --raise atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock sync.Mutex
} //线程安全的

func (a *atomicInt) increment()  {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // a.value时还会报错data race
}

如果我们想让一块代码区加锁,我们可以如下

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

传递同步机制都是通过共享内存来实现同步,我们要通过通信来实现共享内存

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6739

(0)
Walker的头像Walker
上一篇 14小时前
下一篇 18小时前

相关推荐

  • Go工程师体系课 013

    订单事务 先扣库存 后扣库存 都会对库存和订单都会有影响, 所以要使用分布式事务 业务(下单不对付)业务问题 支付成功再扣减(下单了,支付时没库存了) 订单扣减,不支付(订单超时归还)【常用方式】 事务和分布式事务 1. 什么是事务? 事务(Transaction)是数据库管理系统中的一个重要概念,它是一组数据库操作的集合,这些操作要么全部成功执行,要么全部…

  • Go资深工程师讲解(慕课) 008_GMP调度器与Go设计哲学

    Go GMP 调度器与设计哲学 对应视频 9-2 go语言的调度器、18-1 体会Go语言的设计、18-2 课程总结 1. Go 调度器演进 1.0 时代:单线程调度器(Go 0.x) 只有一个线程运行 goroutine 所有 goroutine 排队等待 无法利用多核 1.1 时代:多线程调度器(Go 1.0) 引入多线程 但全局锁竞争严重,性能瓶颈 1…

  • Go工程师体系课 007

    商品微服务 实体结构说明 本模块包含以下核心实体: 商品(Goods) 商品分类(Category) 品牌(Brands) 轮播图(Banner) 品牌分类(GoodsCategoryBrand) 1. 商品(Goods) 描述平台中实际展示和销售的商品信息。 字段说明 字段名 类型 说明 name String 商品名称,必填 brand Pointer …

    后端开发 10小时前
    100
  • Go工程师体系课 017

    限流、熔断与降级入门(含 Sentinel 实战) 结合课件第 3 章(3-1 ~ 3-9)的视频要点,整理一套面向初学者的服务保护指南,帮助理解“为什么需要限流、熔断和降级”,以及如何用 Sentinel 快速上手。 学习路线速览 3-1 理解服务雪崩与限流、熔断、降级的背景 3-2 Sentinel 与 Hystrix 对比,明确技术选型 3-3 Sen…

    后端开发 3分钟前
    000
  • Go工程师体系课 014

    rocketmq 快速入门 去我们的各种配置(podman)看是怎么安装的 概念介绍 RocketMQ 是阿里开源、Apache 顶级项目的分布式消息中间件,核心组件: NameServer:服务发现与路由 Broker:消息存储、投递、拉取 Producer:消息生产者(发送消息) Consumer:消息消费者(订阅并消费消息) Topic/Tag:主题/…

    后端开发 3小时前
    100
简体中文 繁体中文 English