「Go言語による並行処理」の気になった所まとめ6 不健全なゴルーチンを直す

やっと最後の記事になりました。 不健全なゴルーチンを直す では長時間動くプロセスの場合、ゴルーチンも生存期間が長くなります。関心の分離からゴルーチン自身が回復の方法を知らないほうが良いとのことです。このゴルーチンを再起動する流れを「回復」と呼ぶそうです。 回復にはハートビートパターンを使って行います。
本ではゴルーチンの健全性を監視するロジックを管理人 (steward)とよび、管理人が監視するゴ ルーチンを中庭 (ward)と呼んでいます。 管理人は中庭にいるゴルーチンが不健全になったら再起動する責任も負っています。そのためには、そのゴルーチンを起動する関数への参照が必要になります。
下記が管理人のサンプル実装です。

package main

import (
    "log"
    "os"
    "time"
)

// 監視と再起動ができるゴルーチンのシグネチャの定義
// doneチャネルとハートビートパターンのpulseIntervalとheartbeatを使用
type startGoroutineFn func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (heartbeat <-chan interface{})

// 管理人が監視するゴルーチンのためのtimeoutと、
// 監視するゴルーチンを起動するための startGoroutine関数をとり、
// 管理人自身はstartGoroutineFnを返していているのでそれ自体も監視対象となりえる
func newSteward(
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) startGoroutineFn {
    return func(
        done <-chan interface{},
        pulseInterval time.Duration,
    ) <-chan interface{} {
        heartbeat := make(chan interface{})
        go goFn(done, heartbeat, pulseInterval, timeout, startGoroutine)
        return heartbeat
    }
}

// 監視しているゴルーチンを起動するための関数
func startWard(
    done <-chan interface{},
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) (
    wardDone chan interface{},
    wardHeartbeat <-chan interface{},
) {
    //  停止グナルを送る必要がある場合に備えて、中庭のゴルーチンに渡す新しいチャネルを作成
    wardDone = make(chan interface{})
    // 監視対象のゴルーチンを起動する
    // 管理人自体が停止するか、管理人が中庭のゴルーチンを停止させる場合があるので、
    // 両方のdoneチャネルをorの中に内包させる
    wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2)
    return
}

func goFn(
    done <-chan interface{},
    heartbeat chan interface{},
    pulseInterval time.Duration,
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) {
    defer close(heartbeat)
    var wardDone chan interface{}
    var wardHeartbeat <-chan interface{}
    wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
    pulse := time.Tick(pulseInterval)
monitorLoop:
    for {
        timeoutSignal := time.After(timeout)
        for { // 2重ループにすることで、管理人が自身の鼓動を確実に外へと送信できるようにしている
            select {
            case <-pulse:
                select {
                case heartbeat <- struct{}{}:
                default:
                }
            case <-wardHeartbeat: // 中庭の鼓動を受信したら、監視のループを継続する
                continue monitorLoop
            case <-timeoutSignal:
                // 鼓動が受信できなければ、中庭に停止するようリクエストし、
                // 新しい中庭のゴルーチンを起動しはじめる。その後、監視を続ける
                log.Println("steward: ward unhealthy; restarting")
                close(wardDone)
                wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
                continue monitorLoop
            case <-done:
                return
            }
        }
    }
}

func main() {
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    // doWorkが起動するゴルーチンのための管理人を作る関数を作成
    // doWorkのタイムアウトは4秒に設定
    doWorkWithSteward := newSteward(4*time.Second, doWork)
    done := make(chan interface{})
    // 9秒後に管理人を停止させる、そうすると中庭も停止してこの例が終わる
    time.AfterFunc(9*time.Second, func() {
        log.Println("main: halting steward and ward.")
        close(done)
    })
    // 管理人を起動して、鼓動をrangeで回して調べて、例が終了してしまうのを防ぐ
    // pulseIntervalとして4秒を設定
    for range doWorkWithSteward(done, 4*time.Second) {
    }
    log.Println("Done")

}

func doWork(done <-chan interface{}, _ time.Duration) <-chan interface{} {
    log.Println("ward: Hello, I'm irresponsible!")
    go func() {
        // 何もしておらず、キャンセルされるのを待っている。
        // また、鼓動をまったく送信していない
        <-done
        log.Println("ward: I am halting.")
    }()
    return nil
}

// この関数はチャネルの可変長引数のスライスを受け取り、1つのチャネルを返す
func or(channels ...<-chan interface{}) <-chan interface{} {
    // 再帰関数なので、停止条件が必要
    // スライスが空の場合は単純にnilチャネルを返す。チャネルを渡さなかった場合と同義
    // 必ずorDoneは渡されるので、再起中は引数が0はありえない
    switch len(channels) {
    case 0:
        return nil
    case 1:
        // 可変長引数のスライスが1つしか要素を持っていない場合は、要素を返すだけ
        return channels[0]
    }
    orDone := make(chan interface{})
    // 関数の本体で再帰が発生する部分
    // ゴルーチンを作り、ブロックすることなく作ったチャネルにメッセージを受け取れるようする
    go func() {
        defer close(orDone)

        switch len(channels) {
        // orへの各再帰呼出しは少なくとも2つのチャネルを持っているので
        // ゴルーチンの数を制限するため、特別な条件を設定
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            // スライスの3番目以降のチャネルから再帰的にorチャネルを作成して、
            // そこからselectを行う
            // この再帰関係はスライスの残りの部分をorチャネルに分解して、
            // 最初のシグナルが返ってくる木構造を形成する
            // orDoneチャネルも渡して、木構造の上位の部分が終了したら
            // 下位の部分も終了するようにする
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            // 再起の場合、チャンネルは最低1は渡す
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

この例では次のような出力を得られます。

18:28:07 ward: Hello, I'm irresponsible! 
18:28:11 steward: ward unhealthy; restarting 
18:28:11 ward: Hello, I'm irresponsible! 
18:28:11 ward: I am halting.
18:28:15 steward: ward unhealthy; restarting 
18:28:15 ward: Hello, I'm irresponsible! 
18:28:15 ward: I am halting.
18:28:16 main: halting steward and ward. 
18:28:16 ward: I am halting.
18:28:16 Done

次に、中庭を抽象的にすることで任意のクロージャーを受け取れるように修正した例です。 この例が自分の目当てのサンプルだったんですが、、、難しい...
本のプログラムはクロージャーを多様していたのですが、ネストすると見にくいのでできるだけ関数にしています。
サンプルの中でbridge チャネルが登場します。ここがよくわからないポイントだったのですが、回復が発生するとそれまで使っていたチャンネルが閉じられてしまうのでずっと閉じられないチャンネルでラップすることでチャンネルのハンドリングをシンプルにしているようです。
また、ハートビートが2つ出てくるのも非常に混乱をしますが、中庭と仕事、それぞれでハートビートをしているというのがわかると理解が進みました。
また、take だけ記事で説明していません...Haskellの無限リストを指定の場所まで読み取る関数のチャンネル版です。

package main

import (
    "fmt"
    "log"
    "os"
    "time"
)

// 監視と再起動ができるゴルーチンのシグネチャの定義
// doneチャネルとハートビートパターンのpulseIntervalとheartbeatを使用
type startGoroutineFn func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (heartbeat <-chan interface{})

// 管理人が監視するゴルーチンのためのtimeoutと、
// 監視するゴルーチンを起動するための startGoroutine関数をとり、
// 管理人自身はstartGoroutineFnを返していているのでそれ自体も監視対象となりえる
func newSteward(
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) startGoroutineFn {
    return func(
        done <-chan interface{},
        pulseInterval time.Duration,
    ) <-chan interface{} {
        heartbeat := make(chan interface{})
        go goFn(done, heartbeat, pulseInterval, timeout, startGoroutine)
        return heartbeat
    }
}

func goFn(
    done <-chan interface{},
    heartbeat chan interface{},
    pulseInterval time.Duration,
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) {
    defer close(heartbeat)
    var wardDone chan interface{}
    var wardHeartbeat <-chan interface{}
    wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
    pulse := time.Tick(pulseInterval)
monitorLoop:
    for {
        timeoutSignal := time.After(timeout)
        for { // 2重ループにすることで、管理人が自身の鼓動を確実に外へと送信できるようにしている
            select {
            case <-pulse:
                select {
                case heartbeat <- struct{}{}:
                default:
                }
            case <-wardHeartbeat: // 中庭の鼓動を受信したら、監視のループを継続する
                continue monitorLoop
            case <-timeoutSignal:
                // 鼓動が受信できなければ、中庭に停止するようリクエストし、
                // 新しい中庭のゴルーチンを起動しはじめる。その後、監視を続ける
                log.Println("steward: ward unhealthy; restarting")
                close(wardDone)
                wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
                continue monitorLoop
            case <-done:
                return
            }
        }
    }
}

// 監視しているゴルーチンを起動するための関数
func startWard(
    done <-chan interface{},
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) (
    wardDone chan interface{},
    wardHeartbeat <-chan interface{},
) {
    //  停止グナルを送る必要がある場合に備えて、中庭のゴルーチンに渡す新しいチャネルを作成
    wardDone = make(chan interface{})
    // 監視対象のゴルーチンを起動する
    // 管理人自体が停止するか、管理人が中庭のゴルーチンを停止させる場合があるので、
    // 両方のdoneチャネルをorの中に内包させる
    wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2)
    return
}

func main() {
    log.SetFlags(log.Ltime | log.LUTC)
    log.SetOutput(os.Stdout)
    done := make(chan interface{})
    defer close(done)
    // 中庭の関数を作る。整数の可変長引数のスライスを内包して、やり取りに使えるストリームを返す
    doWork, intBridgeStream := doWorkFn(done, 1, 2, -1, 3, 4, 5)
    // doWorkのクロージャーを監視する管理人を作成。すぐ失敗が発生するので監視期間を1ミリ秒に設定
    doWorkWithSteward := newSteward(1*time.Millisecond, doWork)
    // 管理人に中庭を起動して監視を開始するように伝える
    doWorkWithSteward(done, 1*time.Hour)
    // 最後に、構築したパイプラインのステージの1つを使ってintStreamから最初の6つの値を取得する
    for intVal := range take(done, intBridgeStream, 6) {
        fmt.Printf("Received: %v\n", intVal)
    }
}

// 中庭に囲い込んでもらいたい値を取って、中庭からのやり取りに使うチャネルを返す
func doWorkFn(
    done <-chan interface{},
    intList ...int,
) (
    startGoroutineFn, // 中庭の関数
    <-chan interface{},
) {
    // ブリッジパターンとしてチャネルのチャネルを作る
    intChanStream := make(chan (<-chan interface{}))
    intBridgeStream := bridge(done, intChanStream)
    doWork := func(
        done <-chan interface{},
        pulseInterval time.Duration,
    ) <-chan interface{} { // 管理人に監視されるクロージャーを作成
        // wardHeartbeatになる
        heartbeat := make(chan interface{})
        go doWorkGoFn(done, intChanStream, heartbeat, pulseInterval, intList)
        return heartbeat
    }
    return doWork, intBridgeStream
}

func doWorkGoFn(done <-chan interface{},
    intChanStream chan (<-chan interface{}),
    heartbeat chan<- interface{},
    pulseInterval time.Duration,
    intList []int,
) {
    // 中庭のゴルーチンのインスタンスの中でやり取りするチャネルを初期化
    intStream := make(chan interface{})
    defer close(intStream)
    select {
    case intChanStream <- intStream: // 新しいチャネルをブリッジに知らせる
    case <-done:
        return
    }
    pulse := time.Tick(pulseInterval)
    for {
    valueLoop:
        for _, intVal := range intList {
            if intVal < 0 {
                // 負の数字が来たときにエラーのログを取ってゴルーチンからreturnすることで、
                // 不健全な中庭をシミュレートしている
                log.Printf("negative value: %v\n", intVal)
                return
            }
            for {
                select {
                case <-pulse:
                    select {
                    case heartbeat <- struct{}{}:
                    default:
                    }
                case intStream <- intVal:
                    continue valueLoop
                case <-done:
                    return
                }
            }
        }
    }

}

func orDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func bridge(
    done <-chan interface{},
    chanStream <-chan <-chan interface{},
) <-chan interface{} {
    // bridgeからすべての値を返すチャネル
    valStream := make(chan interface{})

    go func() {
        defer close(valStream)
        // このループはchanStreamからチャネルを剥ぎ取り、ネストされたループに渡す
        for {
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            // このループは渡されたチャネルから値を読み込みvalStreamにその値を渡す
            for val := range orDone(done, stream) {
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func take(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

// この関数はチャネルの可変長引数のスライスを受け取り、1つのチャネルを返す
func or(channels ...<-chan interface{}) <-chan interface{} {
    // 再帰関数なので、停止条件が必要
    // スライスが空の場合は単純にnilチャネルを返す。チャネルを渡さなかった場合と同義
    // 必ずorDoneは渡されるので、再起中は引数が0はありえない
    switch len(channels) {
    case 0:
        return nil
    case 1:
        // 可変長引数のスライスが1つしか要素を持っていない場合は、要素を返すだけ
        return channels[0]
    }
    orDone := make(chan interface{})
    // 関数の本体で再帰が発生する部分
    // ゴルーチンを作り、ブロックすることなく作ったチャネルにメッセージを受け取れるようする
    go func() {
        defer close(orDone)

        switch len(channels) {
        // orへの各再帰呼出しは少なくとも2つのチャネルを持っているので
        // ゴルーチンの数を制限するため、特別な条件を設定
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            // スライスの3番目以降のチャネルから再帰的にorチャネルを作成して、
            // そこからselectを行う
            // この再帰関係はスライスの残りの部分をorチャネルに分解して、
            // 最初のシグナルが返ってくる木構造を形成する
            // orDoneチャネルも渡して、木構造の上位の部分が終了したら
            // 下位の部分も終了するようにする
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            // 再起の場合、チャンネルは最低1は渡す
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}
Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting

Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2

(補足)
サンプルではエラーが発生したものをスキップしていないですが、スキップするようにするには、 リストを消費するようにします。

valueLoop:
for _, intVal := range intList {
// ...
 }

// 代わりに下にする

valueLoop: for {
intVal := intList[0] 
intList = intList[1:] // ...
}

ということで、Go言語による並行処理の気になったところを見てきました。
私が書いていない部分も非常にためになる部分が多いので是非、一読してみてください。

「Go言語による並行処理」の気になった所まとめ5 ハートビート

ハートビートは並行処理のプロセスが生きていることを外に伝える方法です。と書かれています。 もう、この1文でこの章のことを言っています。そんななか、この本では2パターン存在するとしています。

  • 一定周期で発生するハートビート
  • 仕事単位の最初に発生するハートビート

一定周期で発生するハートビート

一定周期で発生するハートビートはライフタイムが長いgorutineが生きているかを他の関数に伝える方法となります。

package main

import (
    "fmt"
    "time"
)

func sendPulse(heartbeat chan<- interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // 誰もハートビートを確認していない可能性があるのでdefault節は必須
    }
}

func sendResult(r time.Time,
    done <-chan interface{},
    results chan<- time.Time,
    pulse <-chan time.Time,
    heartbeat chan<- interface{}) {
    for {
        select {
        case <-done:
            return
        case <-pulse: // doneチャネル同様、送信・受信時必ずハートビートを含める
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

// メインの処理を行う関数
func doWork(
    done <-chan interface{},
    pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
    // ハートビートを送信する先のチャネルを設定。このチャネルをdoWorkより返す
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go func() {
        defer close(heartbeat)
        defer close(results)
        // 与えられたpulseIntervalの周期でハートビートの鼓動を設。
        // pulseIntervalごとにこのチャネルでは何かしら読み込みできる
        pulse := time.Tick(pulseInterval)
        // 実際の処理を行うティッカー(今回はシミュレート)
        // pulseIntervalよりも大きな周期を選択したことでハートビートが確認できる
        workGen := time.Tick(2 * pulseInterval)

        for {
            select {
            case <-done:
                return
            case <-pulse: // doneチャネル同様、送信・受信時必ずハートビートを含める
                sendPulse(heartbeat)
            case r := <-workGen:
                sendResult(r, done, results, pulse, heartbeat)
            }
        }

    }()
    return heartbeat, results
}

func main() {
    done := make(chan interface{})
    // doneチャネルを10秒後に閉じる
    time.AfterFunc(10*time.Second, func() {
        fmt.Println("close(done)")
        close(done)
    })
    const timeout = 2 * time.Second
    // ここでtimeout/2を渡すことでタイムアウトにならないようにする
    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        // timeout/2経過するごとにメッセージを受け取れる
        case _, ok := <-heartbeat:
            if !ok {
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout): // doWorkが正常な限り、呼ばれない
            fmt.Println("timeout")
            return
        }
    }
}

実行すると、意図したとおり1つの結果につき2つの鼓動を受信しています。

pulse 
pulse results 52 
pulse 
pulse results 54 
pulse 
pulse results 56 
pulse 
pulse results 58 
pulse

下記の例では意図的にちゃんと動かないプログラムを記述して、ハートビートが有用であることを例示しています。

package main

import (
    "fmt"
    "time"
)

func sendPulse(heartbeat chan<- interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // 誰もハートビートを確認していない可能性があるのでdefault節は必須
    }
}

func sendResult(r time.Time,
    done <-chan interface{},
    results chan<- time.Time,
    pulse <-chan time.Time,
    heartbeat chan<- interface{}) {
    for {
        select {
        case <-done:
            return
        case <-pulse: // doneチャネル同様、送信・受信時必ずハートビートを含める
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

// メインの処理を行う関数
func doWork(
    done <-chan interface{}, pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go func() {
        pulse := time.Tick(pulseInterval)
        workGen := time.Tick(2 * pulseInterval)
        // タイムアウトするように2回だけ実行する
        for i := 0; i < 2; i++ {
            select {
            case <-done:
                return
            case <-pulse:
                sendPulse(heartbeat)
            case r := <-workGen:
                sendResult(r, done, results, pulse, heartbeat)
            }
        }
    }()
    return heartbeat, results
}

func main() {
    done := make(chan interface{})
    // doneチャネルを10秒後に閉じる
    time.AfterFunc(10*time.Second, func() {
        fmt.Println("close(done)")
        close(done)
    })
    const timeout = 2 * time.Second
    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        // timeout/2経過するごとにメッセージを受け取れる
        case _, ok := <-heartbeat:
            if !ok {
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout):
            fmt.Println("worker goroutine is not healthy!")
            return
        }
    }
}

上のコードを実行すると次のような結果となります。

pulse
pulse
worker goroutine is not healthy!

仕事単位の最初に発生するハートビート

テストのときに有用なパターンだそうです。

package main

import (
    "fmt"
    "math/rand"
)

// ハートビートを必ず最初に送ることで呼び出し側でタイムアウトを設定しないでよくする方法
func doWork(done <-chan interface{}) (<-chan interface{}, <-chan int) {
    // バッファが1のheartbeatチャネルを作成
    // 送信待ちのものが何もなくても最低1つの鼓動が常に送られることを保証する
    heartbeatStream := make(chan interface{}, 1)
    workStream := make(chan int)
    go func() {
        defer close(heartbeatStream)
        defer close(workStream)
        for i := 0; i < 10; i++ {
            // heartbeatStream、workStream両方ともselect内で値を渡しているので、
            // 両方同じselectに入れると、どちらを送信したのかわからなくなる
            // heartbeatは確実に分けて送信したいので別にする
            select {
            case heartbeatStream <- struct{}{}:
            default: // 誰もハートビートを待っていない可能性があるため
            }
            select {
            case <-done:
                return
            case workStream <- rand.Intn(10):
            }
        }
    }()
    return heartbeatStream, workStream
}

func main() {

    done := make(chan interface{})
    defer close(done)

    heartbeat, results := doWork(done)
    for {
        select {
        case _, ok := <-heartbeat:
            if ok {
                fmt.Println("pulse")
            } else {
                return
            }
        case r, ok := <-results:
            if ok {
                fmt.Printf("results %v\n", r)
            } else {
                return
            }
        }
    }
}

doWork内に2つのselectが存在するのが分かりにくいですね...selectを1つにしてみると毎回ランダムになってしまいます。heatbeatを確実に送信しつつブロックさせないために2つのselectになっているのだと思います。動作結果は下記となります。

pulse results 1 
pulse results 7 
pulse results 7 
pulse results 9 
pulse results 1 
pulse results 8
pulse results 5 
pulse results 0 
pulse results 6 
pulse results 0

結果毎にheartbeatが届いています。テストで有用だと言うことで、先に悪いパターンを例示します。

func TestDoWork_GeneratesAllNumbers(t *testing.T) {
    done := make(chan interface{})
    defer close(done)
    intSlice := []int{0, 1, 2, 3, 5}
    _, results := DoWork(done, intSlice...)
    for i, expected := range intSlice {
        select {
        case r := <-results:
            if r != expected {
                t.Errorf(
                    "index %v: expected %v, but received %v,", i,
                    expected,
                    r,
                )
            }
            // 壊れたゴルーチンがテストでデッドロックを起こしてしまわないのに十分と思われる時間が経過したあとでタイムアウトさせる
        case <-time.After(1 * time.Second):
            t.Fatal("test timed out")
        }
    }
}

テストを実行すると次のような結果になります。

go test ./bad_concurrent_test.go
--- FAIL: TestDoWork_GeneratesAllNumbers (1.00s)
bad_concurrent_test.go:46: test timed out FAIL
FAIL command-line-arguments 1.002s

このテストが悪い理由は非決定的だからです。 テストが確実に落ちるようにtime.Sleep設定しましたが、なくすと、成功したり失敗したりするようになります。 ハートビートを使うことで決定的なテストとなります。

func TestDoWork_GeneratesAllNumbers(t *testing.T) {
    done := make(chan interface{})
    defer close(done)
    intSlice := []int{0, 1, 2, 3, 5}
    heartbeat, results := DoWork(done, intSlice...)
    // 必ずheartbeatが来てからテストを始めることでタイムアウトをなくす
    <-heartbeat
    i := 0
    for r := range results {
        if expected := intSlice[i]; r != expected {
            t.Errorf("index %v: expected %v, but received %v,", i, expected, r)
        }
        i++
    }
}

テストを実行すると次のような結果となります。

command-line-arguments 2.002s
ok

とここまで、仕事単位の最初に発生するハートビートの説明をしてたのに、初期化など最初に時間がかかる場合に完全に安全な方法として、定期的ハートビートの例が最後に出てきます(仕事単位の説明した意味無いのでは...)

package main

import (
    "testing"
    "time"
)

func DoWork(
    done <-chan interface{},
    pulseInterval time.Duration,
    nums ...int,
) (<-chan interface{}, <-chan int) {
    heartbeat := make(chan interface{}, 1)
    intStream := make(chan int)
    go func() {
        defer close(heartbeat)
        defer close(intStream)
        time.Sleep(2 * time.Second)
        pulse := time.Tick(pulseInterval)
    numLoop:
        for _, n := range nums {
            for { // 内側のループはnがintStreamに無事送信されるまで繰り返すもの
                select {
                case <-done:
                    return
                case <-pulse:
                    select {
                    case heartbeat <- struct{}{}:
                    default:
                    }
                case intStream <- n:
                    continue numLoop // 送信できた場合、外ループが進む
                }
            }
        }
    }()
    return heartbeat, intStream
}
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
    done := make(chan interface{})
    defer close(done)
    intSlice := []int{0, 1, 2, 3, 5}
    const timeout = 2 * time.Second
    heartbeat, results := DoWork(done, timeout/2, intSlice...)
    // 最初にハートビートのが来るのを確認する
    <-heartbeat
    i := 0
    for {
        select {
        case r, ok := <-results:
            if ok == false {
                return
            } else if expected := intSlice[i]; r != expected {
                t.Errorf(
                    "index %v: expected %v, but received %v,", i,
                    expected,
                    r,
                )
            }
            i++
        case <-heartbeat: // タイムアウトが発生しないようにハートビートからも値を取得する
        case <-time.After(timeout):
            t.Fatal("test timed out")
        }
    }
}

このテストを実施すると次のような結果となります。

command-line-arguments 3.002s

ただ、最後のversionはテストが分かりにくいから、その前の方(単純なrangeだけのテスト)のほうでよいならそっちを使ったほうが良いと書かれています。テストの問題なのか!と思ってしまいますが...
最後に

長時間稼働しているゴルーチンや、テストが必要なゴルーチンを扱う際にはこのパターンを使うことを強くおすすめします。

と書かれています。

「Go言語による並行処理」の気になった所まとめ4 bridgeチャネル

復数のチャンネルをまとめて1つのチャンネルで扱うための方法です。
<-chan <-chan interface{}を1段潰した<-chan interface{}として扱えるようになります。サンプルでは、前の記事のorDoneチャンネルも利用しているのでdoneチャンネルからの受信によるキャンセルも対応しています。
<-chan <-chan interface{}は普通にプログラムを書いているとあまり出てこないかもしれないですが、この本のイディオムを使うようになれば出てくることもあると思いますが、未読の人には嫌な顔をされそう...

package main

import (
    "fmt"
)

func bridge(
    done <-chan interface{},
    chanStream <-chan <-chan interface{},
) <-chan interface{} {
    // bridgeからすべての値を返すチャネル
    valStream := make(chan interface{})

    go func() {
        defer close(valStream)
        // このループはchanStreamからチャネルを剥ぎ取り、ネストされたループに渡す
        for {
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            // このループは渡されたチャネルから値を読み込みvalStreamにその値を渡す
            for val := range orDone(done, stream) {
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func orDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

// 10個のチャネルの列を作って、それぞれのチャネルに要素を1つだけ書き込む
func genVals() <-chan <-chan interface{} {
    chanStream := make(chan (<-chan interface{}))
    go func() {
        defer close(chanStream)
        for i := 0; i < 10; i++ {
            stream := make(chan interface{}, 1)
            stream <- i
            close(stream)
            chanStream <- stream
        }
    }()
    return chanStream
}

func main() {
    for v := range bridge(nil, genVals()) {
        fmt.Printf("%v ", v)
    }
}

このコードを実行すると下記の結果となります。

0 1 2 3 4 5 6 7 8 9

「Go言語による並行処理」の気になった所まとめ3 orチャネルとor-done チャネル

or チャネル

任意の数のチャンネルを待ちたいときに作るパターンです。
サンプルでは引数が0個のときにnilを返します。最初、再起だからなのかと思ったのですが、再起では必ず1個以上の引数になります。初回に0個の場合だけなので、0個の引数は取らないほうがいいような気がします。3.3章でnilチャンネルは読み込みも書き込みもブロックしてcloseはpanicになると書いていたのに...

ただ、このパターンを実装した場合、可読性が低いのでこの本を読んでいない人には何これって言われそうです...

package main

import (
  "fmt"
  "time"
)

// この関数はチャネルの可変長引数のスライスを受け取り、1つのチャネルを返す
func or(channels ...<-chan interface{}) <-chan interface{} {
  // 再帰関数なので、停止条件が必要
  // スライスが空の場合は単純にnilチャネルを返す。チャネルを渡さなかった場合と同義
  // 必ずorDoneは渡されるので、再起中は引数が0はありえない
  switch len(channels) {
  case 0:
    return nil
  case 1:
    // 可変長引数のスライスが1つしか要素を持っていない場合は、要素を返すだけ
    return channels[0]
  }
  orDone := make(chan interface{})
  // 関数の本体で再帰が発生する部分
  // ゴルーチンを作り、ブロックすることなく作ったチャネルにメッセージを受け取れるようする
  go func() {
    defer close(orDone)

    switch len(channels) {
    // orへの各再帰呼出しは少なくとも2つのチャネルを持っているので
    // ゴルーチンの数を制限するため、特別な条件を設定
    case 2:
      select {
      case <-channels[0]:
      case <-channels[1]:
      }
    default:
      // スライスの3番目以降のチャネルから再帰的にorチャネルを作成して、
      // そこからselectを行う
      // この再帰関係はスライスの残りの部分をorチャネルに分解して、
      // 最初のシグナルが返ってくる木構造を形成する
      // orDoneチャネルも渡して、木構造の上位の部分が終了したら
      // 下位の部分も終了するようにする
      select {
      case <-channels[0]:
      case <-channels[1]:
      case <-channels[2]:
      // 再起の場合、チャンネルは最低1は渡す
      case <-or(append(channels[3:], orDone)...):
      }
    }
  }()
  return orDone
}

//試す用のチャンネルを生成する関数
func sig(after time.Duration) <-chan interface{} {
  c := make(chan interface{})
  go func() {
    defer close(c)
    time.Sleep(after)
  }()
  return c
}

func main() {
  start := time.Now()
  // 最も早い1秒後に終わる
  <-or(
    sig(1*time.Second),
    sig(2*time.Hour),
    sig(5*time.Minute),
    sig(1*time.Hour),
  )
  fmt.Printf("done after %v", time.Since(start))
}

単純にキャンセルしたいだけならcontextパッケージを使ったほうがよいですね。本にも書いてます。
5.6 不健全なゴルーチンを直すのサンプルでorパターンが出てくるので書きました。

or-done チャネル

Goを書いていると目的のチャンネルとdoneチャンネルの最低2つselectする場合がよくあります。その場合、下記のようになります。

  for val := range myChan {
    // valに対して何かする
  }

// 次のように膨れ上がる

loop:
  for {
    select {
    case <-done:
      break loop
    case maybeVal, ok := <-myChan:
      if ok == false {
        return // あるいはforからbreakするとか
      }
      // valに対して何かする
    }
  }

上のイディオムをラップしたものがorDoneです。チャンネルが1つ増えているので嫌いな人はいるかも知れません...本では、とりあえず可読性のほうが重要だよね。問題になったら変えればいいじゃん的な雰囲気で推奨(?)しています。

func orDone(done, c <-chan interface{}) <-chan interface{} {
  valStream := make(chan interface{})
  go func() {
    defer close(valStream)
    for {
      select {
      case <-done:
        return
      case v, ok := <-c:
        if ok == false {
          return
        }
        select {
        case valStream <- v:
        case <-done:
        }
      }
    }
  }()
  return valStream
}

// またrangeが使える
for val := range orDone(done, myChan) { 
  // valに対して何かする
} 
``

Goのosパケージのerrorを1.12と1.13で比較する

Goの1.13でGo2で提案されていたerrorsの変更が入り(全部ではない?)、ErrorValueFAQに移行方法などが書いてあります。その中でosパッケージに触れてありました。移行サンプルとして挙げられているわけではないですが、独自の型を作成しているライブラリがどうやって移行しているのか気になったので調べてみました。
調べてみた結果、なんか複雑になったな、というのが感想です...
今回のerrorの変更と独自エラー型との相性はあんまりよくない気がします。 前の記事では(Goによる並行処理)ではエラーは独自エラーでラップしたほうがいいという感じでしたが...

ErrorValueFAQにも下記のようにエラーをラップすべきかというのは2つのトレードオフであると書かれています。 そして、errorsは中間的な選択だそうです。

I'm writing new code, with no clients. Should I wrap returned errors or not?
Since you have no clients, you aren't constrained by backwards compatibility. But you still need to balance two opposing considerations:

Giving client code access to underlying errors can help it make decisions, which can lead to better software.
Every error you expose becomes part of your API: your clients may come to rely on it, so you can't change it.
For each error you return, you have to weigh the choice between helping your clients and locking yourself in. Of course, this choice is not unique to errors; as a package author, you make many decisions about whether a feature of your code is important for clients to know or an implementation detail.

With errors, though, there is an intermediate choice: you can expose error details to people reading your code's error messages without exposing the errors themselves to client code. One way to do that is to put the details in a string using fmt.Errorf with %s or %v. Another is to write a custom error type, add the details to the string returned by its Error method, and avoid defining an Unwrap method.

1.12

package os

import (
    "errors"
)

var (
    ErrPermission = errors.New("permission denied")
)

type SyscallError struct {
    Syscall string
    Err     error
}

func (e *SyscallError) Error() string { return e.Syscall + ": " + e.Err.Error() }

func NewSyscallError(syscall string, err error) error {
    if err == nil {
        return nil
    }
    return &SyscallError{syscall, err}
}

func IsPermission(err error) bool {
    return isPermission(err)
}

// underlyingError returns the underlying error for known os error types.
func underlyingError(err error) error {
    switch err := err.(type) {
    case *PathError:
        return err.Err
    case *LinkError:
        return err.Err
    case *SyscallError:
        return err.Err
    }
    return err
}
// +build aix darwin dragonfly freebsd js,wasm linux nacl netbsd openbsd solaris

package os

import "syscall"

func isPermission(err error) bool {
    err = underlyingError(err)
    return err == syscall.EACCES || err == syscall.EPERM || err == ErrPermission
}

1.13

package os

import (
    "internal/oserror"
)

var (
    ErrPermission = errPermission() // "permission denied"
)

func errPermission() error { return oserror.ErrPermission }

type SyscallError struct {
    Syscall string
    Err     error
}

func (e *SyscallError) Error() string { return e.Syscall + ": " + e.Err.Error() }

func (e *SyscallError) Unwrap() error { return e.Err }

func NewSyscallError(syscall string, err error) error {
    if err == nil {
        return nil
    }
    return &SyscallError{syscall, err}
}

func IsPermission(err error) bool {
    return underlyingErrorIs(err, ErrPermission)
}

func underlyingErrorIs(err, target error) bool {
    // Note that this function is not errors.Is:
    // underlyingError only unwraps the specific error-wrapping types
    // that it historically did, not all errors.Wrapper implementations.
    err = underlyingError(err)
    if err == target {
        return true
    }
    // To preserve prior behavior, only examine syscall errors.
    e, ok := err.(syscallErrorType)
    return ok && e.Is(target)
}

// underlyingError returns the underlying error for known os error types.
func underlyingError(err error) error {
    switch err := err.(type) {
    case *PathError:
        return err.Err
    case *LinkError:
        return err.Err
    case *SyscallError:
        return err.Err
    }
    return err
}
// +build !plan9

package os

import "syscall"

type syscallErrorType = syscall.Errno
package oserror

import "errors"

var (
    ErrPermission = errors.New("permission denied")
)
package syscall

import (
    "internal/oserror"
)

type Errno uintptr

func (e Errno) Is(target error) bool {
    switch target {
    case oserror.ErrPermission:
        return e == EACCES || e == EPERM
    case oserror.ErrExist:
        return e == EEXIST || e == ENOTEMPTY
    case oserror.ErrNotExist:
        return e == ENOENT
    }
    return false
}

ちなみに、PathErrorはUnwapメソッドが増えているだけです。

1.12

type PathError struct {
    Op   string
    Path string
    Err  error
}

func (e *PathError) Error() string { return e.Op + " " + e.Path + ": " + e.Err.Error() }

1.13

type PathError struct {
    Op   string
    Path string
    Err  error
}

func (e *PathError) Error() string { return e.Op + " " + e.Path + ": " + e.Err.Error() }

func (e *PathError) Unwrap() error { return e.Err }

「Go言語による並行処理」の気になった所まとめ2 エラー伝播

前回の記事に続き「Goによる並行処理」の気になった所を書いて行きます。

今回は、 5.1 エラー伝播 について書きたいと思います。 4.5エラーハンドリングという箇所で、並行処理の中でエラーを握りつぶすべきではない、Ether(RustだとResult)のようなもので成功と失敗のどちらかを持つ結果を返すべきと書いてありました。この章ではエラーをどう伝えていくのかという方に焦点があたっています。 そして、データの流れは慎重に設計するが、エラーの流れはあまり考えられていないと問題視しています。

エラーで必要なこととして下記のことが挙げられています。

  • 何が起きたのか
  • いつどこでエラーが発生したか
  • ユーザー向けの読みやすいメッセージ
  • ユーザーがさらに情報を得るにはどうするべきか

また、すべてのエラーは次の2つのうちのどちらかに分類できると書かれています。

  • バグ
  • 既知のエッジケース(例: ネットワーク接続の切断、ディスクへの書き込みの失敗など)

上記のことを踏まえて、エラーはコンポーネントごとにふさわしい形に変換して返すべきと言う主張があります。下記の引用とサンプルコードが記載されています。

エラーが「低水準コンポーネント」で発生したとして、上位のスタックに渡されるべくエラーがきちんとした形になっていたとしましょう。 「低水準コンポーネント」の文脈では、このエラーはきちんとした形になっていると思っていたかもしれませんが、それを含むシステム全体の文脈ではそうではないかもしれません。 各コンポーネントの境界では、下から上がってきたエラーは自分のコンポーネント向けにきちんとした形のエラーになるように包んで整えてやらなければなりません。

func PostReport(id string) error {
    result, err := lowlevel.DoWork()
    if err != nil {
        // エラーがきちんとした形になっているか確認して、
        // そうでなければ誤った形のエラーを単に上位のスタックに戻して、バグである旨を示唆する
        if _, ok := err.(lowlevel.Error); ok {
            // 自分のモジュール向けの付加情報とともにやってきたエラーを包んで新しい型にする
            err = WrapErr(err, "cannot post report with id %q", id)
        }
        return err
    }
    // ...
}

エラーをラップするというのは、不要な情報を隠蔽することにもなり注意が必要ではあると書かれています。 また、自分たちの作った型のエラーであればコントロールできているが、そうでない不正なエラーの場合はバグであるとみなすことができるとのことです。その不正なエラーを潰していくことでシステムは成長していくということを主張しています。不正なエラーの場合は少なくとも予期しないエラーが発生ししたことをユーザーに伝えるべきとのことです。

下記が、サンプルコードを動く形にしたものとなります。runJobBadlowlevelモジュールのエラーをラップしていないので、具体的なエラーが不明となっています。一方runJobはラップすることでどういうことをしようとしてエラーが発生したのかを伝えることとなっています。

package main

import (
    "fmt"
)

type MyError struct {
    Inner      error
    Message    string
    StackTrace string
    Misc       map[string]interface{}
}

func wrapError(err error, messagef string, msgArgs ...interface{}) MyError {
    return MyError{
        // 包んでいるエラーを保管する。調査する必要があるときに低水準のエラー見れるようにする
        Inner:   err,
        Message: fmt.Sprintf(messagef, msgArgs...),
        // エラーが作られたときにスタックトレースを記録するためのもの
        StackTrace: string(debug.Stack()),
        // 雑多な情報を保管するための場所
        // エラーの診断をする際に助けになる並行処理のIDやスタックトレースのハッシュ、
        // あるいは他のコンテキストに関する情報を保管する
        Misc: make(map[string]interface{}),
    }
}

func (err MyError) Error() string {
    return err.Message
}

// "lowlevel" モジュール
type LowLevelErr struct {
    error
}

func isGloballyExec(path string) (bool, error) {
    info, err := os.Stat(path)
    if err != nil {
        // os.Statの呼び出しから発生する生のエラーをカスタマイズしたエラーで内包する
        // 今回の場合、このエラーから出てくるメッセージは特に問題ないので、それをそのまま使う
        return false, LowLevelErr{(wrapError(err, err.Error()))}
    }
    return info.Mode().Perm()&0100 == 0100, nil
}

// lowlevelパッケージの関数を呼び出す別のモジュール intermediate

// "intermediate" モジュール
type IntermediateErr struct {
    error
}

// エラーをラップしない、よくないversion
func runJobBad(id string) error {
    const jobBinPath = "/bad/job/binary"
    isExecutable, err := lowlevel.isGloballyExec(jobBinPath)
    if err != nil {
        // lowlevelモジュールからのエラーを渡す
        // 独自の型で内包されないまま他のモジュールから渡されたエラーをバグとみなすので、
        // この実装は後々問題となる
        return err
    } else if isExecutable == false {
        return wrapError(nil, "job binary is not executable")
    }
    return exec.Command(jobBinPath, "--id="+id).Run()
}

func runJob(id string) error {
    const jobBinPath = "/bad/job/binary"
    isExecutable, err := isGloballyExec(jobBinPath)
    if err != nil {
        // 付加情報を加えたメッセージでエラーをカスタマイズする
        return IntermediateErr{
            wrapError(
                err,
                "cannot run job %q: requisite binaries not available",
                id)}
    } else if isExecutable == false {
        return wrapError(
            nil,
            "cannot run job %q: requisite binaries are not executable",
            id,
        )
    }
    return exec.Command(jobBinPath, "--id="+id).Run()
}

func handleError(key int, err error, message string) {
    // 何が起きたかを掘り下げる必要が出てきたときのためにすべてのエラーをログ出力する
    // これを実行すると、次のようなログメッセージが表示される
    log.SetPrefix(fmt.Sprintf("[logID: %v]: ", key))
    log.Printf("%#v", err)
    fmt.Printf("[%v] %v", key, message)
}

func main() {
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    err := runJob("1")
    if err != nil {
        // ここでエラーが期待した型かどうかを確認する
        // きちんとした形式のエラーならば、メッセージを単純にそのままユーザーに渡せる
        msg := "There was an unexpected issue; please report this as a bug."
        if _, ok := err.(IntermediateErr); ok {
            msg = err.Error()
        }
        // この行ではログとエラーメッセージをID1として紐付けている
        handleError(1, err, msg)
    }
}

「Go言語による並行処理」の気になった所まとめ1 Goの並行処理

並列処理を使いたかったので読んでみました。 サンプルコードがクロージャーのネストが激しく辛いですが...いい本だと思います。

www.oreilly.co.jp

5.6 不健全なゴルーチンを直す という章がとにかく難しい...
必要な知識だったので、ここを目的に何回か記事を書いていきます。
その前段として、Goの並行処理のことを少し書きます。

ゴールーチン

ゴールーチンはコルーチンに似ていますが、一般的なコルーチンは一時停止ポイントや再エントリーポイントを自分で記述します。 しかし、ゴールーチンはランタイムと密結合しているため、プログラマーがそのようなことを書かず、ランタイムが勝手に判断して一時停止と再開をしてくれます。 コルーチン・ゴルーチンは、暗黙的には並行処理の構成要素ですが、並行性というのはコルーチンの性質ではありません。 とちゃんと書いてあるのがこの本のいいところですね。
また、ゴルーチンのM:N スケジューラーについても詳しく書いてあるので、読んでみてください。

CSP

CSPは「Communicating Sequential Processes」の略で、手法とそれを紹介した論文のタイトルの両方を指します。 CSPってあんまり他の言語では聞かないですが(古典過るから?)、 ここのまとめを見るとHaskellのMvarは影響を受けているんですね。Rustの方は具体的にはよく分からず...

入力と出力がプログラミング、特に並行なコードにおいてのプリミティブとして見落とされている2つの要素だと提案しています。 と記述されていてプリミティブとしてこだわっているんだなーと思いました。個人的にはRustみたいなstd::sync::mpscモジュール形式でもいいんじゃないかと思うのですが... また、CSPはやり取りするもの(Channel)に名前をつけて送信者を指定しないがActor Modelだと送信を指定するがやり取りするものには名前をつけないというのが考え方の違いとして面白いです。参考

sync.Mutexを使うか?

また、いくつもの記事、講義あるいはGoコアチームへのインタビューの内容がsync.MutexよりもCSP形式のプリミティブを支持しています。
それゆえ、Goチームがメモリアクセス同期のプリミティブを公開した理由を考えて混乱してしまうのはまったくもって無理のないことです。
さらに混乱させるのが、メモリアクセス同期のプリミティブが野放図に使われていたり、人々がチャネルの濫用を非難していたり、
Goチームのメンバーがメモリアクセス同期の利用は別に構わないと言っているのを見かけることです。
Go Wiki(https://github. com/golang/go/wiki/MutexOrChannel)での問題の箇所です。
Goのモットーの1つに「通信によってメモリを共有し、メモリの共有によって通信してはいけない」というものがあります。
とは言っても、Goでは syncパッケージで伝統的なロック機構を提供しています。
ロックに関するたいていの問題はチャネルか伝統的なロックを用いることで解決します。
ではどちらを使うべきなのでしょう。
最も適切に表現できて、かつ、あるいはまたは、最も簡潔に書けるならどちらでも構いません。

sync.Mutexは場合によっては使っていいという感じなんですね。
調べると、netパッケージのsocketでCloseではsync.Mutexを使っていて、Dialではchannelを使っているというのもありました。

使い分けのチャートが本に記載されていました。

f:id:t10471:20190831181229p:plain