004
goroutine
package main
import (
"fmt"
"time"
)
func main() {
for i:=0;i<10;i++{
go func(i int) {
fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制權,手動操作交出控制權runtime.Gosched()
}(i)
}
// main直接退出了,還沒來得急執行協程
time.Sleep(time.Millisecond)
}
- 輕量級“線程”
- 非搶佔式多任務處理,由協程主動交出控制權(線程是搶佔式任務處理)
- 編譯器/解釋器/虛擬機層面的多任務
- 多個協程可能在一個或多個線程上運行
- 手動交出控制權
runtime.Gosched()
子程序是協程的一個特例,協程是雙向的
- 在函數前加上 go 關鍵字,就能送給調度器運行
- 不需要 在定義時區分是否是異步函數
- 調度器在合適的點進行切換
- 使用 race 來檢測數據訪問衝突
goroutine 可能的切換點
- I/O,select
- channel
- 等待鎖
- 函數調用
- runtime.Gosched() 手動
channel
goroutine 和 goroutine 之間可以開出很多個雙向通道,它是 goroutine 和 goroutine 之間的交互。有發,就得有收,要不就會卡在那(deadlock)。
package main
import (
"fmt"
"time"
)
func worker(c chan int) {
for{
n:=<-c
fmt.Println(n)
}
}
func chanDemo() {
c:=make(chan int) //var c chan int 這只是定義了一個c,這是一個chan類型內部值是int的變量,並沒有生成一個chan 要生成,需要用make
//用go關鍵詞
/*go func() {
for{
n:=<-c //收數據
fmt.Println(n)
}
}()*/
go worker(c)
c<-1 //有發就得有收
c<-2
time.Sleep(time.Millisecond) //簡單發一個sleep這樣這兩個值都打印出來,要不c還沒打印,main函數已經結束,收的2還來不及打印
}
func main() {
chanDemo()
}
package main
import (
"fmt"
"time"
)
func worker(id int ,c chan int) {
for{
fmt.Printf("Workder %d received %d\n",id,<-c)
}
}
func chanDemo() {
c:=make(chan int)
go worker(0,c)
c<-1 //有發就得有收
c<-2
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
package main
import (
"fmt"
"time"
)
func worker(id int ,c chan int) {
for{
fmt.Printf("Workder %d received %c\n",id,<-c)
}
}
func chanDemo() {
//c:=make(chan int)
//go worker(0,c)
var channels [10]chan int
for i:=0;i<10;i++{
channels[i]=make(chan int)
go worker(i,channels[i])
}
//c<-1 //有發就得有收
//c<-2
for i:=0;i<10;i++{
channels[i] <- 'a'+i
}
for i:=0;i<10;i++{
channels[i] <- 'A'+i
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
channel 做爲返回值,返回一個 channel
package main
import (
"fmt"
"time"
)
//
func createWorker(id int) chan int {
c:=make(chan int)
//真正的worker
go func() {
for{
fmt.Printf("Workder %d received %c\n",id,<-c)
}
}()
return c
}
func chanDemo() {
var channels [10]chan int
for i:=0;i<10;i++{
channels[i]=createWorker(i)
}
//c<-1 //有發就得有收
//c<-2
for i:=0;i<10;i++{
channels[i] <- 'a'+i
}
for i:=0;i<10;i++{
channels[i] <- 'A'+i
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
有時候我們要明確收發功能,所以我們可以在返回類型上加箭頭,(可能收,可能發,注意位置)chan<-收數據 <-chan 發數據,所以func createWorker(id int) chan<- int { 和 var channels [10]chan<- int 改成有箭頭
bufferedChan
我們建立 channel 有發就得有收,要不程序就會 deadlock,如下
func bufferedChannel(){
c:=make(chan int)
c <-1
}
func main(){
bufferedChannel() //程序掛掉,因爲發了沒人收
}
加入緩存區
func bufferedChannel(){
c:=make(chan int,3)
c <-1
c <-2
c <-3
c <-4 //發4會報錯,沒人收,緩存大小是3
}
func main(){
bufferedChannel() //程序掛掉,因爲發了沒人收
}
package main
import (
"fmt"
"time"
)
func createWorker(id int) chan<- int {
c:=make(chan int)
//真正的worker
go func() {
for{
fmt.Printf("Workder %d received %c\n",id,<-c)
}
}()
return c
}
func bufferedChannel(){
c:=createWorker(0)
c <-97
c <-98
c <-99
c <-100
time.Sleep(time.Millisecond)
}
func main() {
bufferedChannel()
}
channel 是可以 close 掉的,可以通知發完了(由發送方來完成 close)
func channelClose() {
// 數據有明確的結尾的話,發送方可以通知close
c:=createWorker(0)
c <-97
c <-98
c <-99
c <-100
close(c) //關閉 一旦close 接收方就會收到0值 空串或0
time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我們接收方也要做一個判斷,如果有值就收沒值就退出唄
func createWorker(id int) chan<- int {
c:=make(chan int)
//真正的worker
go func() {
for{
n,ok:=<-c
if !ok {
break
}
fmt.Printf("Workder %d received %c\n",id,n)
}
}()
return c
}
// 還有一種方法,我們可以通過range來判斷是否結束了
func createWorker(id int) chan<- int {
c := make(chan int)
//真正的worker
go func() {
for n := range c {
fmt.Printf("Workder %d received %c\n", id, n)
}
}()
return c
}
channel buffered channel 發送方 close() 接收方兩種判斷結束的方式
n,ok:=<-c或rangeCSP 模型不要能過共享內存來通信,通過通信來共享內存
等待任務
package main
import (
"fmt"
)
//創建結構
type workder struct {
in chan int
done chan bool
}
//
func doWork(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
done <- true //輸出一個完成true
}
}
func createWroker(id int) workder {
w := workder{
in: make(chan int),
done: make(chan bool),
}
go doWork(id, w.in, w.done)
return w
}
func chanDemo() {
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
<-workers[i].done //讀狀態出來
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
<-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
}
}
func main() {
chanDemo()
}
希望是把所有任務都發出去後,再來等 20 done
package main
import (
"fmt"
)
//創建結構
type workder struct {
in chan int
done chan bool
}
//
func doWork(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
go func() {
done <- true //輸出一個完成true
}()
}
}
func createWroker(id int) workder {
w := workder{
in: make(chan int),
done: make(chan bool),
}
go doWork(id, w.in, w.done)
return w
}
func chanDemo() {
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
//<-workers[i].done //讀狀態出來
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
//<-workers[i].done // 這樣寫會按順序執行,讀個小寫a 輸出一個
}
//等所有發送都完成後再收done的狀態
for _,workder:=range workers { //這樣會出錯,看小寫的打印完,打寫的沒有打印,這是因爲所有的channel發都是block的,小寫的完了有10個done,沒人收又發10個大寫的,就循環等待了,有個簡單快速的解決方法就是,將done<-true放到另goroutine中去做 ,或者將大小寫分開,小寫發完,done完再去搞大寫
<-workder.done
<-workder.done
}
}
func main() {
chanDemo()
}
WaitGroup
我們再改寫一下上個例子
package main
import (
"fmt"
"sync"
)
//創建結構
type workder struct {
in chan int
wg *sync.WaitGroup //共享一個
}
//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
wg.Done()
}
}
func createWroker(id int, wg *sync.WaitGroup) workder {
w := workder{
in: make(chan int),
wg: wg,
}
go doWork(id, w.in, wg)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i, &wg)
}
wg.Add(20) //我們有20個任務就開20個
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
}
wg.Wait()
}
func main() {
chanDemo()
}
再抽象一下 將 wg 封閉到結構的函數中
package main
import (
"fmt"
"sync"
)
//創建結構
type workder struct {
in chan int
done func()
}
//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
wg.Done()
}
}
func createWroker(id int, wg *sync.WaitGroup) workder {
w := workder{
in: make(chan int),
done: func() {
wg.Done()
},
}
go doWork(id, w.in, wg)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i, &wg)
}
wg.Add(20) //我們有20個任務就開20個
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
}
wg.Wait()
}
func main() {
chanDemo()
}
channel 來實現樹的遍歷
// main.go
func main(){
c:=root.TraverseWithChannel()
maxNode:=0
for node:=range c{
if node.Value>maxNode {
maxNode = node.Value
}
}
fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree
import "fmt"
//中序遍歷 先左在中在右
func (node *Node) Traverse() {
/*if node==nil {
return
}
node.Left.Traverse()
//只能作打印,
node.Print()
node.Right.Traverse()*/
node.TraverseFunc(func(n *Node) {
n.Print()
})
fmt.Println()
}
//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
if node == nil {
return
}
node.Left.TraverseFunc(f)
f(node)
node.Right.TraverseFunc(f)
}
// 返回一個channel
func (node *Node) TraverseWithChannel() chan *Node{
out:=make(chan *Node)
go func() {
node.TraverseFunc(func(node *Node) {
out <- node
})
close(out) //注意關閉
}()
return out
}
select 的使用
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func main() {
//var c1, c2 chan int
// 從c1或c2裏面搜 誰快搜誰 ,實現非阻塞式的獲取就用select
var c1, c2 = generator(), generator()
for {
select {
case n := <-c1:
fmt.Println("Received from c1:", n)
case n := <-c2:
fmt.Println("Received from c2:", n)
default:
fmt.Println("No value received")
}
}
}
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
//隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func workder(id int, c chan int) {
for n := range c {
fmt.Printf("Workder %d received %d \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go workder(id, c)
return c
}
func main() {
//var c1, c2 chan int // c1 and c2 = nil 走的default
// 從c1或c2裏面搜 誰來的快從誰裏取 ,實現非阻塞式的獲取就用select
var c1, c2 = generator(), generator()
workder := createWorker(0)
n := 0
hasValue := false
for {
var activeWorker chan<- int
if hasValue {
activeWorker = workder
}
select {
case n = <-c1:
hasValue = true
case n = <-c2:
hasValue = true
case activeWorker <- n:
hasValue = false
}
}
}
生成數據和消耗數據速度不一樣,會導致有些數據丟失
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
//隨機sleep1500毫秒 使得c1,c2輸出的速度不一樣
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func workder(id int, c chan int) {
for n := range c {
time.Sleep(2 * time.Second) //這樣有些數據沒打印出來 新收到的數據會把原來的數據沖掉,我們可以做個數據緩存(排隊)然後去消耗它
fmt.Printf("Workder %d received %d \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go workder(id, c)
return c
}
func main() {
//var c1, c2 chan int // c1 and c2 = nil 走的default
// 從c1或c2裏面搜 誰來的快從誰裏取 ,實現非阻塞式的獲取就用select
var c1, c2 = generator(), generator()
workder := createWorker(0)
n := 0
//hasValue := false
var values []int //緩存數據
//10秒退出
tm := time.After(20 * time.Second)
//每秒看一下隊列的長度
tick := time.Tick(time.Second)
for {
var activeWorker chan<- int
var activeValue int
if len(values) > 0 {
activeWorker = workder
activeValue = values[0]
}
select {
case n = <-c1:
//hasValue = true
values = append(values, n)
case n = <-c2:
//hasValue = true
values = append(values, n)
case activeWorker <- activeValue: //activeValue有值纔去送數據
//hasValue = false
values = values[1:]
case <-tick:
fmt.Println("queue len = ", len(values))
//fmt.Printf("values= %v\n",values)
case <-time.After(800 * time.Millisecond):
//數據生的太慢 800毫秒沒有數據提示超時
fmt.Println("timeout")
case <-tm:
//10秒後退出,timer
fmt.Println("bye")
return
}
}
}
傳統同步方式
互斥量的使用
package main
import (
"fmt"
"time"
)
type atomicInt int //線程安全的
func (a *atomicInt) increment() {
*a++
}
func (a *atomicInt) get() int {
return int(*a)
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a)
}
我們可以在命令行下去查看運行結果,實際上會提示資源DATA Race
go run --raise atomic.go
package main
import (
"fmt"
"sync"
"time"
)
type atomicInt struct {
value int
lock sync.Mutex
} //線程安全的
func (a *atomicInt) increment() {
a.lock.Lock()
defer a.lock.Unlock()
a.value++
}
func (a *atomicInt) get() int {
a.lock.Lock()
defer a.lock.Unlock()
return a.value
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a.get()) // a.value時還會報錯data race
}
如果我們想讓一塊代碼區加鎖,我們可以如下
func (a *atomicInt) increment() {
func() {
a.lock.Lock()
defer a.lock.Unlock()
a.value++
}()
}
傳遞同步機制都是通過共享內存來實現同步,我們要通過通信來實現共享內存
主題測試文章,只做測試使用。發佈者:Walker,轉轉請注明出處:https://walker-learn.xyz/archives/6739
