「Go言語による並行処理」の気になった所まとめ4 bridgeチャネル

復数のチャンネルをまとめて1つのチャンネルで扱うための方法です。
<-chan <-chan interface{}を1段潰した<-chan interface{}として扱えるようになります。サンプルでは、前の記事のorDoneチャンネルも利用しているのでdoneチャンネルからの受信によるキャンセルも対応しています。
<-chan <-chan interface{}は普通にプログラムを書いているとあまり出てこないかもしれないですが、この本のイディオムを使うようになれば出てくることもあると思いますが、未読の人には嫌な顔をされそう...

package main

import (
    "fmt"
)

func bridge(
    done <-chan interface{},
    chanStream <-chan <-chan interface{},
) <-chan interface{} {
    // bridgeからすべての値を返すチャネル
    valStream := make(chan interface{})

    go func() {
        defer close(valStream)
        // このループはchanStreamからチャネルを剥ぎ取り、ネストされたループに渡す
        for {
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            // このループは渡されたチャネルから値を読み込みvalStreamにその値を渡す
            for val := range orDone(done, stream) {
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func orDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

// 10個のチャネルの列を作って、それぞれのチャネルに要素を1つだけ書き込む
func genVals() <-chan <-chan interface{} {
    chanStream := make(chan (<-chan interface{}))
    go func() {
        defer close(chanStream)
        for i := 0; i < 10; i++ {
            stream := make(chan interface{}, 1)
            stream <- i
            close(stream)
            chanStream <- stream
        }
    }()
    return chanStream
}

func main() {
    for v := range bridge(nil, genVals()) {
        fmt.Printf("%v ", v)
    }
}

このコードを実行すると下記の結果となります。

0 1 2 3 4 5 6 7 8 9