본문 바로가기

Go 언어 공부

[GO 마스터하기] 09-동시성2

package main

import (
   "fmt"
   "os"
   "strconv"
   "sync"
)

func main() {
   numGR, _ := strconv.Atoi(os.Args[1])
   var waitGroup sync.WaitGroup
   var i int

   k := make(map[int]int)
   k[1] = 12

   for i = 0; i < numGR; i++ {
      waitGroup.Add(1)
      go func() {
         defer waitGroup.Done()
         k[i] = i
      }()
   }
   k[2] = 10
   waitGroup.Wait()
   fmt.Printf("k = %#v\n", k)
}

Go 스케줄러

스케줄러는 현재 사용할 수 있는 리소스만으로 처리할 수 있는 일의 양을 효율적으로 분산하는 역할을 담당한다. Go 런타임은 m:n 스케줄러를 이용해 고루틴을 스케줄링한다.

Go 런타임은 fork-join concurrency 모델을 사용한다. 이 모델의 포크 부분은 프로그램을 실행하다가 임의의 시점에 자식 브랜치를 생성하는 것을 표현한다. 마찬가지로 Go 동시성 모델에서 조인 부분은 자식 브랜치가 끝나서 부모와 합류하는 것을 나타낸다. sync.Wait() 문장과 고루틴의 결과를 수집한 채널이 조인 지점이 된다. 고루틴이 새로 생성되면 자식 브랜치가 생성된다.

fair scheduling strategy는 모든 부하를 현재 사용할 수 있는 프로세서로 고르게 나누는 방식이다. 분산 태스크의 상당 수는 다른 태스크에 의존하기 때문에 간단하지만은 않다. 따라서 결국 충분히 활용되지 않거나 다른 프로세서보다 더 많이 사용하는 프로세서가 존재하게 된다.

고루틴은 일종의 태스크인 반면, 고루틴을 호출하는 문장은 모두 컨티뉴에이션이다. 스케줄러에서 사용하는 work stealing strategy에 따르면, 충분히 활용되지 않고 있는 프로세서에게 줄 작업을 다른 프로세서에서 찾는다. 이에 맞는 작업을 발견하면 그 프로세서부터 훔쳐 오기 때문에 작업 훔치기 전략이라 불린다. 작업 훔치기 알고리즘은 컨티뉴에이션을 훔쳐서 큐에 저장한다. stalling join이란 이름에서 알수 있듯이, 스레드의 실행이 조인에서 멈춰서 다른 할 일을 찾기 시작하는 지점이다. 작업 훔치기와 컨티뉴에이션 훔치기는 모든 스톨링 조인이 발생하지만, 작업 보다는 컨티뉴에이션에서 더 많이 발생한다.

컨티뉴에이션 훔치기의 가장 큰 단점은 프로그래밍 언어의 컴파일러에서 부가적인 작업을 해야하는 것이다. 장점은 함수만 사용하거나 여러 고루틴으로 구성된 스레드 하나만 사용할 때 결과가 같다는 것이다.

GO에서는 m개의 고루틴이 구동하면, n개의 os 스레드에서 스케줄링 대상이된다. 이때 논리적인 프로세서를 최대 GOMAXPROCS개 사용한다.

스케줄러는 크게 세 종류의 개체, 즉, 현재 사용하는 스레드, 고루틴, 논리적인 프로세서에 작동된다.

이 그림을 보면 두가지 큐가 있다는 것을 알 수있다. 글로벌 ㅋ에 있는 고루틴을 실행하려면 논리적인 프로세서의 큐에서 할당해야한다. 따라서 스케줄러는 각각의 논리적인 프로세서의 로컬 큐에만 존재하는 고루틴을 실행시키지 않도록 항상 글로벌 큐도 확인한다. 하지만 항사 ㅇ검사할 필요는 없다, 다시 말해 로컬 큐보다 항상 우선순위가 높지 않다는 뜻이다. 또한 각각의 논리적인 프로세서마다 여러 개의 스레드를 가지고 있기 때문에, 현재 사용할 수 있느느 논리적인 프로세서에 대한 로컬 큐 사이에 훔치기가 발생할 수 있다.

마지막으로 스케줄러는 필요에 따라 OS 스레드를 더 생성할 수 있다.

명심할 점은, 프로그램에서 고루틴을 많이 사용한다고 해서 성능이 무조건 향상되지 않다는 것이다. 고루틴의 수도 많고, sync 함수들을 많이 호출하면 스케줄러에서 처리해야 할 부가적인 작업이 늘어나 프로그램 속도가 느려진다.

GOMAXPROCS

이 환경 변수를 이용하면 유저 레벨 Go 코드를 동시에 실행 할 수 있는 OS 스레드의 수를 제한 할 수 있다. GOMAXPROCS에 대한 디폴트 값은 현재 사용하는 머신의 코어 수에 따라 결정된다.

현재 사용하는 머신의 코 수보다 저은 값을 GOMAXPROCS에 할당하면 프로그램 성능에 영향을 미칠 수 있다. 반대의 경우는 프로그램 속도가 반드시 빨라진다는 법도 없다.

select 키워드

이 키워드는 상당히 강력한 기능을 제공하며 적용할 수 있는 상황도 굉장히 다양하다. 채널에 대한 switch문이라 생각하면 된다. 실제로 select를 이용해 고루틴의 여러 개의 통신 연산을 기다리게 할 수 있다. 가장 큰 장점은 하나의 select 블록에서 여러 개의 채널을 다룰 수 있다는 것이다.

  • select 키워드로 채널을 다룰 때 발생할 수 있는 가장 큰 문제는 deadlock이다.
package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "time"
)

func gen(min, max int, createNumber chan int, end chan bool) {
    for {
        select {
        case createNumber <- rand.Intn(max-min) + min:
        case <-end:
            close(end)
            return
        case <-time.After(4 *time.Second):
            fmt.Println("\ntime.After()!")
        }
    }
}

func main() {
    rand.Seed(time.Now().Unix())
    createNumber := make(chan int)
    end := make(chan bool)

    n, _ := strconv.Atoi(os.Args[1])
    fmt.Printf("Going to created %d random numbers\n", n)
    go gen(0, 2*n, createNumber, end)

    for i := 0; i <n; i++ {
        fmt.Printf("%d ", <-createNumber)
    }

    time.Sleep(5 * time.Second)
    fmt.Println("Bye")
    end <- true
}
$ go run foo.go 10
Going to created 10 random numbers
7 12 9 3 13 9 19 1 9 3 
time.After()!
Bye

select 블록은 크게 세가지 case문으로 작성된다. time.After() 케이스는 일정한 시간이 지나면 리턴한다. 따라서 다른 채널이 블록됐다면 select문에 대한 블록도 해제한다.

select문은 순차적으로 실행되지 않는다. 모든 채널을 동시에 확인한다. select문에 있는 채널 중 어떤 것도 사용할 수 있는 상태가 아니라면 어느 한 채널이 사용 가능한 상태가 될 때까지 블록된다. select 문에서 사용 가능한 상태의 채널이 여러개라면 런타임은 그 중 하나를 임의로 선택한다.

고루틴 만료시키기

첫 번째 방법

package main

import (
    "fmt"
    "time"
)

func main() {
    c1 := make(chan string)
    go func() {
        time.Sleep(time.Second * 3)
        c1 <- "c1 ok"
    }()

    select {
    case res := <-c1:
        fmt.Println(res)
    case <- time.After(time.Second):
        fmt.Println("timeout c1")
    }

    c2 := make(chan string)
    go func() {
        time.Sleep(3 * time.Second)
        c2 <- "c2 ok"
    }()

    select {
    case res := <-c2:
        fmt.Println(res)
    case <- time.After(4 * time.Second):
        fmt.Println("timeout c2")
    }
}
$ go run foo.go   
timeout c1
c2 ok

select 문 + time.After()로 고루틴을 종료시킬 수 있다.

두번째 방법

package main

import (
    "fmt"
    "os"
    "strconv"
    "sync"
    "time"
)

func timeout(w *sync.WaitGroup, t time.Duration) bool {
    temp := make(chan int)
    go func() {
        time.Sleep(5 * time.Second)
        defer close(temp)
        w.Wait()
    }()

    select {
    case <-temp:
        return false
    case <-time.After(t):
        return true
    }
}

func main() {
    var w sync.WaitGroup
    w.Add(1)

    t, _ := strconv.Atoi(os.Args[1])
    duration := time.Duration(int32(t)) * time.Millisecond
    fmt.Printf("Timeout period is %s\n", duration)

    if timeout(&w, duration) {
        fmt.Println("Timed out")
    } else {
        fmt.Println("ok")
    }

    w.Done()
    if timeout(&w, duration) {
        fmt.Println("Timed out")
    } else {
        fmt.Println("ok")
    }
}
$ go run foo.go 10000
Timeout period is 10s
Timed out
ok

Go 채널

채널 타입에서 0에 해당하는 값은 nil이다. 그래서 닫힌 채널에 메시지를 보내면 프로그램이 뻗는다. 하지만 닫힌 채널에서 읽을 대는 그 채널에 지정한 타입의 0에 해당하는 값을 받게 된다. 따라서 채널을 닫은 뒤에는 더 이상 쓸 수는 없지만, 읽기 연산은 여전히 수행할 수 있다.

채널을 닫을 수 있게 하려면, 그 채널을 수신 전용으로 지정하면 안 된다. 또한 nil채널은 항상 블록된다. 다시 말해 nil 채널을 읽거나 쓰면 블록된다. 채널의 이러한 속성은 select문에서 채널 변수에 nil 값을 할당함으로써 해당 브랜치로 가지 못하게 유용하다.

package main

func main() {
    var c chan string
    close(c)
}
$ go run foo.go      
panic: close of nil channel

goroutine 1 [running]:
main.main()
        /Users/imac/Workspace/golang/src/github.com/callistaenterprise/goblog/foo.go:5 +0x2a
exit status 2

시그널 채널

시그널을 보내는 용도로만 사용하는 채널이다. 쉽게 말해, 누군가에게 뭔가를 알려줄 때 시그널 채널을 사용한다.

버퍼 채널

고 스케줄러에서 더 많은 요청을 처리할 수 있도록 작업을 큐에 재빨리 저장할 대 이 타입의 채널을 사용한다. 또한 버퍼 채널을 세마포어처럼 사용해 애플리케이션의 처리량을 제한할 수 있다.

원리는 다음과 같다 : 들어온 요청은 모두 채널에 전달되고, 각각을 하나씩 처리한다. 채널이 어떤 요청에 대한 처리 작업을 끝내면 호출한 측으로 새로운 작업을 처리할 준비가 됐다는 메시지를 보낸다. 따라서 채널에서 사용하는 버퍼의 크기에 따라 동시에 처리할 수 있는 요청의 수가 결정된다.

package main

import "fmt"

func main() {
    numbers := make(chan int, 5)
    counter := 10

    for i := 0; i < counter; i++ {
        select {
        case numbers <- i:
        default:
            fmt.Println("Not enough space for", i)
        }
    }
    for i := 0; i < counter; i++ {
        select {
        case num := <-numbers:
            fmt.Println(num)
        default:
            fmt.Println("Nothing~")
            break
        }
    }
}

numbers 채널에 10개의 정수를 집어 넣는데 5개의 정수만 가질 수 있기 때문에 나머지 대해서는 오류 메시지를 출력한다.

닐 채널

닐 채널은 항상 블록되기 때문에 특수한 종류의 채널로 분류한다.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func add(c chan int) {
    sum := 0 
    t := time.NewTimer(time.Second) 

    for {
        select {
        case input := <-c:
            sum += input 
        case <-t.C:
            c = nil 
            fmt.Println(sum)
        }
    }
}

func send(c chan int) {
    for {
        c <- rand.Intn(10)
    }
}

func main() {
    c := make(chan int)
    go add(c)
    go send(c) 

    time.Sleep(3 * time.Second)
}
$ go run foo.go 
11625088

채널에 대한 채널

특수한 종류의 채널 변수로써 채널을 다룬다. 하지만 채널에 대한 채널이라도 데이터 타입은 여전히 선언해야 한다.

c1 := make(chan chan int)
package main

import (
    "fmt"
    "os"
    "strconv"
    "time"
)

func f1(cc chan chan int, f chan bool) {
    c := make(chan int)
    cc <- c
    defer close(c)

    sum := 0
    select {
    case x := <-c:
        for i := 0; i <= x; i++ {
            sum += i
        }
        c <- sum
    case <-f:
        return
    }
}

func main() {
    times, _ := strconv.Atoi(os.Args[1])
    cc := make(chan chan int)

    for i := 1; i < times+1; i++ {
        f := make(chan bool)
        go f1(cc, f)
        ch := <-cc 
        ch <- i 
        for sum := range ch {
            fmt.Print("Sum(", i, ")=", sum) 
        }
        fmt.Println()
        time.Sleep(time.Second)
        close(f) 
    }
}
$ go run foo.go 4
Sum(1)=1
Sum(2)=3
Sum(3)=6
Sum(4)=10

f1()에서는 먼저 일반적인 int 채널을 선언한 후에, 이를 채널에 대한 채널 변수로 전달한다. 그리고 나서 일반 채널로부터 데이터를 읽거나 시그널 채널인 f를 사용해 함수를 종료하는 select문을 작성한다. c 채널로부터 값을 하나 읽으면 포문을 돌아 합을 c int채널로 보낸다.

여기서 나온 f 채널은 시그널 채널로, 실제 작업이 끝날 때 고루틴을 종료하는데 사용한다. ch := ←cc라는 문장을 통해 채널에 대한 채널 변수로부터 일반 채널을 받아와서 ch ← i 를 이용해 이 채널에 int 값을 보낸다. 그런 다음, 포 문에서 채널에 들어온 값을 읽는다.

고루틴 실행 순서 지정하기

package main

import (
    "fmt"
    "time"
)

func A(a, b chan struct{}) {
    <-a
    fmt.Println("A()!")
    time.Sleep(time.Second)
    close(b)
}

func B(a, b chan struct{}) {
    <-a
    fmt.Println("B()!")
    close(b)
}

func C(a chan struct{}) {
    <-a
    fmt.Println("C()!")
}

func main() {
    x := make(chan struct{})
    y := make(chan struct{})
    z := make(chan struct{})

    go C(z)
    go A(x,y)
    go C(z)
    go B(y, z)
    go C(z)

    close(x)
    time.Sleep(3 * time.Second)
}
$ go run foo.go 4
A()!
B()!
C()!
C()!
C()!

A() 함수는 매개변수 a에 저장된 채널에 의해 블록된다. Main() 함수에서 이 채널의 블록 상태가 풀리면 이 함수가 실행되기 시작된다. 마지막으로 b 채널을 닫느데, 이렇게 하면 B의 블록 상태가 풀린다.

공유 메모리와 공유 변수

공유 메모리와 공유 변수는 스레드끼리 서로 토인하는데 가장 흔히 사용하는 방식이다. 뮤텍스 변수는 주로 스레드를 동기화하고 여러 스레드가 동시에 공유 데이터를 동시에 쓸 때 이를 보호하기 위한 목적으로 주로 사용한다. 뮤텍스는 마치 크기가 1인 버퍼채널처럼 작동한다. 그래서 공유 변수에 접근할 수 있는 고루틴 수는 최대 하나이다. 다시 말해 두개 이상의 고루틴이 이 변수를 동시에 업데이트 할 수 없다.

크리티컬 섹션이란 동시성 프로그램 코드 중에서 여러 프로세스, 스레드, 고루틴 등에서 동시에 실행할 수 없는 영역이다. 이 영역은 뮤텍스로 보호해야한다.

  • 두개의 크리티컬 섹션이 모두 똑같은 sync.Mutex나 sync.RWMutex 변수를 사용할 때는 어느 한 크리티컬 섹션을 다른 크리티컬 섹션 안에 넣을 수 없다. 쉽게 말해 어떤 일이 있더라도 뮤텍스를 여러 함수에 분산시키면 안된다.

sync.Mutex

// 뮤텍스란 일종의 상호 배재용 락이다.
// 뮤텍스가 0이면 잠기지 않은 뮤텍스 

type Mutex struct {
    state int32
    sema uint32 
}

중요한 작업은 sync.Mutex를 잠그거나 해제하는 sync.Lock()과 sync.Unlock함수에서 처리한다. 뮤텍스를 잠근다는 말은 그 뮤텍스를 해제하기 전까지 아무도 잠금 수 없다는 것을 의미한다.

package main

import (
    "fmt"
    "os"
    "strconv"
    "sync"
    "time"
)

var (
    m sync.Mutex
    v1 int
)

func change(i int) {
    m.Lock()
    time.Sleep(time.Second)
    v1 += 1
    if v1 % 10 == 0 {
        v1 -= 10 * i
    }
    m.Unlock()
}

func read() int {
    m.Lock()
    a := v1
    m.Unlock()
    return a
}

func main() {
    numGR, _ := strconv.Atoi(os.Args[1])
    var waitGroup sync.WaitGroup

    fmt.Printf("%d ", read())
    for i := 0; i < numGR; i++ {
        waitGroup.Add(1)
        go func(i int) {
            defer waitGroup.Done()
            change(i)
            fmt.Printf("-> %d", read())
        }(i)
    }

    waitGroup.Wait()
    fmt.Printf("%d\n", read())
}
$ go run foo.go 21
0 -> 1-> 2-> 3-> 4-> 5-> 6-> 7-> 8-> 9-> -120-> -119-> -118-> -117-> -116-> -115-> -114-> -113-> -112-> -111-> -140-> -139-139

뮤텍스를 잠금해제하는 것을 잊으면 어떻게 될까?

package main

import (
    "fmt"
    "sync"
)

var m sync.Mutex

func function() {
    m.Lock()
    fmt.Println("Locked")
}

func main() {
    var w sync.WaitGroup

    go func() {
        defer w.Done()
        function()
    }()
    w.Add(1)

    go func() {
        defer w.Done()
        function()
    }()
    w.Add(1)
    w.Wait()
}

sync.RWMutex 타입

sync.RWMutex는 Mutex를 개선한 것이다. RWMutex는 쓰기 연산을 수행할 때 단 한가지의 함수만 허용하지만, 리더는 여러개가 있을 수 있다. 하지만 RWMutex에 대한 모든 리더가 뮤텍스에 대한 잠금을 해제하기 전까지는, 쓰기 용도로 잠금 수 없다.

package main

import (
    "fmt"
    "sync"
    "time"
)

var pw = secret{password: "myPassword"}

type secret struct {
    RWM sync.RWMutex
    M sync.Mutex
    password string
}

func Change(c * secret, pass string) {
    c.RWM.Lock()
    fmt.Println("LChange")
    time.Sleep(10 * time.Second)

    c.password = pass
    c.RWM.Unlock()
}

func show(c *secret) string {
    c.RWM.RLock()
    fmt.Print("Show")
    time.Sleep(3 * time.Second)
    defer c.RWM.RUnlock()
    return c.password
}

func showWithLock(c *secret) string {
    c.M.Lock()
    fmt.Println("showWithLock")
    time.Sleep(3 * time.Second)
    defer c.M.Unlock()
    return c.password
}

func main() {
    var showFunction = func(c *secret) string { return ""}
    showFunction = show

    var waitGroup sync.WaitGroup
    fmt.Println("Pass :" , showFunction(&pw))

    for i := 0; i < 15; i++ {
        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            fmt.Println("Go Pass : ", showFunction(&pw))
        }()
    }
    go func() {
        waitGroup.Add(1)
        defer waitGroup.Done()
        Change(&pw, "123456")
    }()

    waitGroup.Wait()
    fmt.Println("Pass:", show(&pw))
}

고루틴으로 메모리 공유하기

공유 메모리는 스레드끼리 통신하기 위한 수단 중에서도 예전 방식에 해당하지만, Go에서는 하나의 고루틴이 데이터의 일정 영역을 소유할 수 있도록 동기화 기능을 제공한다. 다시 말해 어떤 고루틴이 가진 공유 데이터를 다른 고루틴이 접근하려면 반드시 데이터를 보유한 고루틴에 먼저 허락을 받아야한다. 이런 공유 데이터를 가진 고루틴을 모니터 고루티니라 부른다.

package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "sync"
    "time"
)

var readValue = make(chan int)
var writeValue = make(chan int)

func set(newValue int) {
    writeValue <- newValue
}

func read() int {
    return <-readValue
}

func monitor() {
    var value int
    for {
        select {
        case newVValue := <-writeValue:
            value = newVValue
            fmt.Printf("%d ", value)
        case readValue <- value:
        }
    }
}

func main() {
    n, _ := strconv.Atoi(os.Args[1]) 
    fmt.Printf("going to create %d random numbers \n", n)
    rand.Seed(time.Now().Unix()) 
    go monitor()

    var w sync.WaitGroup

    for r:=0; r < n; r ++ {
        w.Add(1)
        go func() {
            defer w.Done()
            set(rand.Intn(10 * n))
        }()
    }
    w.Wait()
    fmt.Println("Last Value", read())
}
go run foo.go 20
going to create 20 random numbers 
73 117 122 30 58 86 8 92 0 164 177 134 104 142 181 53 90 52 196 147 Last Value 147

이 프로그램의 핵심 로직은 monitor()함수에서 구현된다. 그중 select 문을 통해 전반적인 연산을 제어한다. 읽기 요청에 대해서는 read()함수를 호출해 readValue 채널로부터 값을 읽는다. 이 채널은 모니터 함수에서 제어한다. 그러면 value 변수에 현재 담겨 있는 값을 리턴한다. 반면 기존에 저장된 값을 변경하려면 set()을 호출한다. 이 함수 또한 모니터함수에 관리된다. 즉, 모니터만이 변수에 접근할 수 있는 것이다.

경쟁 상대 발견하기

데이터 경쟁 상태란 두 개 이상의 스레드나 고루틴과 같은 요소들이 공유 리소스나 프로그램 변수를 서로 제어하거나 수정하려고 경쟁하는 상황을 말한다.

Go 소스 파일을 빌드하거나 실행할 때 -race 플래그를 지정하면 Go 언어에서 제공하는 레이스 디렉터가 작동하며 컴파일러는 기존 실행 파일과 다른 형태로 실행 파일을 생성한다. 이렇게 수정된 버전은 공유 메모리에 대한 접근하는 모든 경우와, 모든 동기화 이벤트를 기록한다.

package main

import (
    "fmt"
    "os"
    "strconv"
    "sync"
)

func main() {
    numGR, _ := strconv.Atoi(os.Args[1])
    var waitGroup sync.WaitGroup
    var i int

    k := make(map[int]int)
    k[1] = 12

    for i = 0; i < numGR; i++ {
        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            k[i] = i
        }()
    }
    k[2] = 10
    waitGroup.Wait()
    fmt.Printf("k = %#v\n", k)
}

레이스 디렉터에 의해 두개의 데이터 경쟁 상태가 발견된다.

첫 번째는 포문 루프에서 발생한다. 루프에서 사용하는 i 값을 일정하게 받아 올수 없기 때문이다.

$ go run foo.go 10
k = map[int]int{1:12, 2:10, 6:7, 9:9, 10:10}

이런식으로 6>7에서 불일치가 발생한다.

두 번째는 맵에 대해 최소 두개의 고루틴이 쓴다는 것을 알 수 있다.

package main

import (
    "fmt"
    "os"
    "strconv"
    "sync"
)
var aMutex sync.Mutex

func main() {
    numGR, _ := strconv.Atoi(os.Args[1])
    var waitGroup sync.WaitGroup
    var i int

    k := make(map[int]int)
    k[1] = 12

    for i = 0; i < numGR; i++ {
        waitGroup.Add(1)
        go func(j int) {
            defer waitGroup.Done()
            aMutex.Lock()
            k[i] = i
            aMutex.Unlock()
        }(i)
    }
    k[2] = 10
    waitGroup.Wait()
    fmt.Printf("k = %#v\n", k)
}
$ go run foo.go 10
k = map[int]int{1:12, 2:10, 4:4, 7:7, 8:8, 9:9, 10:10}

이와 같이 문제를 해결할 수 있다.

context 패키지

이 패키지는 Context 타입을 정의하고 취소 기능을 지원한다.

Context 타입은 일종의 인터페이스로 네 개의 메소를 정의한다. 하지만 메소드를 모두 구현할 필요는 없다.

package main

import (
    "context"
    "fmt"
    "os"
    "strconv"
    "time"
)

func f1(t int) {
    c1 := context.Background()
    c1, cancel := context.WithCancel(c1)
    defer cancel()
    go func() {
        time.Sleep(4 * time.Second)
        cancel()
    }()

    select {
    case <- c1.Done():
        fmt.Println("f1():", c1.Err())
        return
    case r := <-time.After(time.Duration(t) * time.Second):
        fmt.Println("f1():", r)
    }
    return
}

func f2(t int) {
    c2 := context.Background()
    c2, cancel := context.WithTimeout(c2, time.Duration(t) * time.Second)
    defer cancel()
    go func() {
        time.Sleep(4 * time.Second)
        cancel()
    }()

    select {
    case <- c2.Done():
        fmt.Println("f2():", c2.Err())
        return
    case r := <-time.After(time.Duration(t) * time.Second):
        fmt.Println("f2():", r)
    }
    return
}

func f3(t int) {
    c3 := context.Background()
    deadline := time.Now().Add(time.Duration(t * 2) * time.Second)
    c3, cancel := context.WithDeadline(c3, deadline)
    defer cancel()
    go func() {
        time.Sleep(4 * time.Second)
        cancel()
    }()

    select {
    case <- c3.Done():
        fmt.Println("f3():", c3.Err())
        return
    case r := <-time.After(time.Duration(t) * time.Second):
        fmt.Println("f3():", r)
    }
    return
}

func main() {
    delay, _ := strconv.Atoi(os.Args[1])
    fmt.Println("Delay :", delay)
    f1(delay)
    f2(delay)
    f3(delay)
}
$ go run foo.go 4 
Delay : 4
f1(): 2020-09-22 00:28:38.558834 +0900 KST m=+4.000593261
f2(): 2020-09-22 00:28:42.561814 +0900 KST m=+8.003573617
f3(): 2020-09-22 00:28:46.565665 +0900 KST m=+12.007425304
$ go run foo.go 5
Delay : 5
f1(): context canceled
f2(): context canceled
f3(): context canceled
  • context.WithCancel()함수는 기존 컨텍스트를 취소하고 그 자식을 생성한다. context.WithCancel() 함수는 Done 채널도 생성한다. 이 채널이 닫히는 시점은 cancel()이 호출이되거나 부모 컨텍스트의 Done 채널이 닫힐 때이다.

HTTP 클라이언트 + context 예제

package main

import (
    "fmt"
    "math/rand"
    "net/http"
    "time"
)

func random(min, max int) int {
    return rand.Intn(max-min) + min
}

func myHandler(w http.ResponseWriter, r *http.Request) {
    delay := random(0, 15) 
    time.Sleep(time.Duration(delay) *time.Second)

    fmt.Fprintf(w, "SErving : %s\n", r.URL.Path)
    fmt.Fprintf(w, "Delay: %d\n", delay)
    fmt.Printf("Served : %s\n", r.Host) 
}

func main() {
    seed := time.Now().Unix() 
    rand.Seed(seed)
    PORT := ":8081"
    http.HandleFunc("/", myHandler)
    http.ListenAndServe(PORT, nil )
}
package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "os"
    "sync"
    "time"
)

var (
    myUrl string
    delay int = 5
    w sync.WaitGroup
)

type myData struct {
    r *http.Response
    err error
}

func connect(c context.Context) error {
    defer w.Done()
    data := make(chan myData, 1)

    tr := &http.Transport{}
    httpClient := &http.Client{Transport: tr}

    req, _ := http.NewRequest("GET", myUrl, nil)

    go func() {
        res, err := httpClient.Do(req)
        if err != nil {
            fmt.Println(err)
            data <- myData{nil, err}
            return
        } else {
            pack := myData{res, err}
            data <- pack
        }
    }()

    select {
    case <-c.Done():
        tr.CancelRequest(req)
        <-data
        fmt.Println("Cancelled")
        return c.Err()
    case ok := <-data:
        err := ok.err
        resp := ok.r
        if err != nil {
            fmt.Println("Error select:", err)
            return err
        }
        defer resp.Body.Close()

        data, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            fmt.Println("Error select:", err)
            return err
        }
        fmt.Printf("Response : %s\n", data)
    }
    return nil
}

func main() {
    myUrl = os.Args[1]
    fmt.Println("Delay :", delay)

    c := context.Background()
    c, cancel := context.WithTimeout(c, time.Duration(delay) * time.Second)
    defer cancel()

    fmt.Printf("Connecting to %s \n", myUrl)
    w.Add(1)
    go connect(c)
    w.Wait()
    fmt.Println("bye")
}

워커 풀

워커 풀이란 할당된 작업을 처리하려는 스레드의 집합이다. 아파치 웹 서버도 이방식으로 처리한다. 메인 프로세스는 들어온 요청을 모두 받아서 이를 실제로 처리할 워커 프로세스로 전달한다. 워커 프로세스가 작업을 마치면 새로운 클라이언트를 받아들일 준비를한다.

package main

import (
    "fmt"
    "os"
    "strconv"
    "sync"
    "time"
)

type Client struct {
    id int 
    integer int 
}
type Data struct {
    job Client 
    square int 
}

var (
    size = 10
    clients = make(chan Client, size) 
    data = make(chan Data, size) 
)

func worker(w *sync.WaitGroup) {
    for c := range clients {
        square := c.integer * c.integer
        output := Data{c, square}
        data <- output
        time.Sleep(time.Second)
    }
    w.Done()
}

func makeWP(n int) {
    var w sync.WaitGroup
    for i := 0; i < n; i++ {
        w.Add(1)
        go worker(&w)
    }
    w.Wait()
    close(data)
}

func create(n int) {
    for i := 0; i < n; i++ {
        c := Client{i, i}
        clients <-c 
    }
    close(clients)
}

func main() {
    fmt.Println("Capacity of Clients:", cap(clients))
    fmt.Println("Capacity of data:", cap(data)) 

    nJobs, _ := strconv.Atoi(os.Args[1])
    nWorkers, _ := strconv.Atoi(os.Args[2]) 

    go create(nJobs)
    finished := make(chan interface{})
    go func() {
        for d := range data {
            fmt.Printf("Client ID : %d\tint : ", d.job.id)
            fmt.Printf("%d\tsquare: %d\n", d.job.integer, d.square)
        }
        finished <- true
    }() 

    makeWP(nWorkers)
    fmt.Printf(": %v\n", <-finished)
}
$ go run foo.go 15 5
Capacity of Clients: 10
Capacity of data: 10
Client ID : 0   int : 0 square: 0
Client ID : 1   int : 1 square: 1
Client ID : 2   int : 2 square: 4
Client ID : 3   int : 3 square: 9
Client ID : 4   int : 4 square: 16
Client ID : 5   int : 5 square: 25
Client ID : 6   int : 6 square: 36
Client ID : 7   int : 7 square: 49
Client ID : 8   int : 8 square: 64
Client ID : 9   int : 9 square: 81
Client ID : 10  int : 10        square: 100
Client ID : 11  int : 11        square: 121
Client ID : 12  int : 12        square: 144
Client ID : 13  int : 13        square: 169
Client ID : 14  int : 14        square: 196
: true

Client 구조체로 처리할 요청마다 고유 아이디를 할당한다. Data구조체는 클라이언트의 데이터에 대해 이프로그램에서 실제로 생성한 결과를 그룹으로 묶는 데 사용된다. 클라이언트 구조체는 각 요청에 대한 입력 데이터를 가지고 Data 구조체는 요청의 결과를 담는다.

worker()함수는 처리할 요청을 clients 채널에서 읽어서 요청에 대한 처리가 끝나면 data 채널에 쓴다.

makeWP() 의 목적은 사용할 worker() 고루틴의 개수를 알아내는 것이다.

create()의 목적은 요청을 만드는것이다.