Go资深工程师讲解(慕课) 004

004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i:=0;i<10;i++{
        go func(i int) {
            fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制权,手动操作交出控制权runtime.Gosched()
        }(i)
    }
    // main直接退出了,还没来得急执行协程
  time.Sleep(time.Millisecond)
}
  • 轻量级“线程”
  • 非抢占式多任务处理,由协程主动交出控制权(线程是抢占式任务处理)
  • 编译器/解释器/虚拟机层面的多任务
  • 多个协程可能在一个或多个线程上运行
  • 手动交出控制权 runtime.Gosched()

子程序是协程的一个特例,协程是双向的

线程与协程
goroutine 协程
谁和谁放在一个线程,这个不用管由调度器来管理

  • 在函数前加上 go 关键字,就能送给调度器运行
  • 不需要 在定义时区分是否是异步函数
  • 调度器在合适的点进行切换
  • 使用 race 来检测数据访问冲突

goroutine 可能的切换点

  • I/O,select
  • channel
  • 等待锁
  • 函数调用
  • runtime.Gosched() 手动

channel

goroutine 和 goroutine 之间可以开出很多个双向通道,它是 goroutine 和 goroutine 之间的交互。有发,就得有收,要不就会卡在那(deadlock)。

package main

import (
    "fmt"
    "time"
)

func worker(c chan int)  {
    for{
        n:=<-c
        fmt.Println(n)
    }
}

func chanDemo()  {
    c:=make(chan int) //var c chan int 这只是定义了一个c,这是一个chan类型内部值是int的变量,并没有生成一个chan 要生成,需要用make
    //用go关键词
    /*go func() {
        for{
            n:=<-c //收数据
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c<-1 //有发就得有收
    c<-2
    time.Sleep(time.Millisecond) //简单发一个sleep这样这两个值都打印出来,要不c还没打印,main函数已经结束,收的2还来不及打印
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %d\n",id,<-c)
    }
}

func chanDemo()  {
    c:=make(chan int)
    go worker(0,c)
    c<-1 //有发就得有收
    c<-2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %c\n",id,<-c)
    }
}

func chanDemo()  {
    //c:=make(chan int)
    //go worker(0,c)
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=make(chan int)
        go worker(i,channels[i])
    }
    //c<-1 //有发就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

channel 做为返回值,返回一个 channel

package main

import (
    "fmt"
    "time"
)

//
func createWorker(id int) chan int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func chanDemo()  {
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=createWorker(i)
    }
    //c<-1 //有发就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

有时候我们要明确收发功能,所以我们可以在返回类型上加箭头,(可能收,可能发,注意位置)chan<-收数据 <-chan 发数据,所以func createWorker(id int) chan<- int {var channels [10]chan<- int 改成有箭头

bufferedChan

我们建立 channel 有发就得有收,要不程序就会 deadlock,如下

func bufferedChannel(){
    c:=make(chan int)
    c <-1
}
func main(){
    bufferedChannel() //程序挂掉,因为发了没人收
}

加入缓存区

func bufferedChannel(){
    c:=make(chan int,3)
    c <-1
    c <-2
    c <-3
    c <-4 //发4会报错,没人收,缓存大小是3
}
func main(){
    bufferedChannel() //程序挂掉,因为发了没人收
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func bufferedChannel(){
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    time.Sleep(time.Millisecond)
}

func main() {
    bufferedChannel()
}

channel 是可以 close 掉的,可以通知发完了(由发送方来完成 close)

func channelClose()  {
//    数据有明确的结尾的话,发送方可以通知close
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    close(c) //关闭 一旦close 接收方就会收到0值 空串或0
    time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我们接收方也要做一个判断,如果有值就收没值就退出呗
func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            n,ok:=<-c
            if !ok {
                break
            }
            fmt.Printf("Workder %d received %c\n",id,n)

        }
    }()
    return c
}
// 还有一种方法,我们可以通过range来判断是否结束了
func createWorker(id int) chan<- int {
    c := make(chan int)
    //真正的worker
    go func() {
        for n := range c {
            fmt.Printf("Workder %d received %c\n", id, n)
        }
    }()
    return c
}

channel buffered channel 发送方 close() 接收方两种判断结束的方式n,ok:=<-crange CSP 模型
不要能过共享内存来通信,通过通信来共享内存

等待任务

package main

import (
    "fmt"
)

//创建结构
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        done <- true //输出一个完成true
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done //读状态出来
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
    }
}

func main() {
    chanDemo()
}

希望是把所有任务都发出去后,再来等 20 done

package main

import (
    "fmt"
)

//创建结构
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        go func() {
            done <- true //输出一个完成true
        }()
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        //<-workers[i].done //读状态出来
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        //<-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
    }
     //等所有发送都完成后再收done的状态
    for _,workder:=range workers { //这样会出错,看小写的打印完,打写的没有打印,这是因为所有的channel发都是block的,小写的完了有10个done,没人收又发10个大写的,就循环等待了,有个简单快速的解决方法就是,将done<-true放到另goroutine中去做 ,或者将大小写分开,小写发完,done完再去搞大写
        <-workder.done
        <-workder.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

我们再改写一下上个例子

package main

import (
    "fmt"
    "sync"
)

//创建结构
type workder struct {
    in chan int
    wg *sync.WaitGroup //共享一个
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我们有20个任务就开20个
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

再抽象一下 将 wg 封闭到结构的函数中

package main

import (
    "fmt"
    "sync"
)

//创建结构
type workder struct {
    in chan int
    done func()
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我们有20个任务就开20个
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

channel 来实现树的遍历

// main.go
func main(){
    c:=root.TraverseWithChannel()
    maxNode:=0
    for node:=range c{
        if node.Value>maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree

import "fmt"

//中序遍历 先左在中在右
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    //只能作打印,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// 返回一个channel
func (node *Node) TraverseWithChannel() chan *Node{
    out:=make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) //注意关闭
    }()
    return out
}

select 的使用

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    //var c1, c2 chan int
    //    从c1或c2里面搜 谁快搜谁 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received")

        }
    }

}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //随机sleep1500毫秒 使得c1,c2输出的速度不一样
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    hasValue := false
    for {
        var activeWorker chan<- int
        if hasValue {
            activeWorker = workder
        }
        select {
        case n = <-c1:
            hasValue = true

        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }

    }

}

生成数据和消耗数据速度不一样,会导致有些数据丢失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //随机sleep1500毫秒 使得c1,c2输出的速度不一样
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) //这样有些数据没打印出来 新收到的数据会把原来的数据冲掉,我们可以做个数据缓存(排队)然后去消耗它
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    //hasValue := false
    var values []int //缓存数据
    //10秒退出
    tm := time.After(20 * time.Second)
    //每秒看一下队列的长度
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = workder
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            //hasValue = true
            values = append(values, n)
        case n = <-c2:
            //hasValue = true
            values = append(values, n)
        case activeWorker <- activeValue: //activeValue有值才去送数据
            //hasValue = false
            values = values[1:]
        case <-tick:
            fmt.Println("queue len = ", len(values))
            //fmt.Printf("values= %v\n",values)
        case <-time.After(800 * time.Millisecond):
            //数据生的太慢 800毫秒没有数据提示超时
            fmt.Println("timeout")
        case <-tm:
            //10秒后退出,timer
            fmt.Println("bye")
            return

        }

    }

}

传统同步方式

互斥量的使用

package main

import (
    "fmt"
    "time"
)

type atomicInt int //线程安全的

func (a *atomicInt) increment()  {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

我们可以在命令行下去查看运行结果,实际上会提示资源DATA Race

 go run --raise atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock sync.Mutex
} //线程安全的

func (a *atomicInt) increment()  {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // a.value时还会报错data race
}

如果我们想让一块代码区加锁,我们可以如下

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

传递同步机制都是通过共享内存来实现同步,我们要通过通信来实现共享内存

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6739

(0)
Walker的头像Walker
上一篇 2026年3月8日 15:11
下一篇 2026年3月9日 12:56

相关推荐

  • Go工程师体系课 002

    GOPATH 与 Go Modules 的区别 1. 概念 GOPATH 是 Go 的早期依赖管理机制。 所有的 Go 项目和依赖包必须放在 GOPATH 目录中(默认是 ~/go)。 一定要设置 GO111MODULE=off 项目路径必须按照 src/包名 的结构组织。 不支持版本控制,依赖管理需要手动处理(例如 go get)。 查找依赖包的顺序是 g…

    2026年3月6日
    6000
  • 编程基础 0008_标准库进阶

    Go 标准库进阶 系统整理 Go 标准库中最常用的包,重点覆盖 io、os、bufio、strings、time、fmt 等 1. io 包核心接口 Go 的 I/O 设计围绕几个核心接口展开,几乎所有 I/O 操作都基于它们。 // 最基础的两个接口 type Reader interface { Read(p []byte) (n int, err er…

    后端开发 2026年3月6日
    8000
  • 编程基础 0004_Web_beego开发

    beego 开始 2 文章的添加与删除 创建 TopicController // controllers中添加topic.go package controllers import "github.com/astaxie/beego" type TopicController struct { beego.Controller } fu…

    后端开发 2026年3月6日
    5900
  • Go工程师体系课 018

    API 网关与持续部署入门(Kong & Jenkins) 对应资料目录《第 2 章 Jenkins 入门》《第 3 章 通过 Jenkins 部署服务》,整理 Kong 与 Jenkins 在企业级持续交付中的实战路径。即便零基础,也能顺着步骤搭建出自己的网关 + 持续部署流水线。 课前导览:什么是 API 网关 API 网关位于客户端与后端微服务…

    后端开发 2026年3月6日
    8100
  • Go日积月累 电子书目录与推荐

    Go 语言电子书精华整理与推荐 基于 48 份 Go 语言电子书资料,按主题提炼为 4 篇系统化精华文档。整理时间:2026-03-06 精华文章导读 以下 4 篇文章从 48 份电子书中提炼核心知识,按主题系统化整理,覆盖 Go 语言从底层原理到企业实战的完整知识体系。 1. Go 底层原理与源码精华 知识来源:《Go 源码剖析》(雨痕)、《Go 1.4 …

    后端开发 2026年3月6日
    10600
简体中文 繁体中文 English