レンタルサーバでWebアプリ作り:無音検出 → speach to text

ロリポップレンタルサーバではできることが限られていたので、ホスティングサービスの DigitalOcean を使ってWebアプリを公開するまでの道のりを記録します。
データ分析を仕事としているのでパソコンやITのことは多少は詳しいですが、インフラまわりは全くの素人です。そんな人が見て参考になる情報をまとめていきたいと思います。

DigitalOceanの紹介リンク

ここから手続きを進めてもらえると200ドル分の無料チケット(有効期間2ヶ月)がもらえるようですので、ぜひご活用ください。
※ 2023/12/29時点


背景

次なるアプリでいよいよチャット系のものを作ろうと思います。
特にスマホの場合、キーボードで入力するのが面倒なので、音声入力をさせたいのですが、まずは機構作りにトライです。


ゴール1=音声入力の機能を作る

「録音開始」→「無音検知」→「Whisperで音声認識」という手順で作ります。
Whisperは以下の記事でもご紹介した通りいたってシンプル!
なので、無音検知の部分がメインになります。

無音検知プログラム(単独実行)

以下のような「test_muon.py」プログラムを作りました。
Whisper部分は最後の7行(実質1行)。
「record_until_silence」関数がポイントです。

まずは必要なライブラリ。requirements.txt に以下を追加してください。

sounddevice 
numpy 
wavio

`pipenv install -r requirements.txt` でインストールが終わったら、以下を実行してみてください。3秒間無音が続いた時に録音停止してくれれば成功です。
※ 引数 silent_duration を1にすれば、1秒間の無音で停止します。

import sounddevice as sd
import numpy as np
import queue
import wavio

from openai import OpenAI
from dotenv import load_dotenv

load_dotenv() # 環境変数を読み込む
client = OpenAI()


def record_until_silence(silent_duration:int, filename:str, fs:int=16000, threshold:int=500):
    """
    音声を録音し、無音が一定時間続いたところで録音を停止し、その音声データをファイルに保存します。

    Args:
        silent_duration (int)   : 無音が続く時間(秒)。この時間が経過すると録音が停止します。
        filename (str)          : 録音データを保存するファイルの名前。
        fs (int, optional)      : サンプリングレート。デフォルトは16000Hz。
        threshold (int, optional): 音量の閾値。この値以下の音量が `silent_duration` だけ続いた場合、録音を停止します。デフォルトは500。
    """
    
    blocksize       = 8000
    block_duration  = blocksize / fs  # ブロック長(秒)
    blocks_to_check = int(silent_duration / block_duration)  # 無音判定に必要なブロック数
    q = queue.Queue()
    data_rec = np.array([], dtype='int16')  # data_recをnumpy配列として初期化
    
    def calculate_rms(data):
        """ RMS(Root Mean Square)を計算して音量を測定する """
        return np.sqrt(np.mean(np.square(data)))

    def callback(indata, frames, time, status):
        """コールバック関数"""
        if status:
            print(status, file=sys.stderr)
        q.put(np.frombuffer(indata, dtype=np.int16))

    with sd.RawInputStream(samplerate=fs, blocksize=blocksize, dtype='int16', channels=1, callback=callback):
        silent_blocks = 0
        f_zero = True
        while True:
            data = q.get()
            data_rec = np.concatenate([data_rec, data]) 
            rms = calculate_rms(data.astype(np.float32))
            print(rms) #無音の時のこの数字を確認し、thresholdを設定してください

            if rms < threshold:
                silent_blocks += 1
                if silent_blocks >= blocks_to_check and f_zero == False:
                    print("無音検知 - 録音を停止します")
                    break
            else:
                silent_blocks = 0
                f_zero = False

    # 録音データの保存
    wavio.write(filename, data_rec, fs, sampwidth=2)


if __name__ == "__main__":
    # 使用例:
    record_until_silence(silent_duration=3, filename="my_recording.wav")

    # WhisperでSpeech to Text
    audio_file= open("my_recording.wav", "rb")
    response = client.audio.transcriptions.create(
        model="whisper-1", 
        file=audio_file,
        response_format="text",
    )
    print(response)

この例では、「無音の定義 = 音量(rms)が500未満」としています。もしうまく機能しない場合は、この数字を変えてみてください。
0.5秒ごとにターミナルにrmsの値が表示されていると思います。私の環境では、何もしゃべらなくても100~300くらいの数字になっていて、何か話すと1,000前後になります。ということで閾値を500にしていますが、環境によってこの数字が変わってくると思います。いい塩梅で閾値を設定してみてください。

ゴール2:ボタンを押して開始 / 停止

ゴール1では「無音で停止」としていますが、ボタンを押しても停止したくなります。その部分を組み込んでいきます。

非同期処理ということで、いつもの「threading」を使います。ボタンを押したら start_recording 関数を呼び、非同期処理で record_until_silence を動かします。
また、stop_recording 関数が呼ばれたら stop_recording_event 変数をTrueにして、 非同期処理の record_until_silence が止まるようにします。 stop_recording 関数では、 f_on_recording が False になるまで待ちます。これは record_until_silence 関数の最後に False にしているため、このスレッド処理が終わるまで待つことになります。

うーん、、行ったり来たりでややこしい。。。
ということで、以下のようになりました。無音が3秒続くか、もしくはターミナルでCtrl+Cが押されるまで録音を続けます。
※ except KeyboardInterrupt の部分で、Ctrl+Cを検知してます

import sounddevice as sd
import numpy as np
import queue
import wavio
import threading

from openai import OpenAI
from dotenv import load_dotenv

load_dotenv() # 環境変数を読み込む
client = OpenAI()


recording_thread     = None
stop_recording_event = threading.Event()
f_on_recording       = False

def start_recording(data):
    global recording_thread
    if recording_thread is None:
        stop_recording_event.clear()
        recording_thread = threading.Thread(
            target= record_until_silence, 
            args  = (data['silent_duration'], data['filename'])
        )
        recording_thread.start()

def stop_recording():
    stop_recording_event.set()
    while f_on_recording: #処理が終わるまで待つ
        pass

def record_until_silence(silent_duration:int, filename:str, fs:int=16000, threshold:int=500):
    """
    音声を録音し、無音が一定時間続いたところで録音を停止し、その音声データをファイルに保存します。

    Args:
        silent_duration (int)   : 無音が続く時間(秒)。この時間が経過すると録音が停止します。
        filename (str)          : 録音データを保存するファイルの名前。
        fs (int, optional)      : サンプリングレート。デフォルトは16000Hz。
        threshold (int, optional): 音量の閾値。この値以下の音量が `silent_duration` だけ続いた場合、録音を停止します。デフォルトは500。
    """
    
    global stop_recording_event, recording_thread, f_on_recording

    blocksize       = 8000
    block_duration  = blocksize / fs  # ブロック長(秒)
    blocks_to_check = int(silent_duration / block_duration)  # 無音判定に必要なブロック数
    q = queue.Queue()
    data_rec = np.array([], dtype='int16')  # data_recをnumpy配列として初期化
    
    def calculate_rms(data):
        """ RMS(Root Mean Square)を計算して音量を測定する """
        return np.sqrt(np.mean(np.square(data)))

    def callback(indata, frames, time, status):
        """コールバック関数"""
        if status:
            print(status, file=sys.stderr)
        q.put(np.frombuffer(indata, dtype=np.int16))

    f_on_recording = True
    with sd.RawInputStream(samplerate=fs, blocksize=blocksize, dtype='int16', channels=1, callback=callback):
        silent_blocks = 0
        f_zero = True
        while not stop_recording_event.is_set():
            data = q.get()
            data_rec = np.concatenate([data_rec, data]) 
            rms = calculate_rms(data.astype(np.float32))
            print(rms) #無音の時のこの数字を確認し、thresholdを設定してください

            if rms < threshold:
                silent_blocks += 1
                if silent_blocks >= blocks_to_check and f_zero == False:
                    print("無音検知 - 録音を停止します")
                    break
            else:
                silent_blocks = 0
                f_zero = False

    # 録音データの保存
    wavio.write(filename, data_rec, fs, sampwidth=2)

    # Reset the thread
    stop_recording_event.set()
    recording_thread = None
    f_on_recording   = False




if __name__ == "__main__":

    start_recording({"silent_duration": 3, "filename": 'my_recording.wav'})
    try:
        while not stop_recording_event.is_set():
            pass
    except KeyboardInterrupt:
        stop_recording()
        print("Ctrl+Cが押されました。処理Xを実行します。")


    # WhisperでSpeech to Text
    audio_file= open("my_recording.wav", "rb")
    response = client.audio.transcriptions.create(
        model="whisper-1", 
        file=audio_file,
        response_format="text",
    )
    print(response)


あとは、HTMLで「ボタンを押したら start_recording を呼ぶ」「停止ボタンを押したら stop_recording を呼ぶ」と組み込めばOKです。

クラス化

whisperとセットで使うだろうし、以前作った myopenai.py 内のクラスに組み込みました。
追加した部分だけ書き出しました。

#whisper用
from openai import OpenAI
# 録音関連
import sounddevice as sd
import numpy as np
import queue
import wavio


class myopenai :

    # 録音関係
    recording_thread     = None
    stop_recording_event = threading.Event()
    f_on_recording       = False
    client = OpenAI() #whisperで使う

    #--------------------------------------------------------#
    #--- 録音関係 --------------------------------------------#
    #--------------------------------------------------------#
    def start_recording(self, data):
        if self.recording_thread is None:
            self.f_on_recording = True #record_until_silence内でもOnにしているが、タイムラグがあるのですぐ立てる
            self.stop_recording_event.clear()
            self.recording_thread = threading.Thread(
                target= self.__record_until_silence, 
                args  = (data['silent_duration'], data['filename'])
            )
            self.recording_thread.start()

    def stop_recording(self):
        self.stop_recording_event.set()
        while self.f_on_recording: #処理が終わるまで待つ
            pass

    def is_recording(self):
        return self.f_on_recording

    def __record_until_silence(self, silent_duration:int, filename:str, fs:int=16000, threshold:int=500):
        """
        音声を録音し、無音が一定時間続いたところで録音を停止し、その音声データをファイルに保存します。

        Args:
            silent_duration (int)   : 無音が続く時間(秒)。この時間が経過すると録音が停止します。
            filename (str)          : 録音データを保存するファイルの名前。
            fs (int, optional)      : サンプリングレート。デフォルトは16000Hz。
            threshold (int, optional): 音量の閾値。この値以下の音量が `silent_duration` だけ続いた場合、録音を停止します。デフォルトは500。
        """
        

        blocksize       = 8000
        block_duration  = blocksize / fs  # ブロック長(秒)
        blocks_to_check = int(silent_duration / block_duration)  # 無音判定に必要なブロック数
        q = queue.Queue()
        data_rec = np.array([], dtype='int16')  # data_recをnumpy配列として初期化
        
        def calculate_rms(data):
            """ RMS(Root Mean Square)を計算して音量を測定する """
            return np.sqrt(np.mean(np.square(data)))

        def callback(indata, frames, time, status):
            """コールバック関数"""
            if status:
                print(status, file=sys.stderr)
            q.put(np.frombuffer(indata, dtype=np.int16))

        self.f_on_recording = True
        with sd.RawInputStream(samplerate=fs, blocksize=blocksize, dtype='int16', channels=1, callback=callback):
            silent_blocks = 0
            f_zero = True
            while not self.stop_recording_event.is_set():
                data = q.get()
                data_rec = np.concatenate([data_rec, data]) 
                rms = calculate_rms(data.astype(np.float32))
                print(rms) #無音の時のこの数字を確認し、thresholdを設定してください

                if rms < threshold:
                    silent_blocks += 1
                    if silent_blocks >= blocks_to_check and f_zero == False:
                        print("無音検知 - 録音を停止します")
                        break
                else:
                    silent_blocks = 0
                    f_zero = False

        # 録音データの保存
        wavio.write(filename, data_rec, fs, sampwidth=2)

        # Reset the thread
        self.stop_recording_event.set()
        self.recording_thread = None
        self.f_on_recording   = False


    def mywhisper(self, filename:str) -> str :
        # WhisperでSpeech to Text
        audio_file= open(filename, "rb")
        response = self.client.audio.transcriptions.create(
            model="whisper-1", 
            file=audio_file,
            response_format="text",
        )

        return response

これを、以下のような感じで呼び出します。(GPTの指定は何でもOKです(今回は関係ない))

from app.common import myopenai
from dotenv import load_dotenv
load_dotenv() # 環境変数を読み込む

if __name__ == "__main__":
    mo = myopenai.myopenai('gpt-4-1106-preview')
    mo.start_recording({"silent_duration": 3, "filename": 'my_recording.wav'})

    try:
        while mo.is_recording():
            pass
    except KeyboardInterrupt:
        mo.stop_recording()
        print("Ctrl+Cが押されました。処理Xを実行します。")

    res = mo.mywhisper("my_recording.wav")
    print(res)

マイクに向かってしゃべった内容が print されれば大成功です!


最後まで見ていただきありがとうございました!
次は、この録音機能をWebアプリ内に組み込んでいきます!


サポート問い合わせ先

DigitalOceanのサポート問い合わせリンクがなかなか見つからないので、リンクを載せておきます。

https://cloudsupport.digitalocean.com/s/

場所は、トップページの右下にある「Ask a question」に行き、そのページの一番下(欄外っぽいところ)にひっそりと「Support」というリンクがあります(Contact内)。そのページの一番最後に「Contact Support」ボタンがあります。


この記事が気に入ったらサポートをしてみませんか?