詳解python的webrtc庫實現語音端點檢測

詳解python的webrtc庫實現語音端點檢測

引言

語音端點檢測最早應用於電話傳輸和檢測系統當中,用於通訊通道的時間分配,提高傳輸線路的利用效率.端點檢測屬於語音處理系統的前端操作,在語音檢測領域意義重大.

但是目前的語音端點檢測,尤其是檢測 人聲 開始和結束的端點始終是屬於技術難點,各家公司始終處於 能判斷,但是不敢保證 判別準確性 的階段.

Screenshot from 2017-05-25 22-42-50.png 

現在基於雲端語義庫的聊天機器人層出不窮,其中最著名的當屬amazon的 Alexa/Echo 智慧音箱.

timg.jpg

國內如雨後春筍般出現了各種搭載語音聊天的智慧音箱(如前幾天在知乎上廣告的若琪機器人)和各類智慧機器人產品.國內語音服務提供商主要面對中文語音服務,由於語音不像影象有解析度等等較為客觀的指標,很多時候憑主觀判斷,所以較難判斷各家語音識別和合成技術的好壞.但是我個人認為,國內的中文語音服務和國外的英文語音服務,在某些方面已經有超越的趨勢.

timg (1).jpg

通常搭建機器人聊天系統主要包括以下三個方面: 

 語音轉文字(ASR/STT)
 語義內容(NLU/NLP)
 文字轉語音(TTS)

語音轉文字(ASR/STT)

在將語音傳給雲端API之前,是本地前端的語音採集,這部分主要包括如下幾個方面: 

 麥克風降噪
 聲源定位
 回聲消除
 喚醒詞
 語音端點檢測
 音訊格式壓縮

python 端點檢測

由於實際應用中,單純依靠能量檢測特徵檢測等方法很難判斷人聲說話的起始點,所以市面上大多數的語音產品都是使用喚醒詞判斷語音起始.另外加上聲音迴路,還可以做語音打斷.這樣的互動方式可能有些傻,每次必須喊一下 喚醒詞 才能繼續聊天.這種方式聊多了,個人感覺會嘴巴疼:-O .現在github上有snowboy喚醒詞的開源庫,大家可以登入snowboy官網訓練自己的喚醒詞模型. 

 Kitt-AI : Snowboy 
 Sensory : Sensory

考慮到用喚醒詞嘴巴會累,所以大致調研了一下,Python擁有豐富的庫,直接import就能食用.這種方式容易受強噪聲干擾,適合一個人在家玩玩. 

 pyaudio: pip install pyaudio 可以從裝置節點讀取原始音訊流資料,音訊編碼是PCM格式;
 webrtcvad: pip install webrtcvad 檢測判斷一組語音資料是否為空語音;

當檢測到持續時間長度 T1 vad檢測都有語音活動,可以判定為語音起始;

當檢測到持續時間長度 T2 vad檢測都沒有有語音活動,可以判定為語音結束;

完整程式程式碼可以從我的github下載

程式很簡單,相信看一會兒就明白了


'''
Requirements:
pyaudio - `pip install pyaudio`
py-webrtcvad - `pip install webrtcvad`
'''
import webrtcvad
import collections
import sys
import signal
import pyaudio
from array import array
from struct import pack
import wave
import time
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK_DURATION_MS = 30    # supports 10, 20 and 30 (ms)
PADDING_DURATION_MS = 1500  # 1 sec jugement
CHUNK_SIZE = int(RATE CHUNK_DURATION_MS / 1000) # chunk to read
CHUNK_BYTES = CHUNK_SIZE 2 # 16bit = 2 bytes, PCM
NUM_PADDING_CHUNKS = int(PADDING_DURATION_MS / CHUNK_DURATION_MS)
# NUM_WINDOW_CHUNKS = int(240 / CHUNK_DURATION_MS)
NUM_WINDOW_CHUNKS = int(400 / CHUNK_DURATION_MS) # 400 ms/ 30ms ge
NUM_WINDOW_CHUNKS_END = NUM_WINDOW_CHUNKS 2
START_OFFSET = int(NUM_WINDOW_CHUNKS CHUNK_DURATION_MS 0.5 RATE)
vad = webrtcvad.Vad(1)
pa = pyaudio.PyAudio()
stream = pa.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
start=False,
# input_device_index=2,
frames_per_buffer=CHUNK_SIZE)
got_a_sentence = False
leave = False
def handle_int(sig, chunk):
global leave, got_a_sentence
leave = True
got_a_sentence = True
def record_to_file(path, data, sample_width):
"Records from the microphone and outputs the resulting data to 'path'"
# sample_width, data = record()
data = pack('<'   ('h' len(data)), data)
wf = wave.open(path, 'wb')
wf.setnchannels(1)
wf.setsampwidth(sample_width)
wf.setframerate(RATE)
wf.writeframes(data)
wf.close()
def normalize(snd_data):
"Average the volume out"
MAXIMUM = 32767 # 16384
times = float(MAXIMUM) / max(abs(i) for i in snd_data)
r = array('h')
for i in snd_data:
r.append(int(i times))
return r
signal.signal(signal.SIGINT, handle_int)
while not leave:
ring_buffer = collections.deque(maxlen=NUM_PADDING_CHUNKS)
triggered = False
voiced_frames = []
ring_buffer_flags = [0] NUM_WINDOW_CHUNKS
ring_buffer_index = 0
ring_buffer_flags_end = [0] NUM_WINDOW_CHUNKS_END
ring_buffer_index_end = 0
buffer_in = ''
# WangS
raw_data = array('h')
index = 0
start_point = 0
StartTime = time.time()
print(" recording: ")
stream.start_stream()
while not got_a_sentence and not leave:
chunk = stream.read(CHUNK_SIZE)
# add WangS
raw_data.extend(array('h', chunk))
index  = CHUNK_SIZE
TimeUse = time.time() - StartTime
active = vad.is_speech(chunk, RATE)
sys.stdout.write('1' if active else '_')
ring_buffer_flags[ring_buffer_index] = 1 if active else 0
ring_buffer_index  = 1
ring_buffer_index %= NUM_WINDOW_CHUNKS
ring_buffer_flags_end[ring_buffer_index_end] = 1 if active else 0
ring_buffer_index_end  = 1
ring_buffer_index_end %= NUM_WINDOW_CHUNKS_END
# start point detection
if not triggered:
ring_buffer.append(chunk)
num_voiced = sum(ring_buffer_flags)
if num_voiced > 0.8 NUM_WINDOW_CHUNKS:
sys.stdout.write(' Open ')
triggered = True
start_point = index - CHUNK_SIZE 20 # start point
# voiced_frames.extend(ring_buffer)
ring_buffer.clear()
# end point detection
else:
# voiced_frames.append(chunk)
ring_buffer.append(chunk)
num_unvoiced = NUM_WINDOW_CHUNKS_END - sum(ring_buffer_flags_end)
if num_unvoiced > 0.90 NUM_WINDOW_CHUNKS_END or TimeUse > 10:
sys.stdout.write(' Close ')
triggered = False
got_a_sentence = True
sys.stdout.flush()
sys.stdout.write('\n')
# data = b''.join(voiced_frames)
stream.stop_stream()
print(" done recording")
got_a_sentence = False
# write to file
raw_data.reverse()
for index in range(start_point):
raw_data.pop()
raw_data.reverse()
raw_data = normalize(raw_data)
record_to_file("recording.wav", raw_data, 2)
leave = True
stream.close()

程式執行方式sudo python vad.py