004
goroutine
package main
import (
"fmt"
"time"
)
func main() {
for i:=0;i<10;i++{
go func(i int) {
fmt.Printf("Hello from goroutine %d \n",i) // i/o操作可以交出控制权,手动操作交出控制权runtime.Gosched()
}(i)
}
// main直接退出了,还没来得急执行协程
time.Sleep(time.Millisecond)
}
- 轻量级“线程”
- 非抢占式多任务处理,由协程主动交出控制权(线程是抢占式任务处理)
- 编译器/解释器/虚拟机层面的多任务
- 多个协程可能在一个或多个线程上运行
- 手动交出控制权
runtime.Gosched()
子程序是协程的一个特例,协程是双向的
- 在函数前加上 go 关键字,就能送给调度器运行
- 不需要 在定义时区分是否是异步函数
- 调度器在合适的点进行切换
- 使用 race 来检测数据访问冲突
goroutine 可能的切换点
- I/O,select
- channel
- 等待锁
- 函数调用
- runtime.Gosched() 手动
channel
goroutine 和 goroutine 之间可以开出很多个双向通道,它是 goroutine 和 goroutine 之间的交互。有发,就得有收,要不就会卡在那(deadlock)。
package main
import (
"fmt"
"time"
)
func worker(c chan int) {
for{
n:=<-c
fmt.Println(n)
}
}
func chanDemo() {
c:=make(chan int) //var c chan int 这只是定义了一个c,这是一个chan类型内部值是int的变量,并没有生成一个chan 要生成,需要用make
//用go关键词
/*go func() {
for{
n:=<-c //收数据
fmt.Println(n)
}
}()*/
go worker(c)
c<-1 //有发就得有收
c<-2
time.Sleep(time.Millisecond) //简单发一个sleep这样这两个值都打印出来,要不c还没打印,main函数已经结束,收的2还来不及打印
}
func main() {
chanDemo()
}
package main
import (
"fmt"
"time"
)
func worker(id int ,c chan int) {
for{
fmt.Printf("Workder %d received %d\n",id,<-c)
}
}
func chanDemo() {
c:=make(chan int)
go worker(0,c)
c<-1 //有发就得有收
c<-2
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
package main
import (
"fmt"
"time"
)
func worker(id int ,c chan int) {
for{
fmt.Printf("Workder %d received %c\n",id,<-c)
}
}
func chanDemo() {
//c:=make(chan int)
//go worker(0,c)
var channels [10]chan int
for i:=0;i<10;i++{
channels[i]=make(chan int)
go worker(i,channels[i])
}
//c<-1 //有发就得有收
//c<-2
for i:=0;i<10;i++{
channels[i] <- 'a'+i
}
for i:=0;i<10;i++{
channels[i] <- 'A'+i
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
channel 做为返回值,返回一个 channel
package main
import (
"fmt"
"time"
)
//
func createWorker(id int) chan int {
c:=make(chan int)
//真正的worker
go func() {
for{
fmt.Printf("Workder %d received %c\n",id,<-c)
}
}()
return c
}
func chanDemo() {
var channels [10]chan int
for i:=0;i<10;i++{
channels[i]=createWorker(i)
}
//c<-1 //有发就得有收
//c<-2
for i:=0;i<10;i++{
channels[i] <- 'a'+i
}
for i:=0;i<10;i++{
channels[i] <- 'A'+i
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
有时候我们要明确收发功能,所以我们可以在返回类型上加箭头,(可能收,可能发,注意位置)chan<-收数据 <-chan 发数据,所以func createWorker(id int) chan<- int { 和 var channels [10]chan<- int 改成有箭头
bufferedChan
我们建立 channel 有发就得有收,要不程序就会 deadlock,如下
func bufferedChannel(){
c:=make(chan int)
c <-1
}
func main(){
bufferedChannel() //程序挂掉,因为发了没人收
}
加入缓存区
func bufferedChannel(){
c:=make(chan int,3)
c <-1
c <-2
c <-3
c <-4 //发4会报错,没人收,缓存大小是3
}
func main(){
bufferedChannel() //程序挂掉,因为发了没人收
}
package main
import (
"fmt"
"time"
)
func createWorker(id int) chan<- int {
c:=make(chan int)
//真正的worker
go func() {
for{
fmt.Printf("Workder %d received %c\n",id,<-c)
}
}()
return c
}
func bufferedChannel(){
c:=createWorker(0)
c <-97
c <-98
c <-99
c <-100
time.Sleep(time.Millisecond)
}
func main() {
bufferedChannel()
}
channel 是可以 close 掉的,可以通知发完了(由发送方来完成 close)
func channelClose() {
// 数据有明确的结尾的话,发送方可以通知close
c:=createWorker(0)
c <-97
c <-98
c <-99
c <-100
close(c) //关闭 一旦close 接收方就会收到0值 空串或0
time.Sleep(time.Millisecond) //收一毫秒的空值
}
// 那我们接收方也要做一个判断,如果有值就收没值就退出呗
func createWorker(id int) chan<- int {
c:=make(chan int)
//真正的worker
go func() {
for{
n,ok:=<-c
if !ok {
break
}
fmt.Printf("Workder %d received %c\n",id,n)
}
}()
return c
}
// 还有一种方法,我们可以通过range来判断是否结束了
func createWorker(id int) chan<- int {
c := make(chan int)
//真正的worker
go func() {
for n := range c {
fmt.Printf("Workder %d received %c\n", id, n)
}
}()
return c
}
channel buffered channel 发送方 close() 接收方两种判断结束的方式
n,ok:=<-c或rangeCSP 模型不要能过共享内存来通信,通过通信来共享内存
等待任务
package main
import (
"fmt"
)
//创建结构
type workder struct {
in chan int
done chan bool
}
//
func doWork(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
done <- true //输出一个完成true
}
}
func createWroker(id int) workder {
w := workder{
in: make(chan int),
done: make(chan bool),
}
go doWork(id, w.in, w.done)
return w
}
func chanDemo() {
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
<-workers[i].done //读状态出来
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
<-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
}
}
func main() {
chanDemo()
}
希望是把所有任务都发出去后,再来等 20 done
package main
import (
"fmt"
)
//创建结构
type workder struct {
in chan int
done chan bool
}
//
func doWork(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
go func() {
done <- true //输出一个完成true
}()
}
}
func createWroker(id int) workder {
w := workder{
in: make(chan int),
done: make(chan bool),
}
go doWork(id, w.in, w.done)
return w
}
func chanDemo() {
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
//<-workers[i].done //读状态出来
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
//<-workers[i].done // 这样写会按顺序执行,读个小写a 输出一个
}
//等所有发送都完成后再收done的状态
for _,workder:=range workers { //这样会出错,看小写的打印完,打写的没有打印,这是因为所有的channel发都是block的,小写的完了有10个done,没人收又发10个大写的,就循环等待了,有个简单快速的解决方法就是,将done<-true放到另goroutine中去做 ,或者将大小写分开,小写发完,done完再去搞大写
<-workder.done
<-workder.done
}
}
func main() {
chanDemo()
}
WaitGroup
我们再改写一下上个例子
package main
import (
"fmt"
"sync"
)
//创建结构
type workder struct {
in chan int
wg *sync.WaitGroup //共享一个
}
//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
wg.Done()
}
}
func createWroker(id int, wg *sync.WaitGroup) workder {
w := workder{
in: make(chan int),
wg: wg,
}
go doWork(id, w.in, wg)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i, &wg)
}
wg.Add(20) //我们有20个任务就开20个
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
}
wg.Wait()
}
func main() {
chanDemo()
}
再抽象一下 将 wg 封闭到结构的函数中
package main
import (
"fmt"
"sync"
)
//创建结构
type workder struct {
in chan int
done func()
}
//
func doWork(id int, c chan int, wg *sync.WaitGroup) {
for n := range c {
fmt.Printf("Workder %d received %c \n", id, n)
wg.Done()
}
}
func createWroker(id int, wg *sync.WaitGroup) workder {
w := workder{
in: make(chan int),
done: func() {
wg.Done()
},
}
go doWork(id, w.in, wg)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10]workder
for i := 0; i < 10; i++ {
workers[i] = createWroker(i, &wg)
}
wg.Add(20) //我们有20个任务就开20个
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
}
wg.Wait()
}
func main() {
chanDemo()
}
channel 来实现树的遍历
// main.go
func main(){
c:=root.TraverseWithChannel()
maxNode:=0
for node:=range c{
if node.Value>maxNode {
maxNode = node.Value
}
}
fmt.Println("Max node value:", maxNode)
}
// traverse.go
package tree
import "fmt"
//中序遍历 先左在中在右
func (node *Node) Traverse() {
/*if node==nil {
return
}
node.Left.Traverse()
//只能作打印,
node.Print()
node.Right.Traverse()*/
node.TraverseFunc(func(n *Node) {
n.Print()
})
fmt.Println()
}
//改造一下
func (node *Node) TraverseFunc(f func(*Node)) {
if node == nil {
return
}
node.Left.TraverseFunc(f)
f(node)
node.Right.TraverseFunc(f)
}
// 返回一个channel
func (node *Node) TraverseWithChannel() chan *Node{
out:=make(chan *Node)
go func() {
node.TraverseFunc(func(node *Node) {
out <- node
})
close(out) //注意关闭
}()
return out
}
select 的使用
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func main() {
//var c1, c2 chan int
// 从c1或c2里面搜 谁快搜谁 ,实现非阻塞式的获取就用select
var c1, c2 = generator(), generator()
for {
select {
case n := <-c1:
fmt.Println("Received from c1:", n)
case n := <-c2:
fmt.Println("Received from c2:", n)
default:
fmt.Println("No value received")
}
}
}
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
//随机sleep1500毫秒 使得c1,c2输出的速度不一样
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func workder(id int, c chan int) {
for n := range c {
fmt.Printf("Workder %d received %d \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go workder(id, c)
return c
}
func main() {
//var c1, c2 chan int // c1 and c2 = nil 走的default
// 从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
var c1, c2 = generator(), generator()
workder := createWorker(0)
n := 0
hasValue := false
for {
var activeWorker chan<- int
if hasValue {
activeWorker = workder
}
select {
case n = <-c1:
hasValue = true
case n = <-c2:
hasValue = true
case activeWorker <- n:
hasValue = false
}
}
}
生成数据和消耗数据速度不一样,会导致有些数据丢失
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
//随机sleep1500毫秒 使得c1,c2输出的速度不一样
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func workder(id int, c chan int) {
for n := range c {
time.Sleep(2 * time.Second) //这样有些数据没打印出来 新收到的数据会把原来的数据冲掉,我们可以做个数据缓存(排队)然后去消耗它
fmt.Printf("Workder %d received %d \n", id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go workder(id, c)
return c
}
func main() {
//var c1, c2 chan int // c1 and c2 = nil 走的default
// 从c1或c2里面搜 谁来的快从谁里取 ,实现非阻塞式的获取就用select
var c1, c2 = generator(), generator()
workder := createWorker(0)
n := 0
//hasValue := false
var values []int //缓存数据
//10秒退出
tm := time.After(20 * time.Second)
//每秒看一下队列的长度
tick := time.Tick(time.Second)
for {
var activeWorker chan<- int
var activeValue int
if len(values) > 0 {
activeWorker = workder
activeValue = values[0]
}
select {
case n = <-c1:
//hasValue = true
values = append(values, n)
case n = <-c2:
//hasValue = true
values = append(values, n)
case activeWorker <- activeValue: //activeValue有值才去送数据
//hasValue = false
values = values[1:]
case <-tick:
fmt.Println("queue len = ", len(values))
//fmt.Printf("values= %v\n",values)
case <-time.After(800 * time.Millisecond):
//数据生的太慢 800毫秒没有数据提示超时
fmt.Println("timeout")
case <-tm:
//10秒后退出,timer
fmt.Println("bye")
return
}
}
}
传统同步方式
互斥量的使用
package main
import (
"fmt"
"time"
)
type atomicInt int //线程安全的
func (a *atomicInt) increment() {
*a++
}
func (a *atomicInt) get() int {
return int(*a)
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a)
}
我们可以在命令行下去查看运行结果,实际上会提示资源DATA Race
go run --raise atomic.go
package main
import (
"fmt"
"sync"
"time"
)
type atomicInt struct {
value int
lock sync.Mutex
} //线程安全的
func (a *atomicInt) increment() {
a.lock.Lock()
defer a.lock.Unlock()
a.value++
}
func (a *atomicInt) get() int {
a.lock.Lock()
defer a.lock.Unlock()
return a.value
}
func main() {
var a atomicInt
a.increment()
go func() {
a.increment()
}()
time.Sleep(time.Millisecond)
fmt.Println(a.get()) // a.value时还会报错data race
}
如果我们想让一块代码区加锁,我们可以如下
func (a *atomicInt) increment() {
func() {
a.lock.Lock()
defer a.lock.Unlock()
a.value++
}()
}
传递同步机制都是通过共享内存来实现同步,我们要通过通信来实现共享内存
主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://walker-learn.xyz/archives/6739
