Wizard Notes

音楽信号解析の技術録、作曲活動に関する雑記

RxPY での並行処理(ThreadPoolScheduler,subscribe_on() と observe_on() の比較)

RxPY でのrx.scheduler.ThreadPoolSchedulerを使った並行処理の書き方です。

注意として、rx.scheduler.ThreadPoolScheduler from concurrent.futures import ThreadPoolExecutor を利用しています。

Pythonでは GIL (Global Interpreter Lock, GIL) により単一スレッドしか処理が実行されずその他のスレッドは待機状態になります。マルチコアによる並列処理とならないことに注意が必要です。

github.com

ソースコード

参考元: Get Started — RxPY Documentation

実行結果

2つのスレッドに処理を投げた後、コード一番下のメインスレッドのfor文が実行されているのが見て取れます。

observe_on() の利用

2つ目のスレッドの ops.subscribe_on(pool_scheduler)ops.observe_on(pool_scheduler) に変えると、以下のような実行結果になります。

先ほど、最初に実行されていたメインスレッドのfor文は最後の方で実行されました。

この理由は、observe_on()の仕様によるものです。observe_on() は、それ以降のOperatorsの処理をスレッドプールで処理します。

したがって、その上にあるOperatorsの処理(今回のコードだと ops.map(lambda s: intense_calculation(s)))はメインスレッドで実行しています。したがって、全てmapし終えるまで

observe_on()とsubscribe_on()はどのスレッドに処理させるかを考慮して使い分ける必要があります。

f:id:Kurene:20201217185622p:plain