Python 用の Reactive Extensions ライブラリである RxPY を勉強し始めました。
RxPY を使えば、非同期的に流れてくるデータのシーケンス/ストリームに対する処理を簡単に記述できます。
RxPYに関する日本語の情報はまだまだ少ないため、勉強した内容を簡単に残しておきたいと思います。
注意: RxPY は ver. 1 と ver. 3 がありますが、後者を扱います。この2つは、主要な機能の記述方法が異なるため、注意が必要です。
基本
ReactiveX (Reactive eXtension) はObserverパターンと関数型プログラミングに基づくツールであり、非同期的に流れてくるデータのシーケンス/ストリームに対する処理を簡単に記述できます。
RxPY の簡単な例:
import rx import rx.operators as ops class MyObserver(): ```Observerクラス``` def on_next(self, value): print(f"データ受信時の処理 {value}") def on_error(self, error): print("エラー時の処理") def on_completed(self): print("完了") def my_func(value): return value ** 2 # observable オブジェクトの作成 observable_stream = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon") # Operators による演算 observable_stream.pipe( ops.map(lambda s: len(s)), ops.filter(lambda value: value < 6), ops.map(lambda value: my_func(value)), ).subscribe(MyObserver()) # Observerを登録
実行結果:
データ受信時の処理 25 データ受信時の処理 16 データ受信時の処理 25 データ受信時の処理 25 完了
- Observable
- データのストリーム/シーケンス
- 作り方
-
stream = rx.range(0,5)
-
stream = rx.of('aaa','bbb','ccc')
-
stream = rx.from_([0,1,2,3,4])
- rx.creat()
-
- Observer
- データを観測(受信)する
- 観測時の役割は
on_next(value)
などに書く
- observable_stream.subscribe()
- データを観測した時の処理
- 書き方は以下の3つ
- Observerを作って渡す
on_next(value)
, 必要であればon_error(error)
やon_completed()
メソッドを実装する
- 直接関数を渡す
observable_stream.subscribe(print)
- lambda関数で直接書く
- Observerを作って渡す
observable_stream.subscribe( on_next=lambda data: ..., on_error=lambda error: my_error_func(error), on_completed=..., )
例:通知に使う
import time import random import threading import rx from rx.subject import Subject # Subject: 自分でデータを送れるObservableストリーム stream = Subject() def func(k): print(f"start func({k})") time.sleep(random.randint(5, 30) * 0.1) # 0.5 ~ 3.0秒待つ print(f"end func({k})") stream.on_next(1) #終わったらストリームにデータを送る stream.pipe( ops.buffer_with_count(3) #データ(通知)を3つ溜める ).subscribe( on_next=lambda x:print("\t3つ完了") ) for k in range(0, 9): threading.Thread(target=lambda : func(k)).start()
実行結果
start func(0) start func(1) start func(2) start func(3) start func(4) start func(5) start func(6) start func(7) start func(8) end func(5) end func(8) end func(1) 3つ完了 end func(6) end func(7) end func(0) 3つ完了 end func(2) end func(3) end func(4) 3つ完了