Go資深工程師講解(慕課) 004

004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i:=0;i<10;i++{
        go func(i int) {
            fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制權,手動操作交出控制權runtime.Gosched()
        }(i)
    }
    // main直接退出了,還沒來得急執行協程
  time.Sleep(time.Millisecond)
}
  • 輕量級“線程”
  • 非搶佔式多任務處理,由協程主動交出控制權(線程是搶佔式任務處理)
  • 編譯器/解釋器/虛擬機層面的多任務
  • 多個協程可能在一個或多個線程上運行
  • 手動交出控制權 runtime.Gosched()

子程序是協程的一個特例,協程是雙向的

線程與協程
goroutine 協程
誰和誰放在一個線程,這個不用管由調度器來管理

  • 在函數前加上 go 關鍵字,就能送給調度器運行
  • 不需要 在定義時區分是否是異步函數
  • 調度器在合適的點進行切換
  • 使用 race 來檢測數據訪問衝突

goroutine 可能的切換點

  • I/O,select
  • channel
  • 等待鎖
  • 函數調用
  • runtime.Gosched() 手動

channel

goroutine 和 goroutine 之間可以開出很多個雙向通道,它是 goroutine 和 goroutine 之間的交互。有發,就得有收,要不就會卡在那(deadlock)。

package main

import (
    "fmt"
    "time"
)

func worker(c chan int)  {
    for{
        n:=<-c
        fmt.Println(n)
    }
}

func chanDemo()  {
    c:=make(chan int) //var c chan int 這只是定義了一個c,這是一個chan類型內部值是int的變量,並沒有生成一個chan 要生成,需要用make
    //用go關鍵詞
    /*go func() {
        for{
            n:=<-c //收數據
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c<-1 //有發就得有收
    c<-2
    time.Sleep(time.Millisecond) //簡單發一個sleep這樣這兩個值都打印出來,要不c還沒打印,main函數已經結束,收的2還來不及打印
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %d\n",id,<-c)
    }
}

func chanDemo()  {
    c:=make(chan int)
    go worker(0,c)
    c<-1 //有發就得有收
    c<-2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %c\n",id,<-c)
    }
}

func chanDemo()  {
    //c:=make(chan int)
    //go worker(0,c)
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=make(chan int)
        go worker(i,channels[i])
    }
    //c<-1 //有發就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

channel 做為返回值,返回一個 channel

package main

import (
    "fmt"
    "time"
)

//
func createWorker(id int) chan int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func chanDemo()  {
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=createWorker(i)
    }
    //c<-1 //有發就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

有時候我們要明確收發功能,所以我們可以在返回類型上加箭頭,(可能收,可能發,注意位置)chan<-收數據 <-chan 發數據,所以func createWorker(id int) chan<- int {var channels [10]chan<- int 改成有箭頭

bufferedChan

我們建立 channel 有發就得有收,要不程序就會 deadlock,如下

func bufferedChannel(){
    c:=make(chan int)
    c <-1
}
func main(){
    bufferedChannel() //程序掛掉,因為發了沒人收
}

加入緩存區

func bufferedChannel(){
    c:=make(chan int,3)
    c <-1
    c <-2
    c <-3
    c <-4 //發4會報錯,沒人收,緩存大小是3
}
func main(){
    bufferedChannel() //程序掛掉,因為發了沒人收
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func bufferedChannel(){
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    time.Sleep(time.Millisecond)
}

func main() {
    bufferedChannel()
}

channel 是可以 close 掉的,可以通知發完了(由發送方來完成 close)

func channelClose()  {
//    數據有明確的結尾的話,發送方可以通知close
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    close(c) //關閉 一旦close 接收方就會收到0值 空串或0
    time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我們接收方也要做一個判斷,如果有值就收沒值就退出唄
func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            n,ok:=<-c
            if !ok {
                break
            }
            fmt.Printf("Workder %d received %c\n",id,n)

        }
    }()
    return c
}
// 還有一種方法,我們可以通過range來判斷是否結束了
func createWorker(id int) chan<- int {
    c := make(chan int)
    //真正的worker
    go func() {
        for n := range c {
            fmt.Printf("Workder %d received %c\n", id, n)
        }
    }()
    return c
}

channel buffered channel 發送方 close() 接收方兩種判斷結束的方式n,ok:=<-crange CSP 模型
不要能過共享內存來通信,通過通信來共享內存

等待任務

package main

import (
    "fmt"
)

//創建結構
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        done <- true //輸出一個完成true
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done //讀狀態出來
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
    }
}

func main() {
    chanDemo()
}

希望是把所有任務都發出去後,再來等 20 done

package main

import (
    "fmt"
)

//創建結構
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        go func() {
            done <- true //輸出一個完成true
        }()
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        //<-workers[i].done //讀狀態出來
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        //<-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
    }
     //等所有發送都完成後再收done的狀態
    for _,workder:=range workers { //這樣會出錯,看小寫的打印完,打寫的沒有打印,這是因為所有的channel發都是block的,小寫的完了有10個done,沒人收又發10個大寫的,就循環等待了,有個簡單快速的解決方法就是,將done<-true放到另goroutine中去做 ,或者將大小寫分開,小寫發完,done完再去搞大寫
        <-workder.done
        <-workder.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

我們再改寫一下上個例子

package main

import (
    "fmt"
    "sync"
)

//創建結構
type workder struct {
    in chan int
    wg *sync.WaitGroup //共享一個
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我們有20個任務就開20個
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

再抽象一下 將 wg 封閉到結構的函數中

package main

import (
    "fmt"
    "sync"
)

//創建結構
type workder struct {
    in chan int
    done func()
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我們有20個任務就開20個
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

channel 來實現樹的遍歷

// main.go
func main(){
    c:=root.TraverseWithChannel()
    maxNode:=0
    for node:=range c{
        if node.Value>maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree

import "fmt"

//中序遍歷 先左在中在右
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    //只能作打印,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// 返回一個channel
func (node *Node) TraverseWithChannel() chan *Node{
    out:=make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) //注意關閉
    }()
    return out
}

select 的使用

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    //var c1, c2 chan int
    //    從c1或c2裡面搜 誰快搜誰 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received")

        }
    }

}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    從c1或c2裡面搜 誰來的快從誰里取 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    hasValue := false
    for {
        var activeWorker chan<- int
        if hasValue {
            activeWorker = workder
        }
        select {
        case n = <-c1:
            hasValue = true

        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }

    }

}

生成數據和消耗數據速度不一樣,會導致有些數據丟失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) //這樣有些數據沒打印出來 新收到的數據會把原來的數據衝掉,我們可以做個數據緩存(排隊)然後去消耗它
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    從c1或c2裡面搜 誰來的快從誰里取 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    //hasValue := false
    var values []int //緩存數據
    //10秒退出
    tm := time.After(20 * time.Second)
    //每秒看一下隊列的長度
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = workder
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            //hasValue = true
            values = append(values, n)
        case n = <-c2:
            //hasValue = true
            values = append(values, n)
        case activeWorker <- activeValue: //activeValue有值才去送數據
            //hasValue = false
            values = values[1:]
        case <-tick:
            fmt.Println("queue len = ", len(values))
            //fmt.Printf("values= %v\n",values)
        case <-time.After(800 * time.Millisecond):
            //數據生的太慢 800毫秒沒有數據提示超時
            fmt.Println("timeout")
        case <-tm:
            //10秒後退出,timer
            fmt.Println("bye")
            return

        }

    }

}

傳統同步方式

互斥量的使用

package main

import (
    "fmt"
    "time"
)

type atomicInt int //線程安全的

func (a *atomicInt) increment()  {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

我們可以在命令行下去查看運行結果,實際上會提示資源DATA Race

 go run --raise atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock sync.Mutex
} //線程安全的

func (a *atomicInt) increment()  {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // a.value時還會報錯data race
}

如果我們想讓一塊代碼區加鎖,我們可以如下

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

傳遞同步機制都是通過共享內存來實現同步,我們要通過通信來實現共享內存

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6739

(0)
Walker的頭像Walker
上一篇 12小時前
下一篇 16小時前

相關推薦

  • Go工程師體系課 014

    rocketmq 快速入門 去我們的各種配置(podman)看是怎麼安裝的 概念介紹 RocketMQ 是阿里開源、Apache 頂級項目的分布式消息中間件,核心組件: NameServer:服務發現與路由 Broker:消息存儲、投遞、拉取 Producer:消息生產者(發送消息) Consumer:消息消費者(訂閱並消費消息) Topic/Tag:主題/…

    後端開發 1小時前
    000
  • Go日積月累 電子書目錄與推薦

    Go 語言電子書精華整理與推薦 基於 48 份 Go 語言電子書資料,按主題提煉為 4 篇系統化精華文檔。整理時間:2026-03-06 精華文章導讀 以下 4 篇文章從 48 份電子書中提煉核心知識,按主題系統化整理,覆蓋 Go 語言從底層原理到企業實戰的完整知識體系。 1. Go 底層原理與源碼精華 知識來源:《Go 源碼剖析》(雨痕)、《Go 1.4 …

    後端開發 1天前
    1000
  • Go工程師體系課 020

    性能優化與 pprof 1. 先測量後優化 "Premature optimization is the root of all evil." — Donald Knuth 優化流程:1. 先寫正確的代碼2. 用 Benchmark 確認性能瓶頸3. 用 pprof 定位具體位置4. 優化 → 再測量 → 對比 2. pprof 工具 2.1 在 HTTP …

  • Go工程師體系課 009

    其它一些功能 個人中心 收藏 管理收貨地址(增刪改查) 留言 拷貝inventory_srv--> userop_srv 查詢替換所有的inventory Elasticsearch 深度解析文檔 1. 甚麼是Elasticsearch Elasticsearch是一個基於Apache Lucene構建的分布式、RESTful搜索和分析引擎,能夠快速地…

  • 編程基礎 0009_testing詳解

    Go testing 詳解 目錄 testing 包基礎 表格驅動測試 子測試 t.Run 基準測試 Benchmark 測試覆蓋率 TestMain httptest 包 Mock 和接口測試技巧 模糊測試 Fuzz 1. testing 包基礎 1.1 測試文件和函數命名規則 Go 測試遵循嚴格的命名約定: 測試文件以 _test.go 結尾(如 use…

    後端開發 18小時前
    100
簡體中文 繁體中文 English