「Go言語による並行処理」の気になった所まとめ6 不健全なゴルーチンを直す

やっと最後の記事になりました。 不健全なゴルーチンを直す では長時間動くプロセスの場合、ゴルーチンも生存期間が長くなります。関心の分離からゴルーチン自身が回復の方法を知らないほうが良いとのことです。このゴルーチンを再起動する流れを「回復」と呼ぶそうです。 回復にはハートビートパターンを使って行います。
本ではゴルーチンの健全性を監視するロジックを管理人 (steward)とよび、管理人が監視するゴ ルーチンを中庭 (ward)と呼んでいます。 管理人は中庭にいるゴルーチンが不健全になったら再起動する責任も負っています。そのためには、そのゴルーチンを起動する関数への参照が必要になります。
下記が管理人のサンプル実装です。

package main

import (
    "log"
    "os"
    "time"
)

// 監視と再起動ができるゴルーチンのシグネチャの定義
// doneチャネルとハートビートパターンのpulseIntervalとheartbeatを使用
type startGoroutineFn func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (heartbeat <-chan interface{})

// 管理人が監視するゴルーチンのためのtimeoutと、
// 監視するゴルーチンを起動するための startGoroutine関数をとり、
// 管理人自身はstartGoroutineFnを返していているのでそれ自体も監視対象となりえる
func newSteward(
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) startGoroutineFn {
    return func(
        done <-chan interface{},
        pulseInterval time.Duration,
    ) <-chan interface{} {
        heartbeat := make(chan interface{})
        go goFn(done, heartbeat, pulseInterval, timeout, startGoroutine)
        return heartbeat
    }
}

// 監視しているゴルーチンを起動するための関数
func startWard(
    done <-chan interface{},
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) (
    wardDone chan interface{},
    wardHeartbeat <-chan interface{},
) {
    //  停止グナルを送る必要がある場合に備えて、中庭のゴルーチンに渡す新しいチャネルを作成
    wardDone = make(chan interface{})
    // 監視対象のゴルーチンを起動する
    // 管理人自体が停止するか、管理人が中庭のゴルーチンを停止させる場合があるので、
    // 両方のdoneチャネルをorの中に内包させる
    wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2)
    return
}

func goFn(
    done <-chan interface{},
    heartbeat chan interface{},
    pulseInterval time.Duration,
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) {
    defer close(heartbeat)
    var wardDone chan interface{}
    var wardHeartbeat <-chan interface{}
    wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
    pulse := time.Tick(pulseInterval)
monitorLoop:
    for {
        timeoutSignal := time.After(timeout)
        for { // 2重ループにすることで、管理人が自身の鼓動を確実に外へと送信できるようにしている
            select {
            case <-pulse:
                select {
                case heartbeat <- struct{}{}:
                default:
                }
            case <-wardHeartbeat: // 中庭の鼓動を受信したら、監視のループを継続する
                continue monitorLoop
            case <-timeoutSignal:
                // 鼓動が受信できなければ、中庭に停止するようリクエストし、
                // 新しい中庭のゴルーチンを起動しはじめる。その後、監視を続ける
                log.Println("steward: ward unhealthy; restarting")
                close(wardDone)
                wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
                continue monitorLoop
            case <-done:
                return
            }
        }
    }
}

func main() {
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    // doWorkが起動するゴルーチンのための管理人を作る関数を作成
    // doWorkのタイムアウトは4秒に設定
    doWorkWithSteward := newSteward(4*time.Second, doWork)
    done := make(chan interface{})
    // 9秒後に管理人を停止させる、そうすると中庭も停止してこの例が終わる
    time.AfterFunc(9*time.Second, func() {
        log.Println("main: halting steward and ward.")
        close(done)
    })
    // 管理人を起動して、鼓動をrangeで回して調べて、例が終了してしまうのを防ぐ
    // pulseIntervalとして4秒を設定
    for range doWorkWithSteward(done, 4*time.Second) {
    }
    log.Println("Done")

}

func doWork(done <-chan interface{}, _ time.Duration) <-chan interface{} {
    log.Println("ward: Hello, I'm irresponsible!")
    go func() {
        // 何もしておらず、キャンセルされるのを待っている。
        // また、鼓動をまったく送信していない
        <-done
        log.Println("ward: I am halting.")
    }()
    return nil
}

// この関数はチャネルの可変長引数のスライスを受け取り、1つのチャネルを返す
func or(channels ...<-chan interface{}) <-chan interface{} {
    // 再帰関数なので、停止条件が必要
    // スライスが空の場合は単純にnilチャネルを返す。チャネルを渡さなかった場合と同義
    // 必ずorDoneは渡されるので、再起中は引数が0はありえない
    switch len(channels) {
    case 0:
        return nil
    case 1:
        // 可変長引数のスライスが1つしか要素を持っていない場合は、要素を返すだけ
        return channels[0]
    }
    orDone := make(chan interface{})
    // 関数の本体で再帰が発生する部分
    // ゴルーチンを作り、ブロックすることなく作ったチャネルにメッセージを受け取れるようする
    go func() {
        defer close(orDone)

        switch len(channels) {
        // orへの各再帰呼出しは少なくとも2つのチャネルを持っているので
        // ゴルーチンの数を制限するため、特別な条件を設定
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            // スライスの3番目以降のチャネルから再帰的にorチャネルを作成して、
            // そこからselectを行う
            // この再帰関係はスライスの残りの部分をorチャネルに分解して、
            // 最初のシグナルが返ってくる木構造を形成する
            // orDoneチャネルも渡して、木構造の上位の部分が終了したら
            // 下位の部分も終了するようにする
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            // 再起の場合、チャンネルは最低1は渡す
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}

この例では次のような出力を得られます。

18:28:07 ward: Hello, I'm irresponsible! 
18:28:11 steward: ward unhealthy; restarting 
18:28:11 ward: Hello, I'm irresponsible! 
18:28:11 ward: I am halting.
18:28:15 steward: ward unhealthy; restarting 
18:28:15 ward: Hello, I'm irresponsible! 
18:28:15 ward: I am halting.
18:28:16 main: halting steward and ward. 
18:28:16 ward: I am halting.
18:28:16 Done

次に、中庭を抽象的にすることで任意のクロージャーを受け取れるように修正した例です。 この例が自分の目当てのサンプルだったんですが、、、難しい...
本のプログラムはクロージャーを多様していたのですが、ネストすると見にくいのでできるだけ関数にしています。
サンプルの中でbridge チャネルが登場します。ここがよくわからないポイントだったのですが、回復が発生するとそれまで使っていたチャンネルが閉じられてしまうのでずっと閉じられないチャンネルでラップすることでチャンネルのハンドリングをシンプルにしているようです。
また、ハートビートが2つ出てくるのも非常に混乱をしますが、中庭と仕事、それぞれでハートビートをしているというのがわかると理解が進みました。
また、take だけ記事で説明していません...Haskellの無限リストを指定の場所まで読み取る関数のチャンネル版です。

package main

import (
    "fmt"
    "log"
    "os"
    "time"
)

// 監視と再起動ができるゴルーチンのシグネチャの定義
// doneチャネルとハートビートパターンのpulseIntervalとheartbeatを使用
type startGoroutineFn func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (heartbeat <-chan interface{})

// 管理人が監視するゴルーチンのためのtimeoutと、
// 監視するゴルーチンを起動するための startGoroutine関数をとり、
// 管理人自身はstartGoroutineFnを返していているのでそれ自体も監視対象となりえる
func newSteward(
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) startGoroutineFn {
    return func(
        done <-chan interface{},
        pulseInterval time.Duration,
    ) <-chan interface{} {
        heartbeat := make(chan interface{})
        go goFn(done, heartbeat, pulseInterval, timeout, startGoroutine)
        return heartbeat
    }
}

func goFn(
    done <-chan interface{},
    heartbeat chan interface{},
    pulseInterval time.Duration,
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) {
    defer close(heartbeat)
    var wardDone chan interface{}
    var wardHeartbeat <-chan interface{}
    wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
    pulse := time.Tick(pulseInterval)
monitorLoop:
    for {
        timeoutSignal := time.After(timeout)
        for { // 2重ループにすることで、管理人が自身の鼓動を確実に外へと送信できるようにしている
            select {
            case <-pulse:
                select {
                case heartbeat <- struct{}{}:
                default:
                }
            case <-wardHeartbeat: // 中庭の鼓動を受信したら、監視のループを継続する
                continue monitorLoop
            case <-timeoutSignal:
                // 鼓動が受信できなければ、中庭に停止するようリクエストし、
                // 新しい中庭のゴルーチンを起動しはじめる。その後、監視を続ける
                log.Println("steward: ward unhealthy; restarting")
                close(wardDone)
                wardDone, wardHeartbeat = startWard(done, timeout, startGoroutine)
                continue monitorLoop
            case <-done:
                return
            }
        }
    }
}

// 監視しているゴルーチンを起動するための関数
func startWard(
    done <-chan interface{},
    timeout time.Duration,
    startGoroutine startGoroutineFn,
) (
    wardDone chan interface{},
    wardHeartbeat <-chan interface{},
) {
    //  停止グナルを送る必要がある場合に備えて、中庭のゴルーチンに渡す新しいチャネルを作成
    wardDone = make(chan interface{})
    // 監視対象のゴルーチンを起動する
    // 管理人自体が停止するか、管理人が中庭のゴルーチンを停止させる場合があるので、
    // 両方のdoneチャネルをorの中に内包させる
    wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2)
    return
}

func main() {
    log.SetFlags(log.Ltime | log.LUTC)
    log.SetOutput(os.Stdout)
    done := make(chan interface{})
    defer close(done)
    // 中庭の関数を作る。整数の可変長引数のスライスを内包して、やり取りに使えるストリームを返す
    doWork, intBridgeStream := doWorkFn(done, 1, 2, -1, 3, 4, 5)
    // doWorkのクロージャーを監視する管理人を作成。すぐ失敗が発生するので監視期間を1ミリ秒に設定
    doWorkWithSteward := newSteward(1*time.Millisecond, doWork)
    // 管理人に中庭を起動して監視を開始するように伝える
    doWorkWithSteward(done, 1*time.Hour)
    // 最後に、構築したパイプラインのステージの1つを使ってintStreamから最初の6つの値を取得する
    for intVal := range take(done, intBridgeStream, 6) {
        fmt.Printf("Received: %v\n", intVal)
    }
}

// 中庭に囲い込んでもらいたい値を取って、中庭からのやり取りに使うチャネルを返す
func doWorkFn(
    done <-chan interface{},
    intList ...int,
) (
    startGoroutineFn, // 中庭の関数
    <-chan interface{},
) {
    // ブリッジパターンとしてチャネルのチャネルを作る
    intChanStream := make(chan (<-chan interface{}))
    intBridgeStream := bridge(done, intChanStream)
    doWork := func(
        done <-chan interface{},
        pulseInterval time.Duration,
    ) <-chan interface{} { // 管理人に監視されるクロージャーを作成
        // wardHeartbeatになる
        heartbeat := make(chan interface{})
        go doWorkGoFn(done, intChanStream, heartbeat, pulseInterval, intList)
        return heartbeat
    }
    return doWork, intBridgeStream
}

func doWorkGoFn(done <-chan interface{},
    intChanStream chan (<-chan interface{}),
    heartbeat chan<- interface{},
    pulseInterval time.Duration,
    intList []int,
) {
    // 中庭のゴルーチンのインスタンスの中でやり取りするチャネルを初期化
    intStream := make(chan interface{})
    defer close(intStream)
    select {
    case intChanStream <- intStream: // 新しいチャネルをブリッジに知らせる
    case <-done:
        return
    }
    pulse := time.Tick(pulseInterval)
    for {
    valueLoop:
        for _, intVal := range intList {
            if intVal < 0 {
                // 負の数字が来たときにエラーのログを取ってゴルーチンからreturnすることで、
                // 不健全な中庭をシミュレートしている
                log.Printf("negative value: %v\n", intVal)
                return
            }
            for {
                select {
                case <-pulse:
                    select {
                    case heartbeat <- struct{}{}:
                    default:
                    }
                case intStream <- intVal:
                    continue valueLoop
                case <-done:
                    return
                }
            }
        }
    }

}

func orDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func bridge(
    done <-chan interface{},
    chanStream <-chan <-chan interface{},
) <-chan interface{} {
    // bridgeからすべての値を返すチャネル
    valStream := make(chan interface{})

    go func() {
        defer close(valStream)
        // このループはchanStreamからチャネルを剥ぎ取り、ネストされたループに渡す
        for {
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            // このループは渡されたチャネルから値を読み込みvalStreamにその値を渡す
            for val := range orDone(done, stream) {
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func take(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

// この関数はチャネルの可変長引数のスライスを受け取り、1つのチャネルを返す
func or(channels ...<-chan interface{}) <-chan interface{} {
    // 再帰関数なので、停止条件が必要
    // スライスが空の場合は単純にnilチャネルを返す。チャネルを渡さなかった場合と同義
    // 必ずorDoneは渡されるので、再起中は引数が0はありえない
    switch len(channels) {
    case 0:
        return nil
    case 1:
        // 可変長引数のスライスが1つしか要素を持っていない場合は、要素を返すだけ
        return channels[0]
    }
    orDone := make(chan interface{})
    // 関数の本体で再帰が発生する部分
    // ゴルーチンを作り、ブロックすることなく作ったチャネルにメッセージを受け取れるようする
    go func() {
        defer close(orDone)

        switch len(channels) {
        // orへの各再帰呼出しは少なくとも2つのチャネルを持っているので
        // ゴルーチンの数を制限するため、特別な条件を設定
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            // スライスの3番目以降のチャネルから再帰的にorチャネルを作成して、
            // そこからselectを行う
            // この再帰関係はスライスの残りの部分をorチャネルに分解して、
            // 最初のシグナルが返ってくる木構造を形成する
            // orDoneチャネルも渡して、木構造の上位の部分が終了したら
            // 下位の部分も終了するようにする
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            // 再起の場合、チャンネルは最低1は渡す
            case <-or(append(channels[3:], orDone)...):
            }
        }
    }()
    return orDone
}
Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting

Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2

(補足)
サンプルではエラーが発生したものをスキップしていないですが、スキップするようにするには、 リストを消費するようにします。

valueLoop:
for _, intVal := range intList {
// ...
 }

// 代わりに下にする

valueLoop: for {
intVal := intList[0] 
intList = intList[1:] // ...
}

ということで、Go言語による並行処理の気になったところを見てきました。
私が書いていない部分も非常にためになる部分が多いので是非、一読してみてください。