Wizard Notes

Python, JavaScript を使った音楽信号分析の技術録、作曲活動に関する雑記

RxPY 3 入門メモ(簡単なサンプルコード、用語、使用例)

Python 用の Reactive Extensions ライブラリである RxPY を勉強し始めました。

RxPY を使えば、非同期的に流れてくるデータのシーケンス/ストリームに対する処理を簡単に記述できます。

f:id:Kurene:20201217165141p:plain
https://www.atmarkit.co.jp/fdotnet/introrx/introrx_01/introrx_01_01.html より

RxPYに関する日本語の情報はまだまだ少ないため、勉強した内容を簡単に残しておきたいと思います。

注意: RxPY は ver. 1 と ver. 3 がありますが、後者を扱います。この2つは、主要な機能の記述方法が異なるため、注意が必要です。

rxpy.readthedocs.io

基本

ReactiveX (Reactive eXtension) はObserverパターンと関数型プログラミングに基づくツールであり、非同期的に流れてくるデータのシーケンス/ストリームに対する処理を簡単に記述できます。

f:id:Kurene:20201217162210p:plain
http://reactivex.io/documentation/observable.html

RxPY の簡単な例:

import rx
import rx.operators as ops


class MyObserver():
    ```Observerクラス```
    def on_next(self, value):
        print(f"データ受信時の処理 {value}")
        
    def on_error(self, error):
        print("エラー時の処理")
        
    def on_completed(self):
        print("完了")

def my_func(value):
    return value ** 2
        
# observable オブジェクトの作成
observable_stream = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Operators による演算
observable_stream.pipe(
    ops.map(lambda s: len(s)),                 
    ops.filter(lambda value: value < 6),
    ops.map(lambda value: my_func(value)),
).subscribe(MyObserver()) # Observerを登録

実行結果:

データ受信時の処理 25
データ受信時の処理 16
データ受信時の処理 25
データ受信時の処理 25
完了
  • Observable
    • データのストリーム/シーケンス
    • 作り方
      1. stream = rx.range(0,5)
      2. stream = rx.of('aaa','bbb','ccc')
      3. stream = rx.from_([0,1,2,3,4])
      4. rx.creat()
  • Observer
    • データを観測(受信)する
    • 観測時の役割は on_next(value) などに書く
  • observable_stream.subscribe()
    • データを観測した時の処理
    • 書き方は以下の3つ
      1. Observerを作って渡す
        • on_next(value), 必要であればon_error(error)on_completed()メソッドを実装する
      2. 直接関数を渡す
        • observable_stream.subscribe(print)
      3. lambda関数で直接書く
observable_stream.subscribe(
   on_next=lambda data: ...,
   on_error=lambda error: my_error_func(error),
   on_completed=...,
) 

例:通知に使う

import time
import random
import threading

import rx
from rx.subject import Subject

# Subject: 自分でデータを送れるObservableストリーム
stream = Subject() 

def func(k):
    print(f"start func({k})")
    time.sleep(random.randint(5, 30) * 0.1) # 0.5 ~ 3.0秒待つ
    print(f"end   func({k})")
    stream.on_next(1)         #終わったらストリームにデータを送る

stream.pipe(
    ops.buffer_with_count(3) #データ(通知)を3つ溜める
).subscribe(
    on_next=lambda x:print("\t3つ完了")
)

for k in range(0, 9):
    threading.Thread(target=lambda : func(k)).start()

実行結果

start func(0)
start func(1)
start func(2)
start func(3)
start func(4)
start func(5)
start func(6)
start func(7)
start func(8)
end   func(5)
end   func(8)
end   func(1)
    3つ完了
end   func(6)
end   func(7)
end   func(0)
    3つ完了
end   func(2)
end   func(3)
end   func(4)
    3つ完了

参考文献