当前位置: 首页 » 产品 » 农牧养殖 » 正文

python的webrtc库如何实现语音端点检测

放大字体  缩小字体 发布日期: 2025-01-24 16:20   来源:http://www.baidu.com/  作者:无忧资讯  浏览次数:26
核心提示:  语音端点检测最早应用于电话传输和检测系统当中,用于通信信道的时间分配,提高传输线路的利用效率.端点检测属于语音处理系统

  语音端点检测最早应用于电话传输和检测系统当中,用于通信信道的时间分配,提高传输线路的利用效率.端点检测属于语音处理系统的前端操作,在语音检测领域意义重大.

  但是目前的语音端点检测,尤其是检测 人声 开始和结束的端点始终是属于技术难点,各家公司始终处于 能判断,但是不敢保证 判别准确性 的阶段.

python的webrtc库如何实现语音端点检测 三联

  现在基于云端语义库的聊天机器人层出不穷,其中最著名的当属amazon的 Alexa/Echo 智能音箱.

timg.jpg

  国内如雨后春笋般出现了各种搭载语音聊天的智能音箱(如前几天在知乎上广告的若琪机器人)和各类智能机器人产品.国内语音服务提供商主要面对中文语音服务,由于语音不像图像有分辨率等等较为客观的指标,很多时候凭主观判断,所以较难判断各家语音识别和合成技术的好坏.但是我个人认为,国内的中文语音服务和国外的英文语音服务,在某些方面已经有超越的趋势.

timg (1).jpg

  通常搭建机器人聊天系统主要包括以下三个方面:

  语音转文字(ASR/STT)

  语义内容(NLU/NLP)

  文字转语音(TTS)

  语音转文字(ASR/STT)

  在将语音传给云端API之前,是本地前端的语音采集,这部分主要包括如下几个方面:

  麦克风降噪

  声源定位

  回声消除

  唤醒词

  语音端点检测

  音频格式压缩

  python 端点检测

  由于实际应用中,单纯依靠能量检测特征检测等方法很难判断人声说话的起始点,所以市面上大多数的语音产品都是使用唤醒词判断语音起始.另外加上声音回路,还可以做语音打断.这样的交互方式可能有些傻,每次必须喊一下 唤醒词 才能继续聊天.这种方式聊多了,个人感觉会嘴巴疼:-O .现在github上有snowboy唤醒词的开源库,大家可以登录snowboy官网训练自己的唤醒词模型.

  Kitt-AI : Snowboy

  Sensory : Sensory

  考虑到用唤醒词嘴巴会累,所以大致调研了一下,Python拥有丰富的库,直接import就能食用.这种方式容易受强噪声干扰,适合一个人在家玩玩.

  pyaudio: pip install pyaudio 可以从设备节点读取原始音频流数据,音频编码是PCM格式;

  webrtcvad: pip install webrtcvad 检测判断一组语音数据是否为空语音;

  当检测到持续时间长度 T1 vad检测都有语音活动,可以判定为语音起始;

  当检测到持续时间长度 T2 vad检测都没有有语音活动,可以判定为语音结束;

  完整程序代码可以从我的github下载

  程序很简单,相信看一会儿就明白了

  '''

  Requirements:

  + pyaudio - `pip install pyaudio`

  + py-webrtcvad - `pip install webrtcvad`

  '''

  import webrtcvad

  import collections

  import sys

  import signal

  import pyaudio

  from array import array

  from struct import pack

  import wave

  import time

  FORMAT=pyaudio.paInt16

  CHANNELS=1

  RATE=16000

  CHUNK_DURATION_MS=30 # supports 10, 20 and 30 (ms)

  PADDING_DURATION_MS=1500 # 1 sec jugement

  CHUNK_SIZE=int(RATE CHUNK_DURATION_MS / 1000) # chunk to read

  CHUNK_BYTES=CHUNK_SIZE 2 # 16bit=2 bytes, PCM

  NUM_PADDING_CHUNKS=int(PADDING_DURATION_MS / CHUNK_DURATION_MS)

  # NUM_WINDOW_CHUNKS=int(240 / CHUNK_DURATION_MS)

  NUM_WINDOW_CHUNKS=int(400 / CHUNK_DURATION_MS) # 400 ms/ 30ms ge

  NUM_WINDOW_CHUNKS_END=NUM_WINDOW_CHUNKS 2

  START_OFFSET=int(NUM_WINDOW_CHUNKS CHUNK_DURATION_MS 0.5 RATE)

  vad=webrtcvad.Vad(1)

  pa=pyaudio.PyAudio()

  stream=pa.open(format=FORMAT,

  channels=CHANNELS,

  rate=RATE,

  input=True,

  start=False,

  # input_device_index=2,

  frames_per_buffer=CHUNK_SIZE)

  got_a_sentence=False

  leave=False

  def handle_int(sig, chunk):

  global leave, got_a_sentence

  leave=True

  got_a_sentence=True

  def record_to_file(path, data, sample_width):

  "Records from the microphone and outputs the resulting data to 'path'"

  # sample_width, data=record()

  data=pack('<' + ('h' len(data)), data)

  wf=wave.open(path, 'wb')

  wf.setnchannels(1)

  wf.setsampwidth(sample_width)

  wf.setframerate(RATE)

  wf.writeframes(data)

  wf.close()

  def normalize(snd_data):

  "Average the volume out"

  MAXIMUM=32767 # 16384

  times=float(MAXIMUM) / max(abs(i) for i in snd_data)

  r=array('h')

  for i in snd_data:

  r.append(int(i times))

  return r

  signal.signal(signal.SIGINT, handle_int)

  while not leave:

  ring_buffer=collections.deque(maxlen=NUM_PADDING_CHUNKS)

  triggered=False

  voiced_frames=[]

  ring_buffer_flags=[0] NUM_WINDOW_CHUNKS

  ring_buffer_index=0

  ring_buffer_flags_end=[0] NUM_WINDOW_CHUNKS_END

  ring_buffer_index_end=0

  buffer_in=''

  # WangS

  raw_data=array('h')

  index=0

  start_point=0

  StartTime=time.time()

  print(" recording: ")

  stream.start_stream()

  while not got_a_sentence and not leave:

  chunk=stream.read(CHUNK_SIZE)

  # add WangS

  raw_data.extend(array('h', chunk))

  index +=CHUNK_SIZE

  TimeUse=time.time() - StartTime

  active=vad.is_speech(chunk, RATE)

  sys.stdout.write('1' if active else '_')

  ring_buffer_flags[ring_buffer_index]=1 if active else 0

  ring_buffer_index +=1

  ring_buffer_index %=NUM_WINDOW_CHUNKS

  ring_buffer_flags_end[ring_buffer_index_end]=1 if active else 0

  ring_buffer_index_end +=1

  ring_buffer_index_end %=NUM_WINDOW_CHUNKS_END

  # start point detection

  if not triggered:

  ring_buffer.append(chunk)

  num_voiced=sum(ring_buffer_flags)

  if num_voiced > 0.8 NUM_WINDOW_CHUNKS:

  sys.stdout.write(' Open ')

  triggered=True

  start_point=index - CHUNK_SIZE 20 # start point

  # voiced_frames.extend(ring_buffer)

  ring_buffer.clear()

  # end point detection

  else:

  # voiced_frames.append(chunk)

  ring_buffer.append(chunk)

  num_unvoiced=NUM_WINDOW_CHUNKS_END - sum(ring_buffer_flags_end)

  if num_unvoiced > 0.90 NUM_WINDOW_CHUNKS_END or TimeUse > 10:

  sys.stdout.write(' Close ')

  triggered=False

  got_a_sentence=True

  sys.stdout.flush()

  sys.stdout.write(' ')

  # data=b''.join(voiced_frames)

  stream.stop_stream()

  print(" done recording")

  got_a_sentence=False

  # write to file

  raw_data.reverse()

  for index in range(start_point):

  raw_data.pop()

  raw_data.reverse()

  raw_data=normalize(raw_data)

  record_to_file("recording.wav", raw_data, 2)

  leave=True

  stream.close()

  程序运行方式sudo python vad.py

内容来源:https://www.16jixie.com/news/show-2730.html
 
 
[ 产品搜索 ]  [ 加入收藏 ]  [ 告诉好友 ]  [ 打印本文 ]  [ 违规举报 ]  [ 关闭窗口 ]

 

 
推荐图文
推荐产品
点击排行
    行业协会  备案信息  可信网站