pythonのasyncioでrun_in_executor()を使ってもブロックしてるように見えて上手く処理を逃がせないと感じたとき。

asyncioを使っていて、awaitableな関数群で構成されていないコードセットを無理やり非同期化してお茶を濁す時にrun_in_executor()が使われる。

ただしこのrun_in_executor()を使っても上手く処理を別スレッドに逃がせずという事態に陥ることもある。その理由などについてのメモ。

(実験用のコードのgist。分かっている人はこれを見るだけで良いかもしれない)

(正確には言えば、しっかりと別スレッドで実行されてはいるのだけれど、特定のコードブロックの実行がCPUを占有して、非同期的に動いている他の処理の待ち受けにすら反応を返さない様に見える状態のこと)

run_in_executor()を使ってもブロックしてしまう

とりあえず以下の様なコードを見て欲しい。1秒掛かりそうなタスクを2個、2秒掛かりそうなタスクを1個、並行で動かそうとしているが実際の実行時間は4秒になっている。

import logging
import asyncio
import time
from functools import partial
from mysleep import mysleep

logger = logging.getLogger(__name__)


async def run():
    loop = asyncio.get_event_loop()
    st = time.time()
    logger.info("**start**")
    result = await asyncio.gather(
        *[
            loop.run_in_executor(None, partial(mysleep, "x", 1)),
            loop.run_in_executor(None, partial(mysleep, "y", 1)),
            loop.run_in_executor(None, partial(mysleep, "z", 2)),
        ]
    )
    logger.info("**end** %r %r", result, time.time() - st)


logging.basicConfig(level=logging.INFO, format="%(asctime)s" + logging.BASIC_FORMAT)
asyncio.run(run(), debug=True)

x,y,zのmaxで2秒で終わりそうな気がするのだけれど。4(1+1+2)秒掛かる。

実行結果

$ 2019-03-20 20:54:59,837INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... end mysleep', 'x')
('    ... start mysleep', 'y', 1)
('    ... end mysleep', 'y')
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'z')
2019-03-20 20:55:03,846WARNING:asyncio:Executing <Task pending coro=<run() running at 00use_thread_pool_executor.py:18> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f4d1a05a828>()] created at /usr/lib/python3.7/asyncio/tasks.py:615> cb=[_run_until_complete_cb() at /usr/lib/python3.7/asyncio/base_events.py:158] created at /usr/lib/python3.7/asyncio/base_events.py:563> took 4.009 seconds
2019-03-20 20:55:03,850INFO:__main__:**end** ['x', 'y', 'z'] 4.012997150421143

run_in_executor()は同期的な処理を非同期化するので上手く動いてくれそうなものだけれど(と思う人が多い)。

そもそもrun_in_executor()とは?

実際の所、run_in_executor()は、concurrent.futuresのExecutorに処理を任せているだけ。(defaultではThreadPoolExecutorが使われるので)別スレッドで動くということにはなるものの結局pythonのthreadを立ち上げているというだけなので、pythonの実行コンテキスト上ロックが掛かってしまっては処理がブロックしてしまう。

asyncio/base_events.py

# concurrent.futuresのExecutorに渡しているだけ

class class BaseEventLoop(events.AbstractEventLoop):
# ...
    def run_in_executor(self, executor, func, *args):
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)

本当にconcurrent.futuresのExecutorを使っているだけ。

run_in_executor() が効かない理由

run_in_executor()が効かないと感じた時にその原因の可能性は複数ある。

  • IOバウンドな処理とCPUバウンドな処理のうちCPUバウンドな処理に該当する場合
  • 内部の処理が雑でGILが掛かってる系のコードになっている(Py_BEGIN_ALLOW_THREADSで囲まれていない部分(Cの話))

ぱっと見た感じ前者と後者は全く別のものなように感じてしまうかもしれないけれど。cpythonがこの両者を混同して解釈してしまうことがある(前者と後者を同じものだと解釈した人は処理系の気持ちが分かっている人かもしれない)。

加えて本番用のコードを書く前の実験で書いた仮のfakeのコードがtime.sleep()などを使っていた場合には、fakeの方のコードでは上手く動いているのに。。と感じてしまったりする。こういうときなどにpythonへの負の感情を募らせてしまうということがあるかもしれない。

結局、非同期処理と言っても、1つのCPUで動いているだけなので、息継ぎ的な部分がないコードがずっとCPUを掴んだらasyncioのloop自体も止まる。

(例えばlifetime的なものを定期的に更新するheartbeatを兼ねたループを実行したい場合などに全体の実行が止まっては困る。こういうときのためにどうして想定とは異なる挙動をするのかと言うことが把握できていると後々便利)

つまり非同期に優しいと言うのは2段階の意味があり、例えば、awaitable ready(asyncioのツールセットで構成されたコード)とrun_in_executor friendly(GILを気にして書かれたコード)と呼び分けることはできるかもしれない。この後者ですら無い場合にはrun_in_executor()で処理を逃したつもりになっても他のtaskの実行も止めてしまうかのように動くことがある。

最後の砦としてのProcessPoolExecutor

run_in_executor()では実行するExecutorを指定することができる。そして、concurent.futuresのExecutorにはThreadPoolExecutor(defaultで使われる)の他にもう1つProcessPoolExecutorが存在する。

こちらはprocess自体を分けて実行するので処理を別の実行コンテキストに逃がせるかもしれない(正確に言うとforkしたprocessPool内のprocessとpickleで通信する。詳しいアーキテクチャは直接コードを見るとコメントにがあるので知りたい人は覗いてみると良い)。

これを使ってあげるとprocessが別なのでGILの影響から逃れられる(逃れてはいないけれど、他には影響を与えないという意味で)。

--- 00use_thread_pool_executor.py    2019-03-20 20:53:38.629294940 +0900
+++ 01use_process_pool_executor.py    2019-03-20 20:53:38.615961610 +0900
@@ -1,6 +1,7 @@
 import logging
 import asyncio
 import time
+from concurrent.futures import ProcessPoolExecutor
 from functools import partial
 from mysleep import mysleep
 
@@ -11,11 +12,12 @@
     loop = asyncio.get_event_loop()
     st = time.time()
     logger.info("**start**")
+    executor = ProcessPoolExecutor(max_workers=3)
     result = await asyncio.gather(
         *[
-            loop.run_in_executor(None, partial(mysleep, "x", 1)),
-            loop.run_in_executor(None, partial(mysleep, "y", 1)),
-            loop.run_in_executor(None, partial(mysleep, "z", 2)),
+            loop.run_in_executor(executor, partial(mysleep, "x", 1)),
+            loop.run_in_executor(executor, partial(mysleep, "y", 1)),
+            loop.run_in_executor(executor, partial(mysleep, "z", 2)),
         ]
     )
     logger.info("**end** %r %r", result, time.time() - st)

2秒(max(1,1,2))で終わった。

PYTHONPATH=. python 01use_process_pool_executor.py
2019-03-20 20:55:03,959INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... start mysleep', 'y', 1)
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'x')
('    ... end mysleep', 'y')
2019-03-20 20:55:04,967INFO:asyncio:poll took 1001.933 ms: 1 events
('    ... end mysleep', 'z')
2019-03-20 20:55:05,969INFO:__main__:**end** ['x', 'y', 'z'] 2.0095720291137695

なので最後の手段としてprocessを分けるという方法は使えるかもしれない。

このようなときにはなるべくmainのloopには実際のタスクの実行をさせないようにしてあげることが大切(逆に言うと、他が非同期でも一部の心ない同期的な処理で全体を壊すということは普通にありえる)。

詳細:実際にmysleepを作る

(ここからは詳細の話なので興味がない人はProcessPoolExecutorで解決することもあるよで納得しちゃっても良いです)

ここまでの説明だけでは、「ある特殊な状態のときにrun_in_executor()に処理を逃しても全体としてブロックしてしまったかのように他の処理の実行が遅れてしまう」ということが分かっただけの状態。そもそもどういう時にこのような事態に陥るかかと言うことがわからないまま。

なので実際に再現するようなコードを作ってみる。

mysleep

time.sleep()run_in_executor()フレンドリーと考えると、そうではないsleep関数を作るのが一番手軽。cでのsleepを直接呼ぶようなモジュールを作ってしまうのが手軽そう。

めんどくさいのでcythonを使う。man 3 sleepなどを実行してみるとunistd.hにposixのsleepは入っているようなのでこれを使う(実際インポート用のglueコードが用意されているのでcimportで直接取り出せる)。

以下のようなmysleep.pyxを作る。

mysleep.pyx

from posix.unistd cimport sleep

def mysleep(x, unsigned int n):
    print("    ... start mysleep", x, n)
    sleep(n)
    print("    ... end mysleep", x)
    return x

ビルドは以下のようなsetup.pyを書く

setup.py

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext

setup(
    cmdclass={"build_ext": build_ext},
    ext_modules=[Extension("mysleep", sources=["mysleep.pyx"])],
)

実際にbuildして使ってみる。

$ python setup.py build_ext --inplace
$ time PYTHONPATH=. python -c 'import mysleep; print(mysleep.mysleep("OK", 1))'
('    ... start mysleep', 'OK', 1)
('    ... end mysleep', 'OK')
OK

real    0m1.068s
user    0m0.051s
sys 0m0.017s

完成。このmysleepモジュールを使ったコードが冒頭のコード。

実行結果を再掲してみると、2秒を期待している所で4秒掛かっている(WARNINGでログが出ているのである程度は気づけるかもしれない?)。

$ 2019-03-20 20:54:59,837INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... end mysleep', 'x')
('    ... start mysleep', 'y', 1)
('    ... end mysleep', 'y')
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'z')
2019-03-20 20:55:03,846WARNING:asyncio:Executing <Task pending coro=<run() running at 00use_thread_pool_executor.py:18> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f4d1a05a828>()] created at /usr/lib/python3.7/asyncio/tasks.py:615> cb=[_run_until_complete_cb() at /usr/lib/python3.7/asyncio/base_events.py:158] created at /usr/lib/python3.7/asyncio/base_events.py:563> took 4.009 seconds
2019-03-20 20:55:03,850INFO:__main__:**end** ['x', 'y', 'z'] 4.012997150421143

GILを一時的に開放する

本来Cで拡張モジュールを使う場合にはマクロでゴニョゴニョしないとダメなのだけれど、cythonは便利なのでnogilのcontext managerで包んであげればその間のコードはGILの制限から解放される。

diff --git a/daily/20190320/example_run_in_executor/mysleep.pyx b/daily/20190320/example_run_in_executor/mysleep.pyx
index c27e4da..bbf6764 100644
--- a/daily/20190320/example_run_in_executor/mysleep.pyx
+++ b/daily/20190320/example_run_in_executor/mysleep.pyx
@@ -2,6 +2,7 @@ from posix.unistd cimport sleep
 
 def mysleep(x, unsigned int n):
     print("    ... start mysleep", x, n)
-    sleep(n)
+    with nogil:
+        sleep(n)
     print("    ... end mysleep", x)
     return x

今度はThreadPoolExecutorでも上手く動く。

$ $ PYTHONPATH=. python 00use_thread_pool_executor.py
2019-03-21 15:12:40,999INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... start mysleep', 'y', 1)
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'x')
('    ... end mysleep', 'y')
2019-03-21 15:12:42,004INFO:asyncio:poll took 1000.853 ms: 1 events
('    ... end mysleep', 'z')
2019-03-21 15:12:43,005INFO:__main__:**end** ['x', 'y', 'z'] 2.00624680519104

やりましたね。

gist

(実験用のコードのgist。分かっている人はこれを見るだけで良いかもしれない)

see also

uvloopとasyncioとの違いはみたいな話が気になった人はこの記事を見るのも良いかも

asyncioのもう少し外側の話という意味では、trioやcruioの入門でこのあたりを見るのも良いかも

N個のchannelがcloseされるまで読み込む方法について

N個のchannelがcloseされるまで読み込む方法についてのメモ。

1個のchannelがcloseされるまで読み込む

まずはじめにN=1の場合について。これは単にforループで取り出せば良い。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    ch := make(chan int)
    wg.Add(1)
    go func() {
        defer close(ch)
        defer wg.Done()
        for _, x := range []int{1, 2, 3, 4, 5} {
            ch <- x
            time.Sleep(10 * time.Millisecond)
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        var r []int
        for x := range ch {
            r = append(r, x)
        }
        fmt.Println(r)
    }()
    wg.Wait()
    // Output:
    // [1 2 3 4 5]
}

重要なのはここの部分。

for x := range ch {
    r = append(r, x)
}

channelをforループでループするとcloseされるまでの値を取り出すことができる。

forループをselectに変換する

forループで読み込めるのは1つのchannelだけ。以下の様なコードはch0を全部読み込んだ後にch1を読み込むというような挙動になる。このままでは複数のchannelを並行して読み込むことができない。

for x := range ch0 {
    r = append(r, x)
}
for x := range ch1 {
    r = append(r, x)
}

これを防ぐためにはselectを使う。mapの添字アクセスと同様に代入時に2つの値を取るようにした場合に、2番目の値に成功したかどうかの真偽値が入る。この真偽値は対象のchannelがcloseされた時にfalseになる。

なので先程のforループは以下の様なselectに書き換えることができる(returnではなくlabelをつけてbreakでも良いかもしれない)。

func() {
    for {
        select {
        case x, ok := <-ch:
            if !ok {
                return
            }
            r = append(r, x)
        }
    }
}()

2個のchannelがcloseされるまで読み込む

今度はN=2の場合、ようやく意味が出はじめた。ch0とch1の2つのchannelを読み込もうとしている。注意点としてch0は10ミリ秒のsleep,ch1は20ミリ秒のsleepを間に入れている。

先程のselectへの変換の例から似たような形に変換してみる。以下は期待通りには動かないコード。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    ch0 := make(chan int)
    ch1 := make(chan int)
    wg.Add(2)
    go func() {
        defer close(ch0)
        defer wg.Done()
        for _, x := range []int{1, 2, 3, 4, 5} {
            ch0 <- x
            time.Sleep(10 * time.Millisecond)
        }
    }()
    go func() {
        defer close(ch1)
        defer wg.Done()
        for _, x := range []int{-1, -2, -3, -4, -5} {
            ch1 <- x
            time.Sleep(20 * time.Millisecond)
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        var r []int
        for n := 2; n > 0; {
            select {
            case x, ok := <-ch0:
                if !ok {
                    n--
                    continue
                }
                r = append(r, x)
            case x, ok := <-ch1:
                if !ok {
                    n--
                    continue
                }
                r = append(r, x)
            }
        }
        fmt.Println(r)
    }()
    wg.Wait()
}

これは上手くいかない。

[1 -1 2 3 -2 4 5 -3]
fatal error: all goroutines are asleep - deadlock!

ch0の方が先に読み込み終わり、一方でch1はまだ読み込み中、にもかかわらずnがch側のcaseを何度も走るためforループの終了条件を抜けてしまう。

例えば以下の様に変更すると動くようにはなる。片側が読み終わったらもう片側を直接forループで読み込む様に変更した形。しかし不格好に見える。

go func() {
    defer wg.Done()
    var r []int
    func() {
        for {
            select {
            case x, ok := <-ch0:
                if !ok {
                    for y := range ch1 {
                        r = append(r, y)
                    }
                    return
                }
                r = append(r, x)
            case x, ok := <-ch1:
                if !ok {
                    for y := range ch0 {
                        r = append(r, y)
                    }
                    return
                }
                r = append(r, x)
            }
        }
    }()
    fmt.Println(r)
}()

動きはする。

[1 -1 2 3 -2 4 5 -3 -4 -5]

nil channel

select対象にnilを代入した場合に対応するcase節(?)では単に無視される。この挙動を使って終了済み(closed)のchannelの処理をスキップさせるコードが書ける。これはnil channelパターンと呼ばれていたりもするらしい。

go func() {
    defer wg.Done()
    var r []int
    for n := 2; n > 0; {
        select {
        case x, ok := <-ch0:
            if !ok {
                ch0 = nil
                n--
                continue
            }
            r = append(r, x)
        case x, ok := <-ch1:
            if !ok {
                ch1 = nil
                n--
                continue
            }
            r = append(r, x)
        }
    }
    fmt.Println(r)
}()

nil channelを使えば片側の終了後にもう片側をforループで読み込むなど凝ったことを考えずに自然に書ける。

[1 -1 2 -2 3 4 -3 5 -4 -5]

N個のchannelがcloseされるまで読み込む(reflect)

nil channelを使ったコードはすごく画一的な形になる。これを上手く使えば読み込むchannelの数がN個の場合にも対応できそうな気がしてきた。

一方で任意個のchannelに対応するためには動的に読み込むchannelの数を増やせる必要がある。ただしselectは構文なので直接手で明示的に書かなければいけない。この悩ましさを解決するためにはもう少し動的な記述が必要になる。つまりreflectパッケージのちからが必要になる。

実際以下の様な形でselectは動的に使うことができる。

package main

import (
    "fmt"
    "reflect"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    ch0 := make(chan int)
    ch1 := make(chan int)
    ch2 := make(chan int)
    wg.Add(3)
    go func() {
        defer close(ch0)
        defer wg.Done()
        for _, x := range []int{1, 2, 3, 4, 5} {
            ch0 <- x
            time.Sleep(10 * time.Millisecond)
        }
    }()
    go func() {
        defer close(ch1)
        defer wg.Done()
        for _, x := range []int{-1, -2, -3, -4, -5} {
            ch1 <- x
            time.Sleep(20 * time.Millisecond)
        }
    }()
    go func() {
        defer close(ch2)
        defer wg.Done()
        for _, x := range []int{10, 20, 30} {
            ch1 <- x
            time.Sleep(30 * time.Millisecond)
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        var r []int

        cases := []reflect.SelectCase{
            reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch0)},
            reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch1)},
            reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch2)},
        }
        for n := 3; n > 0; {
            i, x, ok := reflect.Select(cases)
            if !ok {
                n--
                cases[i].Chan = reflect.ValueOf(nil) // nil channel
                continue
            }
            r = append(r, int(x.Int()))
        }
        fmt.Println(r)
    }()
    wg.Wait()
}

reflect.SelectCase()でCase節(?)に対応する値を作り、reflect.Select()でselectの条件分岐(+channelの待ち受け)を行う。ここでも同様にnil channelを使う事はできる。

ただし、全ての値はreflect.Valueなので変換が必要。

reflectのちからを借りずにどうにかできないものか。

N個のchannelがcloseされるまで読み込む(reflect無し)

reflect無しにN個のchannelを読み込むことはできないか?

固定回数の繰り返しを任意回の繰り返しに変換するときの定石として再帰がある。同じ理屈で再帰的な記述ができればN個のchannelがcloseされるまで読み切ることができるかもしれない。

線形リストの総和

簡単な再帰の練習に線形リストの総和を求めてみる。ここで線形リストをわざわざ作るのは面倒なのでsliceで代用する。

package main

import (
    "fmt"
)

func sum(xs []int) int {
    if len(xs) == 0 {
        return 0
    }
    return xs[0] + sum(xs[1:])
}

func main() {
    fmt.Println(sum([]int{1, 2, 3, 4, 5}))
    // Output:
    // 15
}

これはもう少し丁寧に書くと以下の様な形で計算される。

1 + sum([2,3,4,5])
1 + (2 + sum([3,4,5]))
1 + (2 + (3 + sum([4,5])))
1 + (2 + (3 + (4 + sum([5]))))
1 + (2 + (3 + (4 + (5 + sum([])))))
1 + (2 + (3 + (4 + (5 + 0))))
1 + (2 + (3 + (4 + 5)))
1 + (2 + (3 + 9))
1 + (2 + 12)
1 + 14
15

ここで最初の部分に注目する。これは最初の値と残りの値のsumとの和。

1 + sum([2,3,4,5])

もちろん再帰は終了するために基底条件を持っていなければ行けなくて、今回の場合は長さが0の場合に0というもの。

sum = 0 if len(xs) == 0
sum = xs[0] + sum(xs[1:])

短く雰囲気を書くとこんな感じ。

再帰的な定義をchannelにも。

先程の総和の計算をchannelにも適用できないだろうか?

同じ理屈で考えるなら、あるchannelを合成するmerge()という関数があり、それは再帰的な定義になっている。chanelの数が0個以上のときに最初のchannelと残りのchannelをmergeしたものを組み合わせる。

+ではないので(+)という表記にしてみた。

merge = chs[0] (+) merge(chs[1:])

基底条件も同様に考えるなら、おそらくchannelの数が0のときにはnilを返せば良さそう。nil channelパターンもあるし。

merge = nil if len(chs) == 0
merge = chs[0] (+) merge(chs[1:])

(追記: 0のときnilではダメ。nilはselectをすり抜ける。closeされたchannelを返す)

merge = new closed channel if len(chs) == 0
merge = chs[0] if len(chs) == 1
merge = chs[0] (+) merge(chs[1:])

ただし注意点として残りの計算の部分は別途goroutineを走らせる必要がある。これをコードにすると実際に動く。キモいのだけれど動く。

go func() {
    defer wg.Done()
    var r []int

    var merge func(chs []<-chan int) <-chan int
    merge = func(chs []<-chan int) <-chan int {
        switch len(chs) {
        case 0:
            ch := make(chan int)
            close(ch)
            return ch
        case 1:
            return chs[0]
        default:
            ch := make(chan int)
            go func() {
                defer close(ch)
                restCH := merge(chs[1:])
                for n := 2; n > 0; {
                    select {
                    case x, ok := <-chs[0]:
                        if !ok {
                            n--
                            chs[0] = nil
                            continue
                        }
                        ch <- x
                    case x, ok := <-restCH:
                        if !ok {
                            n--
                            restCH = nil
                            continue
                        }
                        ch <- x
                    }
                }
            }()
            return ch
        }
    }
    for x := range merge([]<-chan int{ch0, ch1, ch2}) {
        r = append(r, x)
    }
    fmt.Println(r)
}()

そういえば、無名関数での再帰やN個のchannelへの対応で再帰を使うのは並行処理のgoの本などでも紹介されていたのを思い出した(or-done channel)。

ところでこのとき作られるgoroutineの数はreflectのときのそれよりも多い。

最終的なコード

最終的なコード

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    ch0 := make(chan int)
    ch1 := make(chan int)
    ch2 := make(chan int)
    wg.Add(3)
    go func() {
        defer close(ch0)
        defer wg.Done()
        for _, x := range []int{1, 2, 3, 4, 5} {
            ch0 <- x
            time.Sleep(10 * time.Millisecond)
        }
    }()
    go func() {
        defer close(ch1)
        defer wg.Done()
        for _, x := range []int{-1, -2, -3, -4, -5} {
            ch1 <- x
            time.Sleep(20 * time.Millisecond)
        }
    }()
    go func() {
        defer close(ch2)
        defer wg.Done()
        for _, x := range []int{10, 20, 30} {
            ch1 <- x
            time.Sleep(30 * time.Millisecond)
        }
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        var r []int

        var merge func(chs []<-chan int) <-chan int
        merge = func(chs []<-chan int) <-chan int {
            switch len(chs) {
            case 0:
                ch := make(chan int)
                close(ch)
                return ch
            case 1:
                return chs[0]
            default:
                ch := make(chan int)
                go func() {
                    defer close(ch)
                    restCH := merge(chs[1:])
                    for n := 2; n > 0; {
                        select {
                        case x, ok := <-chs[0]:
                            if !ok {
                                n--
                                chs[0] = nil
                                continue
                            }
                            ch <- x
                        case x, ok := <-restCH:
                            if !ok {
                                n--
                                restCH = nil
                                continue
                            }
                            ch <- x
                        }
                    }
                }()
                return ch
            }
        }
        for x := range merge([]<-chan int{ch0, ch1, ch2}) {
            r = append(r, x)
        }
        fmt.Println(r)
    }()
    wg.Wait()
    // Output:
    // [1 -1 10 2 3 -2 20 4 -3 5 30 -4 -5]
}