This post shows how to write concurrent code in RxPY with rx.scheduler.ThreadPoolScheduler.
Note that rx.scheduler.ThreadPoolScheduler uses ThreadPoolExecutor from concurrent.futures (from concurrent.futures import ThreadPoolExecutor) under the hood.
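In Python (CPython), the GIL (Global Interpreter Lock) lets only one thread execute Python bytecode at a time while the other threads wait. Keep in mind that this does not give you parallel execution across multiple cores. Threads do release the GIL while blocked in I/O or time.sleep(), so waiting-heavy work such as the sleep in the example below can still overlap.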
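A minimal sketch using only the standard library (no RxPY) to illustrate the GIL point: tasks that spend their time in time.sleep(), like intense_calculation() in the example below, release the GIL and can overlap on a ThreadPoolExecutor, whereas pure-Python CPU-bound loops would not speed up this way. The names io_like_task and run_sleep_tasks are hypothetical, and the exact timing depends on the machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_like_task(seconds: float) -> float:
    # time.sleep() releases the GIL, so other threads can run in the meantime
    time.sleep(seconds)
    return seconds


def run_sleep_tasks(n_tasks: int = 4, seconds: float = 0.2) -> float:
    """Run n_tasks sleeping tasks on n_tasks worker threads; return wall-clock seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        # Consume the iterator so every task has finished before we stop the clock
        list(pool.map(io_like_task, [seconds] * n_tasks))
    return time.perf_counter() - start


elapsed = run_sleep_tasks()
# The sleeps overlap, so this should be close to 0.2 s, not 4 * 0.2 = 0.8 s
print(f"elapsed: {elapsed:.2f} s")
```

Replacing the sleep with a tight arithmetic loop would show little or no speed-up, because that loop holds the GIL.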
Reference:
Get Started — RxPY Documentation
```python
import os
import random
import time
from threading import current_thread

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops


def intense_calculation(value):
    # Simulate a slow calculation by sleeping for 0.5 to 2.0 seconds
    print(f"\t\t\tintense_calculation() {current_thread().name} {value}")
    time.sleep(random.randint(5, 20) * 0.1)
    return value


# Create a thread pool with as many worker threads as there are CPUs
optimal_thread_count = os.cpu_count()
print(f"\n[optimal_thread_count: {optimal_thread_count}]\n")
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

print("Start")

# Pipeline 1: subscribe_on() runs the subscription (emission + map) on the pool
rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
    ops.map(lambda s: intense_calculation(s)),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda s: print(f"Thread 1:\t{current_thread().name} {s}"),
    on_error=lambda e: print(e),
    on_completed=lambda: print("Thread 1 done!"),
)

# Pipeline 2: also subscribe_on(); the observe_on() section below swaps this line
rx.of("aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii").pipe(
    ops.map(lambda s: intense_calculation(s)),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda s: print(f"Thread 2:\t{current_thread().name} {s}"),
    on_error=lambda e: print(e),
    on_completed=lambda: print("Thread 2 done!"),
)

# Meanwhile, the main thread keeps running
for c in range(0, 10):
    print(f"{current_thread().name} {c}")
```
Output
[optimal_thread_count: 8]
Start
intense_calculation() ThreadPoolExecutor-0_0 Alpha
MainThread 0
MainThread 1
MainThread 2
MainThread 3
MainThread 4
MainThread 5
intense_calculation() ThreadPoolExecutor-0_1 aaa
MainThread 6
MainThread 7
MainThread 8
MainThread 9
Thread 1: ThreadPoolExecutor-0_0 Alpha
intense_calculation() ThreadPoolExecutor-0_0 Beta
Thread 2: ThreadPoolExecutor-0_1 aaa
intense_calculation() ThreadPoolExecutor-0_1 bbb
Thread 1: ThreadPoolExecutor-0_0 Beta
intense_calculation() ThreadPoolExecutor-0_0 Gamma
Thread 2: ThreadPoolExecutor-0_1 bbb
intense_calculation() ThreadPoolExecutor-0_1 ccc
Thread 1: ThreadPoolExecutor-0_0 Gamma
intense_calculation() ThreadPoolExecutor-0_0 Delta
Thread 1: ThreadPoolExecutor-0_0 Delta
intense_calculation() ThreadPoolExecutor-0_0 Epsilon
Thread 2: ThreadPoolExecutor-0_1 ccc
intense_calculation() ThreadPoolExecutor-0_1 ddd
Thread 1: ThreadPoolExecutor-0_0 Epsilon
Thread 1 done!
Thread 2: ThreadPoolExecutor-0_1 ddd
intense_calculation() ThreadPoolExecutor-0_1 eee
Thread 2: ThreadPoolExecutor-0_1 eee
intense_calculation() ThreadPoolExecutor-0_1 fff
Thread 2: ThreadPoolExecutor-0_1 fff
intense_calculation() ThreadPoolExecutor-0_1 ggg
Thread 2: ThreadPoolExecutor-0_1 ggg
intense_calculation() ThreadPoolExecutor-0_1 hhh
Thread 2: ThreadPoolExecutor-0_1 hhh
intense_calculation() ThreadPoolExecutor-0_1 iii
Thread 2: ThreadPoolExecutor-0_1 iii
Thread 2 done!
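You can see that once both pipelines have been handed off to the thread pool, the subscribe() calls return immediately, and the for loop at the bottom of the code runs on the main thread while the pool threads are still working.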
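To make the thread placement concrete, here is a hypothetical toy model written with the standard library only. It is not RxPY's implementation, and subscribe_on_model and record are made-up names. It mimics what the output above shows for subscribe_on() with a cold source such as rx.of(): emitting the items, applying the map, and calling on_next all happen on one pool thread, while the caller only submits the work and is free to continue.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical toy model of subscribe_on() (NOT RxPY's implementation).


def subscribe_on_model(pool, items, mapper, on_next) -> Future:
    """Run emission, map and on_next on a pool thread; return without blocking."""
    def run_pipeline():
        for item in items:          # the "source" emits on the pool thread
            on_next(mapper(item))   # map and on_next run there as well
    return pool.submit(run_pipeline)


log = []  # (stage, thread name, value) tuples


def record(stage):
    """Return a pass-through function that logs which thread called it."""
    def inner(value):
        log.append((stage, threading.current_thread().name, value))
        return value
    return inner


caller = threading.current_thread().name
with ThreadPoolExecutor(max_workers=2) as pool:
    future = subscribe_on_model(pool, ["Alpha", "Beta"], record("map"), record("on_next"))
    # The caller could do other work here (like the for loop in the article)
    future.result()  # wait only so that the printed log is complete

for stage, thread_name, value in log:
    print(f"{stage:8} {thread_name} {value}")
```

Every logged entry carries the same pool thread name (of the form ThreadPoolExecutor-N_M, the default naming of ThreadPoolExecutor), which matches the "ThreadPoolExecutor-0_0" style names in the output above.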
Using observe_on()
If you change ops.subscribe_on(pool_scheduler) in the second pipeline to ops.observe_on(pool_scheduler), the output looks like this:
$python rxpy_multithread.py
[optimal_thread_count: 8]
Start
intense_calculation() ThreadPoolExecutor-0_0 Alpha
intense_calculation() MainThread aaa
intense_calculation() MainThread bbb
Thread 2: ThreadPoolExecutor-0_1 aaa
Thread 1: ThreadPoolExecutor-0_0 Alpha
intense_calculation() ThreadPoolExecutor-0_0 Beta
Thread 1: ThreadPoolExecutor-0_0 Beta
intense_calculation() ThreadPoolExecutor-0_0 Gamma
intense_calculation() MainThread ccc
Thread 2: ThreadPoolExecutor-0_1 bbb
Thread 1: ThreadPoolExecutor-0_0 Gamma
intense_calculation() ThreadPoolExecutor-0_0 Delta
intense_calculation() MainThread ddd
Thread 2: ThreadPoolExecutor-0_3 ccc
intense_calculation() MainThread eee
Thread 2: ThreadPoolExecutor-0_1 ddd
Thread 1: ThreadPoolExecutor-0_0 Delta
intense_calculation() ThreadPoolExecutor-0_0 Epsilon
Thread 1: ThreadPoolExecutor-0_0 Epsilon
Thread 1 done!
intense_calculation() MainThread fff
Thread 2: ThreadPoolExecutor-0_5 eee
intense_calculation() MainThread ggg
Thread 2: ThreadPoolExecutor-0_3 fff
intense_calculation() MainThread hhh
Thread 2: ThreadPoolExecutor-0_7 ggg
intense_calculation() MainThread iii
Thread 2: ThreadPoolExecutor-0_2 hhh
MainThread 0
MainThread 1
MainThread 2
MainThread 3
MainThread 4
MainThread 5
MainThread 6
MainThread 7
MainThread 8
MainThread 9
Thread 2: ThreadPoolExecutor-0_5 iii
In the previous run the main thread's for loop ran first; this time it runs near the end.
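The reason is how observe_on() works: it moves only the operators that come after it (here, the subscriber's callbacks) onto the thread pool. The operators above it, in this code ops.map(lambda s: intense_calculation(s)), still run on the main thread. As a result, subscribe() does not return, and the for loop cannot start, until every item has been mapped.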
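The same kind of hypothetical toy model for the observe_on() case (again not RxPY's implementation; observe_on_model is a made-up name): the map runs on the calling thread, and only the downstream on_next calls are handed to the pool, so the caller stays busy until every item has been mapped. Unlike RxPY's observe_on(), this toy does not guarantee that on_next calls happen in order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

# Hypothetical toy model of observe_on() placed after map (NOT RxPY's implementation).


def observe_on_model(pool, items, mapper, on_next):
    """Map on the calling thread; schedule only on_next on the pool."""
    futures = []
    for item in items:
        value = mapper(item)                         # blocks the caller
        futures.append(pool.submit(on_next, value))  # downstream runs on the pool
    return futures


log = []  # (stage, thread name, value) tuples
log_lock = threading.Lock()


def record(stage):
    """Return a pass-through function that logs which thread called it."""
    def inner(value):
        with log_lock:
            log.append((stage, threading.current_thread().name, value))
        return value
    return inner


caller = threading.current_thread().name
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = observe_on_model(pool, ["aaa", "bbb", "ccc"], record("map"), record("on_next"))
    # Only now is the caller free again (like the for loop in the article)
    wait(futures)

for stage, thread_name, value in log:
    print(f"{stage:8} {thread_name} {value}")
```

By the same logic, moving ops.observe_on(pool_scheduler) above ops.map(...) in the real pipeline would move intense_calculation() onto the pool too, leaving the main thread free.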
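So choose between observe_on() and subscribe_on() according to which thread you want each part of the pipeline to run on. subscribe_on() puts the subscription itself on the scheduler, so the source's emissions and the operators that process them run on a pool thread wherever subscribe_on() appears in pipe(); observe_on() affects only the operators and callbacks that come after it.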