Wizard Notes

Python, JavaScript を使った音楽信号分析の技術録、作曲活動に関する雑記

Python:sounddeviceを使った音声ファイルの再生・リアルタイム処理の実装方法

以下の記事では,Pythonで音の再生・録音を行うためのライブラリ sounddevice を紹介しました。

Python-sounddevice で音声や歌声をリアルタイム収音・再生・録音 - Wizard Notes

この記事では、音の信号処理/アプリ開発でよく使う、音声ファイルの再生sounddeviceを使ってどのように実装するかを紹介します。

方法1: オーディオをそのまま再生する

sounddevice を使った音声ファイルの再生で最も簡単なのが sd.play() を使う方法です。

sd.play() を使えば下記の例のように数行で音を再生できます。

ただし、再生する音信号データを丸ごとsd.play() に渡すため,マイク入力等に応じてリアルタイムで信号を加工し再生する用途には不向きです

import sounddevice as sd
import soundfile as sf


filepath = "./audio.wav"
# sig: 信号, sr: サンプリング周波数
sig, sr = sf.read(filepath, always_2d=True)
sd.play(sig, sr)

sd.wait() # sd.playが完了するのを待つ
print("End")

なお、再生途中でsd.stop()を実行すると再生を停止することができます。

また、オーディオファイルの再生にはsoundfileを使っています。詳しくは以下の記事をご覧ください。

www.wizard-notes.com

方法2: オーディオをリアルタイム処理して再生

こちらはblocksizeの短いサンプル数ごとに信号を処理するため,マイク入力やその他の信号に応じてリアルタイムで音信号を加工するようなエフェクタを作るのに向いています。

import sounddevice as sd
import soundfile as sf
import threading

"""
filepath = "./audio.wav"
# sig: 信号, sr: サンプリング周波数
sig, sr = sf.read(filepath, always_2d=True)
sd.play(sig, sr)

sd.wait() # sd.playが完了するのを待つ
print("End")
"""


filepath = "./elepi.wav"
sig, sr = sf.read(filepath, always_2d=True) # sig: 信号, sr: サンプリング周波数

n_samples, n_channels = sig.shape
blocksize = 1024
current_frame = 0

# コールバック関数
def callback(indata, outdata, frames, time, status):
    global current_frame, n_samples, n_channelss
    chunksize = min(n_samples - current_frame, frames)
    
    outdata[:] *= 0.0
    # チャンネルごとの信号処理
    for k in range(n_channels): 
        outdata[0:chunksize, k] = self.sig[current_frame:current_frame + chunksize, k]

    if chunksize < frames:
        raise sd.CallbackStop()
        
    current_frame += chunksize


event = threading.Event()
with sd.Stream(
    samplerate=sr, 
    blocksize=blocksize,
    channels=sig.shape[1],
    callback=callback, 
    finished_callback=event.set
):
    print("start")
    event.wait() # 再生終了まで待機
    
print("End")

方法2.1 マルチスレッド処理

方法2でマルチスレッド処理を利用することで、バックグラウンドでのリアルタイム再生・信号処理ができます。

import sounddevice as sd
import soundfile as sf
import threading

class Player(threading.Thread):
    def __init__(self, sig, sr, blocksize = 1024):
        super(Player, self).__init__()
        
        self.sig = sig
        self.sr  = sr
        self.n_samples, self.n_channels = sig.shape
        self.blocksize = blocksize

    def callback(self, indata, outdata, frames, time, status):
        chunksize = min(self.n_samples - self.current_frame, frames)
    
        outdata[:] *= 0.0
        # チャンネルごとの信号処理
        for k in range(self.n_channels): 
            outdata[0:chunksize, k] = self.sig[self.current_frame:self.current_frame + chunksize, k]

        if chunksize < frames:
            raise sd.CallbackStop()
        
        self.current_frame += chunksize

    def run(self):
        self.current_frame = 0
        self.event = threading.Event()
        
        with sd.Stream(
            samplerate=self.sr, 
            blocksize=self.blocksize,
            channels=self.n_channels,
            callback=self.callback, 
            finished_callback=self.event.set
        ):
            self.event.wait()


filepath = "./audio.wav"
sig, sr = sf.read(filepath, always_2d=True)

player = Player(sig, sr)
player.start()

print("Start")

import code
console = code.InteractiveConsole(locals=locals())
console.interact()

方法3: オーディオをリアルタイム処理して再生(同期処理)

方法2はcallback関数を渡すことで非同期(ノンブロッキング)処理となっていました。

もう一つの再生方法として、stream.write()によって信号データを書き込むことで 同期(ノンブロッキング)処理として音声ファイルの再生ができます。

import numpy as np
import sounddevice as sd
import soundfile as sf


filepath = "./audio.wav"
sig, sr = sf.read(filepath, always_2d=True) # sig: 信号, sr: サンプリング周波数

n_samples, n_channels = sig.shape
blocksize = 1024
current_frame = 0
chunk = np.zeros((blocksize, n_channels))
dtype = np.float32


with sd.OutputStream(
    samplerate=sr, 
    blocksize=blocksize,
    channels=n_channels,
    dtype=dtype
) as stream:
    while True:
        chunksize = min(n_samples - current_frame, blocksize)            
        chunk[0:chunksize] = sig[current_frame:current_frame + chunksize, :]
        
        stream.write( chunk.astype(dtype) ) # chunkの信号を書き込み
        
        if chunksize < blocksize:
            break
        current_frame += chunksize

まとめ

sounddeviceを使った音声ファイルの再生・リアルタイム処理の実装方法を紹介しました。

ご自身の作成している信号処理/アプリ開発に合った実装方法を是非利用してみてください。