一ヶ月ほど前に社内のインフラ共有会でタイトルの話をしました。記録の
ために記事を書いておきます。

Gist に置いてある ので、コードは git clone で取得可能です。

$ git clone https://gist.github.com/c0a4234a5264c89655c40adcf7c27cb2.git

Ruby

例えば Ruby で 30 個の処理をするコードがあったとします。こんな素朴
なコードです。それぞれ 3 秒かかる処理が 30 あるので、とても遅いです。

Thread を使って 5 並列にしました。明らかに速くなりました。

ついでにそれぞれの結果(というほどのものではありませんが)を
results に代入し、最後にまとめて表示しました。

results は共有リソースになるので、Thread::Mutex#synchronize でロッ
クをかけて安全に書き込んでいます。ロックをかけないと results に同
時に書き込まれるケースを救うことが出来ません。

parallel gem を使うと、ずいぶんとすっきり書くことが出来ます。

golang

直列処理に関しては、Ruby と同様に素朴です。goroutineNum(あとで説
明します)は常に 1 です。

何も考えずに heavyProcess() を goroutine で動かします。19 行目に
“go " を追加しただけです。驚いたことに何も表示されずに終了します。

実は何も表示されなかったのは、goroutine の終了を待たずに main() が
終了してしまったためです。シェルのバックグラウンド実行(&)とよく
似ていると思いました。

今度は sync パッケージを使って、goroutine の終了を待ちましたが、
今回も何も表示されません。

これは heavyProcess() の終了を待たずに WaitGroup.Done しているからです。

sync.WaitGroup.Done() も含めた処理を goroutine で実行すると(23-26
行目)、出力されます。ただしこの時点では 30 並列です。goroutineNum
は最大 31 です。

defer を使ってリファクタリングしました。23 行目の無名関数の最後で
必ず WaitGroup.Done を実行してくれることが明示&保証されました。

5 並列にしました。サイズ 5 で int 型の channel を作り、同時実行数
を制限しました。変数 semaphore には 1 が書き込まれ、heavyProcess()
が終わったら、出ていきます。

実行すると、goroutineNum が最大 31 なのは変わりませんが、徐々に減っ
ていくのがわかると思います。

少し脱線して並列数を CPU コア数にしました。通常はこれで良いと思い
ますが、Web API へのアクセス等、CPU パワーを使わない処理であれば、
もっと増やして良いと思います。

parallels_v1.rb と同じように、結果を最後にまとめて表示します。
sync.Mutex でロック用の変数を作り、34~36 行目でロックをかけながら
results に書き込んでいます。

少しリファクタリングして、semaphore に詰める値を 1 から i に変えま
した。少し自信がありませんが…。

考察など

goroutine とは

何か処理を “go” で括れば、main() とは別のスレッド(という表現で良
いか分からない)で動作すると理解しました。シェルのバックグラウンド
実行とよく似ていますし、これ自体はシンプルだと思います。

そのままだと main() が先に終わってしまうので、sync.WaitGroup など
で待つ必要があります。

ちなみに sync.WaitGroup を使わずに channel で同じことができます。
でも、私は読みやすさの点から sync.WaitGroup を使うほうが好きです。

Ruby と golang の並列処理の違い

今回の Ruby と golang のコードを実行すると、Ruby は並列数分の処理
が全部終わってから次に進み、golang は並列処理のうち 1 つでも終わる
と、次の処理が並列処理に加わります。

擬音にすると、Ruby は「ガッガッガッガッ」で、golang は「スルスルス
ルスル」です。分かるかな?

main() も goroutine

実は main() 自体も goroutine で動いているようです。例えば以下の変
更を加えて”$ GOTRACEBACK=2 go run parallels_v7.go" を実行すると、
なんとなく分かります。

diff --git a/parallels_v7.go b/parallels_v7.go
index f17339a..d34b0b1 100644
--- a/parallels_v7.go
+++ b/parallels_v7.go
@@ -33,6 +33,7 @@ func main() {
 			mu.Lock()
 			defer mu.Unlock()
 			results = append(results, <-semaphore) // Is this safe?
+			select {}
 		}(i)
 	}
 	wg.Wait()

参考: Go の並行処理 - Block Rockin’ Codes