x/sync/errgroupの亜種みたいなものを考えたりしていた

突然ですがx/sync/errgroupはとても便利です。ちょっと並行で外部リソースにアクセスしたいときなどはsync/WaitGroupで書いてエラーの扱いを自分で頑張るよりerrgroupに任せた方が楽です。

g, ctx := errgroup.WithContext(ctx)

// ここでのf0,f1,..などはgoroutine経由で並行で実行される
g.Go(f0)
g.Go(f1)
g.Go(f2)

if err := g.Wait(); err != nil {
    doSomething(err)
}

errgroupの振る舞いについて

そんな便利なerrgroupなのですが。同期的な疑似コードを書いてみると以下の様な振る舞いをするものとみなすことができます。

for f1 := range fs {
    // 実際はgorountine経由で並行に実行される
    if err := f(ctx); err != nil {
        return err
    }
}

そうです。やってることは順に実行してエラーが発生した時点で処理を中断するというような形式のものです。

errgroupの亜種

ちょっとだけ発想を変えてみましょう。先程のerrgroupによる処理を読み込むと実行される列のようなもので考えてみます。

f0 and f1 and f2 and f3 // ここでf0,f1,...はfsの要素

すると成功したら(失敗しなかったら)処理を継続する(and)処理と見ることができます。個人的にはこの見た目が論理演算のandと似たような形のように見えました。andがあるならorについても考えることができそうです。

さて以下のようなorによる演算に対応する処理はどういったものでしょう?

f0 or f1 or f2 or f3 // ここでf0,f1,...はfsの要素

orによる連鎖は、たとえ失敗したとしても次の処理を実行していくものと考えることにします(一度trueになったら短絡して処理を打ち切りということは考えない)。すると今度は以下の様な疑似コードを模した構造のものが考えられそうです。

var errs []error

for f := range fs {
    // 実際はgorountine経由で並行に実行される
    if err := f(ctx); err != nil {
        errs = append(errs, err)
    }
}

// ここでerrsに対して何か処理を行う

そうです。error値がたとえあったとしても処理を継続しつつerror値はどこかに格納しておくみたいな処理になっています。格納する場所はどのような形式でも良いのですが。上の疑似コード上ではsliceということにしています。

こういった構造を持ったヘルパーをerrgroupの亜種として考えることはできるでしょうか?というのが今回の話です。

chan error ?

名前は特に良いものが思いつかなかったのでテキトウにConditionalとか付けて作ってみます。errgroupのGroupと対応づけるとすると以下の様なインターフェイスを持っているはずです。

type ErrGroupLike interface {
    Go(func() error)
    Wait() error
}

ここでちょっと考えてみると、Wait()は違いそうです。エラーが複数存在しているはずなので。そして使用しているときの状況も考えてみたのですが。error値を取得したらすぐにその場で消化したいという気持ちになりました(例えば100個くらいタスクが存在してその5個目のタスクで発生したエラーを全部のタスクが終わってからみたいでしょうか?ということなどを考えたら発生したエラーは発生したタイミングでハンドリングしたいと思いました)。

channel as iterator

発生したerror値をその場で使いたいと来たらchannelです。実際channelを一種のiteratorと見做す使いかたはたまに存在するデザインパターンの一種だと思います。例えば以下の様に。

func generator() chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        ch <- 1
        time.Sleep(300 * time.Millisecond)
        ch <- 2
        time.Sleep(300 * time.Millisecond)
        ch <- 3
    }()
    return ch
}

func main() {
    for n := range generator() {
        log.Println(n)
    }
}

// 2018/03/24 22:58:15 1
// 2018/03/24 22:58:16 2
// 2018/03/24 22:58:16 3

内部的にはスリープしたりなどの待つ処理が入るのですが。channelをrangeでforループすると他の言語のイテレータ(ジェネレータ)のように扱うことができます。ちなみに複数のchannelを同時に消費したい場合には、forループの部分で複数このgoroutineを回すだとか。rangeによるforループを諦めるといったことをする必要があります。

複数のworkerで消費する例

var wg sync.WaitGroup
wg.Add(5)

ch := generator()
for i := 0; i < 5; i++ {
    i := i
    go func(){
        defer wg.Done()
        for n := range ch {
            log.Println(n)
        }
    }()
}

wg.Wait()

複数のproducerを消費する例。

var wg sync.WaitGroup
wg.Add(2)

ch0 := generator()
ch1 := generator()

go func(){
    defer wg.Done()
    for n := range ch0 {
        log.Println(0, n)
    }
}()
go func(){
    defer wg.Done()
    for n := range ch1 {
        log.Println(1, n)
    }
}()

ちなみに複数のproducerを1つのworkerで消費することもできるのですが。終了条件の扱いなどでちょっと大変になります。実際の所rangeでのchannelのループは以下のような記述の省略形です。

// for n := range ch {
//     log.Println(n)
// }

for {
    select {
        n, ok := <-ch:
        if !ok {
            break
        }
        log.Println(n)
    }
}

このselectでのokの部分はcloseされたchannelでfalseの値がかえってきます。つまりforループでchannelを使うにはcloseが必須ということになります。どうしてこのcloseの必要性の話まで丁寧に書いてきたかと言うと後々必要になるからです。

もうちょっとだけインターフェイスについて考える

channelをイテレータと見做すという話を含めると、以下の様なインターフェイスの方が妥当かもしれません。

type ErrGroupLike interface {
    Go(func() error)
    Wait() <-chan error
}

実はこれではだめです。理由は単純で使用するときコードは以下の様になるのですが。goのruntimeは全てのgoroutineが止っていることを許しません。例えばf0,f1でsleepのような処理が止まる処理が存在してしまった場合に、c.Wait()部分のmainの処理も止まってしまうのでだめです。

c.Go(f0)
c.Go(f1)

for err := range c.Wait() {
    log.Println("error", err)
}

// fatal error: all goroutines are asleep - deadlock!

そんなわけでちょっとだけ変更した以下のようなインターフェイスにします。

type ErrGroupLike interface {
    Go(func() error)
    Wait(func(<-chan error))
}

使いかたはこうです。以下のコードは、時折2で割り切れる数の場合にエラーが発生するのですが。その発生したエラーは発生したタイミングですぐにログに出力されて欲しいと言うようなもの例です。

c := New()
for i := 0; i < 5; i++ {
    i := i
    c.Go(func() error {
        n := int(rand.Float64() * 1000)
        log.Println("start", i, n)

        time.Sleep(time.Duration(n) * time.Millisecond)

        if n%2 == 0 {
            return errors.Errorf("%d %d", i, n)
        }

        log.Println("end  ", i, n)
        return nil
    })
}

c.WaitWith(func(errch <-chan error) {
    for err := range errch {
        log.Println("error", err)
    }
})
fmt.Println("done")

失敗した場合も即座にエラーが出力されてますね(start..endが終わってからerrorが来ていない)。

2018/03/24 23:40:33 start 2 940
2018/03/24 23:40:33 start 0 664
2018/03/24 23:40:33 start 4 424
2018/03/24 23:40:33 start 1 604
2018/03/24 23:40:33 start 3 437
2018/03/24 23:40:34 error 4 424
2018/03/24 23:40:34 end   3 437
2018/03/24 23:40:34 error 1 604
2018/03/24 23:40:34 error 0 664
2018/03/24 23:40:34 error 2 940
done

(このコードだけなら直接エラー値を返さずgoroutineでループしている処理の中でエラーの出力をすれば良いのでは?という話があったりもしますが。実際のコードではエラー処理が共通で行われていたりなどあると思います)。

実装

実装は以下の様な感じです。

// Conditional :
type Conditional struct {
    wg       sync.WaitGroup
    waitOnce sync.Once
    errCh    chan error
}

// New :
func New() *Conditional {
    return &Conditional{errCh: make(chan error)}
}

// WaitWith :
func (c *Conditional) WaitWith(fn func(errch <-chan error)) {
    go fn(c.errCh)
    c.waitOnce.Do(func() {
        c.wg.Wait()
        close(c.errCh)
    })
    c.wg.Wait()
}

// Go :
func (c *Conditional) Go(f func() error) {
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()
        if err := f(); err != nil {
            c.errCh <- err
        }
    }()
}

同時並行実行数制限?

並行実行の数を制限したい場合には、semaphoreを使うと便利です。

go get -v golang.org/x/sync/semaphore

こんな感じに。

@@ -45,10 +47,15 @@ func (c *Conditional) Go(f func() error) {
 
 func main() {
    c := New()
+   sem := semaphore.NewWeighted(2)
+
    for i := 0; i < 5; i++ {
        i := i
 
        c.Go(func() error {
+           sem.Acquire(context.Background(), 1)
+           defer sem.Release(1)
+
            n := int(rand.Float64() * 1000)
            log.Println("start", i, n)
            time.Sleep(time.Duration(n) * time.Millisecond)

例えば2個ずつまでしか実行しない場合。

2018/03/24 23:49:17 start 2 940
2018/03/24 23:49:17 start 0 604
2018/03/24 23:49:17 start 3 664
2018/03/24 23:49:17 error 0 604
2018/03/24 23:49:18 error 2 940
2018/03/24 23:49:18 start 4 437
2018/03/24 23:49:18 error 3 664
2018/03/24 23:49:18 start 1 424
2018/03/24 23:49:18 end   4 437
done

キャンセル処理は?

キャンセル処理はどうしましょう?今回の実装では対応していません。あったほうが良いですか?contextを見るというのもありだとおもいます。以下みたいな処理をGoの中などで書いてみるイメージです。

select {
    case <- ctx.Done():
        break
    default:
        do something
}

あるいはGo()メソッドの実装を以下の様にしてみるだとか。

// Go :
func (c *Conditional) Go(ctx context.Context, f func() error) {
    select {
    case <-ctx.Done():
        return
    default:
        c.wg.Add(1)
        go func() {
            defer c.wg.Done()
            select {
            case <-ctx.Done():
                return
            default:
                if err := f(); err != nil {
                    c.errCh <- err
                }

            }
        }()
    }
}

使う時にctxを渡すように変わります。

c := New()

c.Go(ctx, fn0)
c.Go(ctx, fn1)

c.WaitWith(func(errch <-chan error) {
    for err := range errch {
        log.Println("error", err)
    }
})

正直な所キャンセルの対応はあまり良い感じではない気がしています。