Go Senior Engineer Lecture (MOOC) 004

goroutine

package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 10; i++ {
        go func(i int) {
            fmt.Printf("Hello from goroutine %d\n", i) // I/O operations are yield points; you can also yield manually with runtime.Gosched()
        }(i)
    }
    // without the sleep below, main would exit before the goroutines get a chance to run
    time.Sleep(time.Millisecond)
}
  • Lightweight "threads"
  • Cooperative (non-preemptive) multitasking: goroutines yield control at scheduling points, while OS threads are preempted by the kernel (since Go 1.14 the runtime can also preempt long-running goroutines)
  • Multitasking at the compiler/interpreter/VM level
  • Multiple goroutines may run on one or more threads
  • Manually yield control with runtime.Gosched()

Subroutines are a special case of coroutines; with coroutines, control can flow in both directions.

Threads and Goroutines

A goroutine is Go's take on the coroutine. Which goroutine runs on which thread is managed by the scheduler; we don't need to worry about it.

  • Add the go keyword before a function to submit it to the scheduler for execution
  • No need to distinguish whether a function is asynchronous at definition time
  • The scheduler switches at appropriate points
  • Use the race detector (go run -race) to find conflicting data accesses

Possible goroutine switching points

  • I/O, select
  • channel
  • Waiting for a lock
  • Function call
  • runtime.Gosched() (manual)

channel

Any number of channels can be opened between goroutines; they are how goroutines interact, and by default they are bidirectional. Every send must be matched by a receive, otherwise the program blocks (deadlock).

package main

import (
    "fmt"
    "time"
)

func worker(c chan int) {
    for {
        n := <-c
        fmt.Println(n)
    }
}

func chanDemo() {
    c := make(chan int) // var c chan int only declares a nil channel variable; make actually creates the channel
    // equivalent inline version using the go keyword:
    /*go func() {
        for {
            n := <-c // receive data
            fmt.Println(n)
        }
    }()*/
    go worker(c)
    c <- 1 // every send needs a matching receive
    c <- 2
    time.Sleep(time.Millisecond) // give the worker time to print both values before main exits
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for {
        fmt.Printf("Worker %d received %d\n", id, <-c)
    }
}

func chanDemo() {
    c := make(chan int)
    go worker(0, c)
    c <- 1 // every send needs a matching receive
    c <- 2
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}
package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for {
        fmt.Printf("Worker %d received %c\n", id, <-c)
    }
}

func chanDemo() {
    var channels [10]chan int
    for i := 0; i < 10; i++ {
        channels[i] = make(chan int)
        go worker(i, channels[i])
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

Channel as a return value, returning a channel

package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan int {
    c := make(chan int)
    // the actual worker
    go func() {
        for {
            fmt.Printf("Worker %d received %c\n", id, <-c)
        }
    }()
    return c
}

func chanDemo() {
    var channels [10]chan int
    for i := 0; i < 10; i++ {
        channels[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)
}

func main() {
    chanDemo()
}

Sometimes we want to restrict a channel's direction explicitly, which we express with an arrow in the channel type (note the arrow's position): chan<- int is send-only (you may only send into it), and <-chan int is receive-only (you may only receive from it). Since callers of createWorker only send, change the signature to func createWorker(id int) chan<- int { and the array to var channels [10]chan<- int.

bufferedChan

When we create an unbuffered channel, every send must be matched by a receive, otherwise the program deadlocks, as shown below:

func bufferedChannel() {
    c := make(chan int)
    c <- 1
}
func main() {
    bufferedChannel() // crashes: a value was sent but nobody receives it (fatal error: all goroutines are asleep - deadlock!)
}

Adding a buffer

func bufferedChannel() {
    c := make(chan int, 3)
    c <- 1
    c <- 2
    c <- 3
    c <- 4 // this fourth send blocks: the buffer holds only 3 values and nobody receives
}
func main() {
    bufferedChannel() // still deadlocks, but only on the fourth send
}
package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan<- int {
    c := make(chan int)
    // the actual worker
    go func() {
        for {
            fmt.Printf("Worker %d received %c\n", id, <-c)
        }
    }()
    return c
}

func bufferedChannel() {
    c := createWorker(0)
    c <- 97
    c <- 98
    c <- 99
    c <- 100
    time.Sleep(time.Millisecond)
}
}

func main() {
    bufferedChannel()
}

Channels can be closed to signal that sending is complete (done by the sender).

func channelClose() {
    // if the data stream has a clear end, the sender signals it by closing the channel
    c := createWorker(0)
    c <- 97
    c <- 98
    c <- 99
    c <- 100
    close(c) // after close, every receive yields the zero value (0 for int, "" for string)
    time.Sleep(time.Millisecond) // without the check below, the receiver keeps reading zero values during this millisecond
}
// So the receiver needs to check: if a value arrived, use it; if the channel is closed, exit.
func createWorker(id int) chan<- int {
    c := make(chan int)
    // the actual worker
    go func() {
        for {
            n, ok := <-c
            if !ok {
                break
            }
            fmt.Printf("Worker %d received %c\n", id, n)
        }
    }()
    return c
}
// Another option is range, which stops automatically when the channel is closed
func createWorker(id int) chan<- int {
    c := make(chan int)
    // the actual worker
    go func() {
        for n := range c {
            fmt.Printf("Worker %d received %c\n", id, n)
        }
    }()
    return c
}

Summary: channel, buffered channel, sender-side close(), and two ways for the receiver to detect the end (n, ok := <-c or range). This is the CSP model:
Do not communicate by sharing memory; instead, share memory by communicating.

Waiting for Tasks

package main

import (
    "fmt"
)

// Create a struct bundling a worker's channels
type worker struct {
    in   chan int
    done chan bool
}

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Worker %d received %c \n", id, n)
        done <- true // signal that this task is finished
    }
}
func createWorker(id int) worker {
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        <-workers[i].done // read the completion signal
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        <-workers[i].done // this forces sequential execution: send a letter, wait until it is printed
    }
}

func main() {
    chanDemo()
}

What we would rather do is send all the tasks first, and only then wait for all 20 done signals.

package main

import (
    "fmt"
)

// Create a struct bundling a worker's channels
type worker struct {
    in   chan int
    done chan bool
}

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("Worker %d received %c \n", id, n)
        go func() {
            done <- true // send the signal from its own goroutine so doWork never blocks on it
        }()
    }
}
func createWorker(id int) worker {
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {
    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    // Wait for all sends to complete before collecting the done signals.
    // Without the go func wrapper around done <- true above, this deadlocks:
    // channel sends block, so after the 10 lowercase letters there would be 10
    // pending done signals nobody receives, and the uppercase sends would hang.
    // Alternatives: send done from its own goroutine (as here), or collect the
    // lowercase done signals before starting the uppercase round.
    for _, w := range workers {
        <-w.done // two signals per worker: one for each letter
        <-w.done
    }
}

func main() {
    chanDemo()
}

WaitGroup

Let's rewrite the previous example.

package main

import (
    "fmt"
    "sync"
)

// Create a struct bundling a worker's input channel and shared WaitGroup
type worker struct {
    in chan int
    wg *sync.WaitGroup // all workers share one WaitGroup
}

func doWork(id int, c chan int, wg *sync.WaitGroup) {
    for n := range c {
        fmt.Printf("Worker %d received %c \n", id, n)
        wg.Done()
    }
}
func createWorker(id int, wg *sync.WaitGroup) worker {
    w := worker{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }

    wg.Add(20) // 20 tasks in total
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

Let's abstract it further by encapsulating wg within the struct's function.

package main

import (
    "fmt"
    "sync"
)

// Create a struct; the WaitGroup is hidden behind the done callback
type worker struct {
    in   chan int
    done func()
}

func doWork(id int, w worker) {
    for n := range w.in {
        fmt.Printf("Worker %d received %c \n", id, n)
        w.done() // doWork no longer needs to know about sync.WaitGroup
    }
}
func createWorker(id int, wg *sync.WaitGroup) worker {
    w := worker{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w)
    return w
}

func chanDemo() {
    var wg sync.WaitGroup
    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }

    wg.Add(20) // 20 tasks in total
    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    chanDemo()
}

Channel for Tree Traversal

// main.go
func main() {
    c := root.TraverseWithChannel()
    maxNodeValue := 0
    for node := range c {
        if node.Value > maxNodeValue {
            maxNodeValue = node.Value
        }
    }
    fmt.Println("Max node value:", maxNodeValue)
}
// traverse.go
package tree

import "fmt"

// In-order traversal: Left -> Root -> Right
func (node *Node) Traverse() {
    /*if node==nil {
        return
    }
    node.Left.Traverse()
    // only for printing,
    node.Print()
    node.Right.Traverse()*/

    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

// Refactor it
func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}
// Return a channel
func (node *Node) TraverseWithChannel() chan *Node {
    out := make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out) // close so a range over the channel terminates
    }()
    return out
}

Using select

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    // var c1, c2 chan int // nil channels: every case blocks, so select would always take default
    // Receive from whichever of c1 and c2 is ready first; default makes the select non-blocking.
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("Received from c1:", n)
        case n := <-c2:
            fmt.Println("Received from c2:", n)
        default:
            fmt.Println("No value received") // note: with default, this loop busy-spins
        }
    }
}
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            // sleep a random duration up to 1500 ms so c1 and c2 produce at different speeds
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        fmt.Printf("Worker %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    // Receive from whichever of c1 and c2 is ready first.
    var c1, c2 = generator(), generator()
    w := createWorker(0)
    n := 0
    hasValue := false
    for {
        // A send on a nil channel blocks forever, so while hasValue is false
        // the activeWorker case can never be chosen.
        var activeWorker chan<- int
        if hasValue {
            activeWorker = w
        }
        select {
        case n = <-c1:
            hasValue = true
        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }
}

Different speeds of data generation and consumption can lead to some data loss.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            // sleep a random duration up to 1500 ms so c1 and c2 produce at different speeds
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(2 * time.Second) // slow consumer: new values of n would overwrite old ones, so we buffer them in a queue below
        fmt.Printf("Worker %d received %d \n", id, n)
    }
}
func createWorker(id int) chan<- int {
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    w := createWorker(0)
    n := 0
    var values []int // queue buffering produced values
    // exit after 20 seconds
    tm := time.After(20 * time.Second)
    // report the queue length every second
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = w
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue: // only enabled while the queue is non-empty
            values = values[1:]
        case <-tick:
            fmt.Println("queue len =", len(values))
        case <-time.After(800 * time.Millisecond):
            // the generators are too slow: no data for 800 milliseconds
            fmt.Println("timeout")
        case <-tm:
            // the 20-second timer fired
            fmt.Println("bye")
            return
        }
    }
}

Traditional Synchronization Methods

Using Mutexes

package main

import (
    "fmt"
    "time"
)

type atomicInt int // NOT thread-safe: concurrent increments still race

func (a *atomicInt) increment() {
    *a++
}

func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}

We can see the problem by running with the race detector on the command line; it will report a DATA RACE.

 go run -race atomic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock  sync.Mutex
} // thread-safe: all access goes through the mutex

func (a *atomicInt) increment() {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value++
}

func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get()) // read through get(); printing a directly would copy the struct (mutex included) without holding the lock
}

If we want to lock a block of code, we can do it as follows:

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}

Traditional synchronization mechanisms all work by sharing memory; in Go we prefer to share memory by communicating instead.
