リアルタイム音響信号処理プラグインの開発において、開発言語がC++であればVSTを使うことでDAW上で自作プラグインを動かすことができます。
一方で Python の場合、Numpy, Scipyといった強力な信号処理モジュールを使って簡単にリアルタイム音響信号処理のプログラムを作ることができますが、自作のプログラムを動かすアプリケーションや環境に恵まれていません。
そこで、Pythonで書いた音響信号処理のプログラムを利用できる音楽プレイヤーを作ってみました。
実装
"""
date: 2020/08/10
author: @_kurene
"""
import sys
import pyaudio
import threading
import numpy as np
import librosa
import warnings
from numba import jit
warnings.simplefilter('ignore')
@jit
def process_block(input, output, n_ch, n_chunk):
for c in range(0, n_ch):
for n in range(0, n_chunk):
output[c, n] = input[c, n]
class AudioPyler():
def __init__(self, sr=44100, n_chunk=1024, format="float32", loop_on=True):
self.sr = sr
self.n_ch = 2
self.n_chunk = n_chunk
self.loop_on = loop_on
self.length = 0
self.offset = 0
self.format = pyaudio.paInt16 if format == "int16" else pyaudio.paFloat32
self.dtype = np.float32
self.signal = None
self.p = pyaudio.PyAudio()
self.stream = self.p.open(format=self.format, channels=self.n_ch, rate=self.sr, frames_per_buffer=self.n_chunk,
input=False, output=True)
self.stream.stop_stream()
def set_audiofile(self, filepath):
self.stop()
signal, self.sr = librosa.load(filepath, sr=self.sr, mono=False)
length = signal.shape[0] if signal.ndim == 1 else signal.shape[1]
self.length = (length // self.n_chunk)*self.n_chunk + (self.n_chunk if length % self.n_chunk > 0 else 0)
self.signal = np.zeros((self.n_ch, self.length))
for k in range(0, self.n_ch):
self.signal[k, 0:length] = signal / self.n_ch if signal.ndim == 1 else signal[k]
def __run(self):
output = np.zeros((self.n_ch, self.n_chunk))
while self.stream.is_active():
input = self.signal[:, self.offset : self.offset + self.n_chunk]
process_block(input, output, self.n_ch, self.n_chunk)
chunk_data = np.reshape(output.T, (self.n_chunk * self.n_ch))
chunk = chunk_data.astype(self.dtype).tostring()
self.stream.write(chunk)
self.offset += self.n_chunk
if self.offset >= self.length:
if self.loop_on:
self.offset = 0
else:
self.stop()
return True
def pause(self):
self.stream.stop_stream()
def stop(self):
self.stream.stop_stream()
self.offset = 0
def play(self):
self.stream.start_stream()
t = threading.Thread(target=self.__run)
t.start()
def terminate(self):
self.stream.stop_stream()
self.stream.close()
self.p.terminate()
if __name__ == "__main__":
'''
audio_player = AudioPyler()
audio_player.set_audiofile("./audio/orgel_1.mp3")
audio_player.play()
audio_player.stop()
audio_player.pause()
audio_player.terminate()
audio_player.set_audiofile("./audio/orgel_2.mp3")
'''
import code
console = code.InteractiveConsole(locals=locals())
console.interact()
実装の解説(メモ)
楽曲ファイルの読み込み
librosa.load()
を利用
- 私の環境では、.mp3, .wav の読み込みに成功
- ステレオ信号用のプレイヤーとするため、
mono=True
とした
- pyaudioの出力ストリームへの書き込み処理の分岐を減らすため、信号サイズを n_chunk の倍整数に調性
オーディオの再生(書き込み)方法
- PyAudio
を利用
- ストリームに対してはndarrayではなくバイト列を渡す必要があることに注意(以下の記事を参考にしてください)
www.wizard-notes.com
AudioPyler クラスの機能
set_audiofile(filepath)
play()
- 楽曲ファイルを再生する
- 実際にストリームに書き込み処理は
__run()
が行う(threading で非同期処理)
stop()
pause()
- 楽曲の再生を停止する
stop()
との違いは、再生位置 (offset) を保存するかどうか。
terminate()
__run()