「Go言語による並行処理」の気になった所まとめ5 ハートビート

ハートビートは並行処理のプロセスが生きていることを外に伝える方法です。と書かれています。 もう、この1文でこの章のことを言っています。そんななか、この本では2パターン存在するとしています。

  • 一定周期で発生するハートビート
  • 仕事単位の最初に発生するハートビート

一定周期で発生するハートビート

一定周期で発生するハートビートはライフタイムが長いgorutineが生きているかを他の関数に伝える方法となります。

package main

import (
    "fmt"
    "time"
)

func sendPulse(heartbeat chan<- interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // 誰もハートビートを確認していない可能性があるのでdefault節は必須
    }
}

func sendResult(r time.Time,
    done <-chan interface{},
    results chan<- time.Time,
    pulse <-chan time.Time,
    heartbeat chan<- interface{}) {
    for {
        select {
        case <-done:
            return
        case <-pulse: // doneチャネル同様、送信・受信時必ずハートビートを含める
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

// メインの処理を行う関数
func doWork(
    done <-chan interface{},
    pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
    // ハートビートを送信する先のチャネルを設定。このチャネルをdoWorkより返す
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go func() {
        defer close(heartbeat)
        defer close(results)
        // 与えられたpulseIntervalの周期でハートビートの鼓動を設。
        // pulseIntervalごとにこのチャネルでは何かしら読み込みできる
        pulse := time.Tick(pulseInterval)
        // 実際の処理を行うティッカー(今回はシミュレート)
        // pulseIntervalよりも大きな周期を選択したことでハートビートが確認できる
        workGen := time.Tick(2 * pulseInterval)

        for {
            select {
            case <-done:
                return
            case <-pulse: // doneチャネル同様、送信・受信時必ずハートビートを含める
                sendPulse(heartbeat)
            case r := <-workGen:
                sendResult(r, done, results, pulse, heartbeat)
            }
        }

    }()
    return heartbeat, results
}

func main() {
    done := make(chan interface{})
    // doneチャネルを10秒後に閉じる
    time.AfterFunc(10*time.Second, func() {
        fmt.Println("close(done)")
        close(done)
    })
    const timeout = 2 * time.Second
    // ここでtimeout/2を渡すことでタイムアウトにならないようにする
    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        // timeout/2経過するごとにメッセージを受け取れる
        case _, ok := <-heartbeat:
            if !ok {
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout): // doWorkが正常な限り、呼ばれない
            fmt.Println("timeout")
            return
        }
    }
}

実行すると、意図したとおり1つの結果につき2つの鼓動を受信しています。

pulse 
pulse results 52 
pulse 
pulse results 54 
pulse 
pulse results 56 
pulse 
pulse results 58 
pulse

下記の例では意図的にちゃんと動かないプログラムを記述して、ハートビートが有用であることを例示しています。

package main

import (
    "fmt"
    "time"
)

func sendPulse(heartbeat chan<- interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // 誰もハートビートを確認していない可能性があるのでdefault節は必須
    }
}

func sendResult(r time.Time,
    done <-chan interface{},
    results chan<- time.Time,
    pulse <-chan time.Time,
    heartbeat chan<- interface{}) {
    for {
        select {
        case <-done:
            return
        case <-pulse: // doneチャネル同様、送信・受信時必ずハートビートを含める
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

// メインの処理を行う関数
func doWork(
    done <-chan interface{}, pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go func() {
        pulse := time.Tick(pulseInterval)
        workGen := time.Tick(2 * pulseInterval)
        // タイムアウトするように2回だけ実行する
        for i := 0; i < 2; i++ {
            select {
            case <-done:
                return
            case <-pulse:
                sendPulse(heartbeat)
            case r := <-workGen:
                sendResult(r, done, results, pulse, heartbeat)
            }
        }
    }()
    return heartbeat, results
}

func main() {
    done := make(chan interface{})
    // doneチャネルを10秒後に閉じる
    time.AfterFunc(10*time.Second, func() {
        fmt.Println("close(done)")
        close(done)
    })
    const timeout = 2 * time.Second
    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        // timeout/2経過するごとにメッセージを受け取れる
        case _, ok := <-heartbeat:
            if !ok {
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout):
            fmt.Println("worker goroutine is not healthy!")
            return
        }
    }
}

上のコードを実行すると次のような結果となります。

pulse
pulse
worker goroutine is not healthy!

仕事単位の最初に発生するハートビート

テストのときに有用なパターンだそうです。

package main

import (
    "fmt"
    "math/rand"
)

// ハートビートを必ず最初に送ることで呼び出し側でタイムアウトを設定しないでよくする方法
func doWork(done <-chan interface{}) (<-chan interface{}, <-chan int) {
    // バッファが1のheartbeatチャネルを作成
    // 送信待ちのものが何もなくても最低1つの鼓動が常に送られることを保証する
    heartbeatStream := make(chan interface{}, 1)
    workStream := make(chan int)
    go func() {
        defer close(heartbeatStream)
        defer close(workStream)
        for i := 0; i < 10; i++ {
            // heartbeatStream、workStream両方ともselect内で値を渡しているので、
            // 両方同じselectに入れると、どちらを送信したのかわからなくなる
            // heartbeatは確実に分けて送信したいので別にする
            select {
            case heartbeatStream <- struct{}{}:
            default: // 誰もハートビートを待っていない可能性があるため
            }
            select {
            case <-done:
                return
            case workStream <- rand.Intn(10):
            }
        }
    }()
    return heartbeatStream, workStream
}

func main() {

    done := make(chan interface{})
    defer close(done)

    heartbeat, results := doWork(done)
    for {
        select {
        case _, ok := <-heartbeat:
            if ok {
                fmt.Println("pulse")
            } else {
                return
            }
        case r, ok := <-results:
            if ok {
                fmt.Printf("results %v\n", r)
            } else {
                return
            }
        }
    }
}

doWork内に2つのselectが存在するのが分かりにくいですね...selectを1つにしてみると毎回ランダムになってしまいます。heatbeatを確実に送信しつつブロックさせないために2つのselectになっているのだと思います。動作結果は下記となります。

pulse results 1 
pulse results 7 
pulse results 7 
pulse results 9 
pulse results 1 
pulse results 8
pulse results 5 
pulse results 0 
pulse results 6 
pulse results 0

結果毎にheartbeatが届いています。テストで有用だと言うことで、先に悪いパターンを例示します。

func TestDoWork_GeneratesAllNumbers(t *testing.T) {
    done := make(chan interface{})
    defer close(done)
    intSlice := []int{0, 1, 2, 3, 5}
    _, results := DoWork(done, intSlice...)
    for i, expected := range intSlice {
        select {
        case r := <-results:
            if r != expected {
                t.Errorf(
                    "index %v: expected %v, but received %v,", i,
                    expected,
                    r,
                )
            }
            // 壊れたゴルーチンがテストでデッドロックを起こしてしまわないのに十分と思われる時間が経過したあとでタイムアウトさせる
        case <-time.After(1 * time.Second):
            t.Fatal("test timed out")
        }
    }
}

テストを実行すると次のような結果になります。

go test ./bad_concurrent_test.go
--- FAIL: TestDoWork_GeneratesAllNumbers (1.00s)
bad_concurrent_test.go:46: test timed out FAIL
FAIL command-line-arguments 1.002s

このテストが悪い理由は非決定的だからです。 テストが確実に落ちるようにtime.Sleep設定しましたが、なくすと、成功したり失敗したりするようになります。 ハートビートを使うことで決定的なテストとなります。

func TestDoWork_GeneratesAllNumbers(t *testing.T) {
    done := make(chan interface{})
    defer close(done)
    intSlice := []int{0, 1, 2, 3, 5}
    heartbeat, results := DoWork(done, intSlice...)
    // 必ずheartbeatが来てからテストを始めることでタイムアウトをなくす
    <-heartbeat
    i := 0
    for r := range results {
        if expected := intSlice[i]; r != expected {
            t.Errorf("index %v: expected %v, but received %v,", i, expected, r)
        }
        i++
    }
}

テストを実行すると次のような結果となります。

command-line-arguments 2.002s
ok

とここまで、仕事単位の最初に発生するハートビートの説明をしてたのに、初期化など最初に時間がかかる場合に完全に安全な方法として、定期的ハートビートの例が最後に出てきます(仕事単位の説明した意味無いのでは...)

package main

import (
    "testing"
    "time"
)

func DoWork(
    done <-chan interface{},
    pulseInterval time.Duration,
    nums ...int,
) (<-chan interface{}, <-chan int) {
    heartbeat := make(chan interface{}, 1)
    intStream := make(chan int)
    go func() {
        defer close(heartbeat)
        defer close(intStream)
        time.Sleep(2 * time.Second)
        pulse := time.Tick(pulseInterval)
    numLoop:
        for _, n := range nums {
            for { // 内側のループはnがintStreamに無事送信されるまで繰り返すもの
                select {
                case <-done:
                    return
                case <-pulse:
                    select {
                    case heartbeat <- struct{}{}:
                    default:
                    }
                case intStream <- n:
                    continue numLoop // 送信できた場合、外ループが進む
                }
            }
        }
    }()
    return heartbeat, intStream
}
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
    done := make(chan interface{})
    defer close(done)
    intSlice := []int{0, 1, 2, 3, 5}
    const timeout = 2 * time.Second
    heartbeat, results := DoWork(done, timeout/2, intSlice...)
    // 最初にハートビートのが来るのを確認する
    <-heartbeat
    i := 0
    for {
        select {
        case r, ok := <-results:
            if ok == false {
                return
            } else if expected := intSlice[i]; r != expected {
                t.Errorf(
                    "index %v: expected %v, but received %v,", i,
                    expected,
                    r,
                )
            }
            i++
        case <-heartbeat: // タイムアウトが発生しないようにハートビートからも値を取得する
        case <-time.After(timeout):
            t.Fatal("test timed out")
        }
    }
}

このテストを実施すると次のような結果となります。

command-line-arguments 3.002s

ただ、最後のversionはテストが分かりにくいから、その前の方(単純なrangeだけのテスト)のほうでよいならそっちを使ったほうが良いと書かれています。テストの問題なのか!と思ってしまいますが...
最後に

長時間稼働しているゴルーチンや、テストが必要なゴルーチンを扱う際にはこのパターンを使うことを強くおすすめします。

と書かれています。