Go資深工程師講解(慕課) 004

004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i:=0;i<10;i++{
        go func(i int) {
            fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制權,手動操作交出控制權runtime.Gosched()
        }(i)
    }
    // main直接退出了,還沒來得急執行協程
  time.Sleep(time.Millisecond)
}
  • 輕量級“線程”
  • 非搶佔式多任務處理,由協程主動交出控制權(線程是搶佔式任務處理)
  • 編譯器/解釋器/虛擬機層面的多任務
  • 多個協程可能在一個或多個線程上運行
  • 手動交出控制權 runtime.Gosched()

子程序是協程的一個特例,協程是雙向的

線程與協程
goroutine 協程
誰和誰放在一個線程,這個不用管由調度器來管理

  • 在函數前加上 go 關鍵字,就能送給調度器運行
  • 不需要 在定義時區分是否是異步函數
  • 調度器在合適的點進行切換
  • 使用 race 來檢測數據訪問衝突

goroutine 可能的切換點

  • I/O,select
  • channel
  • 等待鎖
  • 函數調用
  • runtime.Gosched() 手動

channel

goroutine 和 goroutine 之間可以開出很多個雙向通道,它是 goroutine 和 goroutine 之間的交互。有發,就得有收,要不就會卡在那(deadlock)。

package main

import (
    "fmt"
    "time"
)

func worker(c chan int)  {
    for{
        n:=<-c
        fmt.Println(n)
    }
}

func chanDemo()  {
    c:=make(chan int) //var c chan int 這只是定義了一個c,這是一個chan類型內部值是int的變量,並沒有生成一個chan 要生成,需要用make
    //用go關鍵詞
    /*go func() {
        for{
            n:=<-c //收數據
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c<-1 //有發就得有收
    c<-2
    time.Sleep(time.Millisecond) //簡單發一個sleep這樣這兩個值都打印出來,要不c還沒打印,main函數已經結束,收的2還來不及打印
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %d\n",id,<-c)
    }
}

func chanDemo()  {
    c:=make(chan int)
    go worker(0,c)
    c<-1 //有發就得有收
    c<-2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int ,c chan int)  {
    for{
        fmt.Printf("Workder %d received %c\n",id,<-c)
    }
}

func chanDemo()  {
    //c:=make(chan int)
    //go worker(0,c)
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=make(chan int)
        go worker(i,channels[i])
    }
    //c<-1 //有發就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

channel 做為返回值,返回一個 channel

package main

import (
    "fmt"
    "time"
)

//
func createWorker(id int) chan int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func chanDemo()  {
    var channels [10]chan int
    for i:=0;i<10;i++{
        channels[i]=createWorker(i)
    }
    //c<-1 //有發就得有收
    //c<-2
    for i:=0;i<10;i++{
        channels[i] <- 'a'+i
    }
    for i:=0;i<10;i++{
        channels[i] <- 'A'+i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

有時候我們要明確收發功能,所以我們可以在返回類型上加箭頭,(可能收,可能發,注意位置)chan<-收數據 <-chan 發數據,所以func createWorker(id int) chan<- int {var channels [10]chan<- int 改成有箭頭

bufferedChan

我們建立 channel 有發就得有收,要不程序就會 deadlock,如下

func bufferedChannel(){
    c:=make(chan int)
    c <-1
}
func main(){
    bufferedChannel() //程序掛掉,因為發了沒人收
}

加入緩存區

func bufferedChannel(){
    c:=make(chan int,3)
    c <-1
    c <-2
    c <-3
    c <-4 //發4會報錯,沒人收,緩存大小是3
}
func main(){
    bufferedChannel() //程序掛掉,因為發了沒人收
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            fmt.Printf("Workder %d received %c\n",id,<-c)
        }
    }()
    return c
}

func bufferedChannel(){
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    time.Sleep(time.Millisecond)
}

func main() {
    bufferedChannel()
}

channel 是可以 close 掉的,可以通知發完了(由發送方來完成 close)

func channelClose()  {
//    數據有明確的結尾的話,發送方可以通知close
    c:=createWorker(0)
    c <-97
    c <-98
    c <-99
    c <-100
    close(c) //關閉 一旦close 接收方就會收到0值 空串或0
    time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我們接收方也要做一個判斷,如果有值就收沒值就退出唄
func createWorker(id int) chan<- int  {
    c:=make(chan int)
    //真正的worker
    go func() {
        for{
            n,ok:=<-c
            if !ok {
                break
            }
            fmt.Printf("Workder %d received %c\n",id,n)

        }
    }()
    return c
}
// 還有一種方法,我們可以通過range來判斷是否結束了
func createWorker(id int) chan<- int {
    c := make(chan int)
    //真正的worker
    go func() {
        for n := range c {
            fmt.Printf("Workder %d received %c\n", id, n)
        }
    }()
    return c
}

channel buffered channel 發送方 close() 接收方兩種判斷結束的方式n,ok:=<-crange CSP 模型
不要能過共享內存來通信,通過通信來共享內存

等待任務

package main

import (
    "fmt"
)

//創建結構
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        done <- true //輸出一個完成true
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done //讀狀態出來
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
    }
}

func main() {
    chanDemo()
}

希望是把所有任務都發出去後,再來等 20 done

package main

import (
    "fmt"
)

//創建結構
type workder struct {
    in   chan int
    done chan bool
}

//
func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        go func() {
            done <- true //輸出一個完成true
        }()
    }
}
func createWroker(id int) workder {
    w := workder{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        //<-workers[i].done //讀狀態出來
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        //<-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
    }
     //等所有發送都完成後再收done的狀態
    for _,workder:=range workers { //這樣會出錯,看小寫的打印完,打寫的沒有打印,這是因為所有的channel發都是block的,小寫的完了有10個done,沒人收又發10個大寫的,就循環等待了,有個簡單快速的解決方法就是,將done<-true放到另goroutine中去做 ,或者將大小寫分開,小寫發完,done完再去搞大寫
        <-workder.done
        <-workder.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

我們再改寫一下上個例子

package main

import (
    "fmt"
    "sync"
)

//創建結構
type workder struct {
    in chan int
    wg *sync.WaitGroup //共享一個
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我們有20個任務就開20個
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

再抽象一下 將 wg 封閉到結構的函數中

package main

import (
    "fmt"
    "sync"
)

//創建結構
type workder struct {
    in chan int
    done func()
}

//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Workder %d received %c \n", id, n)
        wg.Done()
    }
}
func createWroker(id int, wg *sync.WaitGroup) workder {
    w := workder{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]workder
    for i := 0; i < 10; i++ {
        workers[i] = createWroker(i, &wg)
    }

    wg.Add(20) //我們有20個任務就開20個
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

channel 來實現樹的遍歷

// main.go
func main(){
    c:=root.TraverseWithChannel()
    maxNode:=0
    for node:=range c{
        if node.Value>maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree

import "fmt"

//中序遍歷 先左在中在右
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    //只能作打印,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// 返回一個channel
func (node *Node) TraverseWithChannel() chan *Node{
    out:=make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) //注意關閉
    }()
    return out
}

select 的使用

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    //var c1, c2 chan int
    //    從c1或c2裡面搜 誰快搜誰 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received")

        }
    }

}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    從c1或c2裡面搜 誰來的快從誰里取 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    hasValue := false
    for {
        var activeWorker chan<- int
        if hasValue {
            activeWorker = workder
        }
        select {
        case n = <-c1:
            hasValue = true

        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }

    }

}

生成數據和消耗數據速度不一樣,會導致有些數據丟失

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            //隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func workder(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) //這樣有些數據沒打印出來 新收到的數據會把原來的數據衝掉,我們可以做個數據緩存(排隊)然後去消耗它
        fmt.Printf("Workder %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go workder(id, c)
    return c
}

func main() {
    //var c1, c2 chan int // c1 and c2 = nil 走的default
    //    從c1或c2裡面搜 誰來的快從誰里取 ,實現非阻塞式的獲取就用select
    var c1, c2 = generator(), generator()
    workder := createWorker(0)
    n := 0
    //hasValue := false
    var values []int //緩存數據
    //10秒退出
    tm := time.After(20 * time.Second)
    //每秒看一下隊列的長度
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = workder
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            //hasValue = true
            values = append(values, n)
        case n = <-c2:
            //hasValue = true
            values = append(values, n)
        case activeWorker <- activeValue: //activeValue有值才去送數據
            //hasValue = false
            values = values[1:]
        case <-tick:
            fmt.Println("queue len = ", len(values))
            //fmt.Printf("values= %v\n",values)
        case <-time.After(800 * time.Millisecond):
            //數據生的太慢 800毫秒沒有數據提示超時
            fmt.Println("timeout")
        case <-tm:
            //10秒後退出,timer
            fmt.Println("bye")
            return

        }

    }

}

傳統同步方式

互斥量的使用

package main

import (
    "fmt"
    "time"
)

type atomicInt int //線程安全的

func (a *atomicInt) increment()  {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

我們可以在命令行下去查看運行結果,實際上會提示資源DATA Race

 go run --raise atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock sync.Mutex
} //線程安全的

func (a *atomicInt) increment()  {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // a.value時還會報錯data race
}

如果我們想讓一塊代碼區加鎖,我們可以如下

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

傳遞同步機制都是通過共享內存來實現同步,我們要通過通信來實現共享內存

主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6739

(0)
Walker的頭像Walker
上一篇 2026年3月8日 15:11
下一篇 2026年3月9日 12:56

相關推薦

  • Go工程師體系課 002

    GOPATH 與 Go Modules 的區別 1. 概念 GOPATH 是 Go 的早期依賴管理機制。 所有的 Go 項目和依賴包必須放在 GOPATH 目錄中(默認是 ~/go)。 一定要設置 GO111MODULE=off 項目路徑必須按照 src/包名 的結構組織。 不支持版本控制,依賴管理需要手動處理(例如 go get)。 查找依賴包的順序是 g…

    2026年3月6日
    5700
  • 編程基礎 0008_標準庫進階

    Go 標準庫進階 系統整理 Go 標準庫中最常用的包,重點覆蓋 io、os、bufio、strings、time、fmt 等 1. io 包核心接口 Go 的 I/O 設計圍繞幾個核心接口展開,幾乎所有 I/O 操作都基於它們。 // 最基礎的兩個接口 type Reader interface { Read(p []byte) (n int, err er…

    後端開發 2026年3月6日
    7600
  • 編程基礎 0004_Web_beego開發

    beego 開始 2 文章的添加與刪除 創建 TopicController // controllers中添加topic.go package controllers import "github.com/astaxie/beego" type TopicController struct { beego.Controller } fu…

    後端開發 2026年3月6日
    5600
  • Go工程師體系課 018

    API 網關與持續部署入門(Kong & Jenkins) 對應資料目錄《第 2 章 Jenkins 入門》《第 3 章 通過 Jenkins 部署服務》,整理 Kong 與 Jenkins 在企業級持續交付中的實戰路徑。即便零基礎,也能順著步驟搭建出自己的網關 + 持續部署流水線。 課前導覽:甚麼是 API 網關 API 網關位於客戶端與後端微服務…

    後端開發 2026年3月6日
    8000
  • Go日積月累 電子書目錄與推薦

    Go 語言電子書精華整理與推薦 基於 48 份 Go 語言電子書資料,按主題提煉為 4 篇系統化精華文檔。整理時間:2026-03-06 精華文章導讀 以下 4 篇文章從 48 份電子書中提煉核心知識,按主題系統化整理,覆蓋 Go 語言從底層原理到企業實戰的完整知識體系。 1. Go 底層原理與源碼精華 知識來源:《Go 源碼剖析》(雨痕)、《Go 1.4 …

    後端開發 2026年3月6日
    10400
簡體中文 繁體中文 English