How to Transcribe Only What You Need with Python: Listening Before Connecting
#python #speechrecognition #speechtotext #transcription

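Imagine a fast food restaurant taking orders in real time with a speech-to-text API. The challenge is that the customer starts speaking, and audio data starts arriving, before the WebSocket connection opens. We need a way to capture that audio and, once the WebSocket is open, transcribe everything the customer says until they finish their order.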

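One solution is to use a buffer, or queue, to store the audio data until the WebSocket is connected. In Python, a buffer can be as simple as a list; the code in this post uses an asyncio.Queue. We add audio data to the queue before the WebSocket connection is established, and we keep using the same buffer during speech-to-text transcription after the connection is made.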
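To make the idea concrete before touching any audio hardware, here is a minimal sketch of buffering ahead of a slow connection. Everything in it is a stand-in: capture() plays the role of the microphone, send_after_connect() plays the role of the WebSocket side, and the None end-of-stream marker is a hypothetical convention that is not part of the full program.

```python
import asyncio


async def capture(queue: asyncio.Queue, chunks: list) -> None:
    # Stand-in for the microphone: enqueue chunks as soon as they "arrive".
    for chunk in chunks:
        queue.put_nowait(chunk)
        await asyncio.sleep(0)  # yield so other tasks can run
    queue.put_nowait(None)  # hypothetical end-of-stream marker


async def send_after_connect(queue: asyncio.Queue, delay: float) -> list:
    # Stand-in for the WebSocket side: "connecting" takes a while,
    # then everything buffered so far is drained in arrival order.
    await asyncio.sleep(delay)
    sent = []
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        sent.append(chunk)
    return sent


async def demo() -> list:
    queue = asyncio.Queue()
    chunks = [b"I'd", b"like", b"a", b"burger"]
    _, sent = await asyncio.gather(
        capture(queue, chunks),
        send_after_connect(queue, delay=0.05),
    )
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Nothing is lost even though the consumer starts late, because the queue holds every chunk until someone asks for it.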

In the next section, we'll see an implementation of this solution using Python and the Deepgram speech-to-text API.

Using a Buffer in Python to Store Audio Data from Speech-to-Text Transcription

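To run this code, you'll need a few things: a Deepgram API key (assigned to DEEPGRAM_API_KEY in the code below) and the following packages. The code also imports websockets directly, so install it as well if it is not already present.

pip install deepgram-sdk
pip install PyAudio
pip install websockets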

Here is the solution implemented in Python, followed by a quick explanation of the code:

import pyaudio
import asyncio
import websockets
import os
import json

DEEPGRAM_API_KEY = "YOUR_DEEPGRAM_API_KEY"

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 8000

audio_queue = asyncio.Queue()

def callback(input_data, frame_count, time_info, status_flags):
   audio_queue.put_nowait(input_data)

   return (input_data, pyaudio.paContinue)


async def microphone(): 
   audio = pyaudio.PyAudio()
   stream = audio.open(
       format = FORMAT,
       channels = CHANNELS,
       rate = RATE,
       input = True,
       frames_per_buffer = CHUNK,
       stream_callback = callback
   )

   stream.start_stream()

   while stream.is_active():
       await asyncio.sleep(0.1)


   stream.stop_stream()
   stream.close()

async def process():
   extra_headers = {
       'Authorization': 'token ' + DEEPGRAM_API_KEY
   }

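   # Depending on your websockets version, this keyword may be named additional_headers instead of extra_headers.
   async with websockets.connect('wss://api.deepgram.com/v1/listen?encoding=linear16&sample_rate=16000&channels=1', extra_headers = extra_headers) as ws: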
       async def sender(ws): # sends audio to websocket
           try:
               while True:
                   # Wait for the next buffered chunk, then forward it.
                   data = await audio_queue.get()
                   await ws.send(data)
           except Exception as e:
               print('Error while sending: ' + str(e))
               raise

       async def receiver(ws): 
           async for msg in ws:
               msg = json.loads(msg)
               transcript = msg['channel']['alternatives'][0]['transcript']

               if transcript:
                   print(f'Transcript = {transcript}')

       await asyncio.gather(sender(ws), receiver(ws))



async def run():
   await asyncio.gather(microphone(),process())

if __name__ == '__main__':
   asyncio.run(run())

Python Code Explanation for Using a Buffer with Speech-to-Text Transcription

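Since we're working with Python's asyncio, we need a callback function with the signature that PyAudio defines. This callback puts each chunk of audio into the queue without blocking.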

def callback(input_data, frame_count, time_info, status_flags):
   audio_queue.put_nowait(input_data)

   return (input_data, pyaudio.paContinue)
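One caveat worth sketching: PyAudio invokes stream callbacks on a separate thread, and asyncio.Queue is not thread-safe. The variant below is a hedged alternative, not the approach used in this post. It hands each chunk to the event loop with loop.call_soon_threadsafe. The make_callback() helper and the PA_CONTINUE constant are hypothetical names introduced for illustration, and a plain thread stands in for the audio thread so the example runs without a microphone.

```python
import asyncio
import threading

PA_CONTINUE = 0  # stand-in for pyaudio.paContinue so the sketch runs without PyAudio


def make_callback(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue):
    # Returns a callback with the same four-argument shape as the one above.
    def callback(input_data, frame_count, time_info, status_flags):
        # Schedule the put on the event loop's own thread instead of
        # touching the queue directly from the audio thread.
        loop.call_soon_threadsafe(queue.put_nowait, input_data)
        return (input_data, PA_CONTINUE)

    return callback


def fake_audio_thread(callback) -> None:
    # Simulates the audio thread invoking the callback three times.
    for i in range(3):
        callback(bytes([i]), 1, None, 0)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    callback = make_callback(loop, queue)

    worker = threading.Thread(target=fake_audio_thread, args=(callback,))
    worker.start()
    received = [await queue.get() for _ in range(3)]
    worker.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With real hardware, the callback returned by make_callback() would be passed as stream_callback, and the return value would use pyaudio.paContinue.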

We define a microphone() function, create a stream with PyAudio, and pass our callback in as stream_callback. We then start the stream and loop for as long as it is active.

async def microphone(): 
   audio = pyaudio.PyAudio()
   stream = audio.open(
       format = FORMAT,
       channels = CHANNELS,
       rate = RATE,
       input = True,
       frames_per_buffer = CHUNK,
       stream_callback = callback
   )

   stream.start_stream()

   while stream.is_active():
       await asyncio.sleep(0.1)


   stream.stop_stream()
   stream.close()
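It can help to know how much audio each callback delivers. The arithmetic below uses the constants from the code above; the helper names chunk_seconds() and chunk_bytes() are hypothetical, and SAMPLE_BYTES assumes 16-bit samples, which is what paInt16 denotes.

```python
SAMPLE_BYTES = 2  # paInt16 samples are 16 bits, i.e. 2 bytes each
CHANNELS = 1
RATE = 16000      # frames per second
CHUNK = 8000      # frames per buffer


def chunk_seconds(frames: int, rate: int) -> float:
    # Duration of one buffer: frames divided by frames per second.
    return frames / rate


def chunk_bytes(frames: int, channels: int, sample_bytes: int) -> int:
    # Size of one buffer: each frame holds one sample per channel.
    return frames * channels * sample_bytes


if __name__ == "__main__":
    print(chunk_seconds(CHUNK, RATE))                  # 0.5
    print(chunk_bytes(CHUNK, CHANNELS, SAMPLE_BYTES))  # 16000
```

So with these settings each queue item holds half a second of audio, and a smaller CHUNK would trade more callbacks for lower latency.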

Next, we define an outer function called process(), which sets up the authorization header for Deepgram. We use async with websockets.connect as a context manager to connect to the Deepgram WebSocket server.
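The URL in process() repeats the sample rate and channel count as literals, so they can drift out of sync with RATE and CHANNELS. As a small sketch, the query string and header can be built from the same values instead. The helper names listen_url() and auth_headers() are hypothetical; the endpoint, query parameters, and header format are the ones already used in this post.

```python
from urllib.parse import urlencode


def listen_url(rate: int, channels: int, encoding: str = "linear16") -> str:
    # Build the streaming URL from the same values used to open the audio stream.
    query = urlencode(
        {"encoding": encoding, "sample_rate": rate, "channels": channels}
    )
    return f"wss://api.deepgram.com/v1/listen?{query}"


def auth_headers(api_key: str) -> dict:
    # Same header shape as extra_headers in process().
    return {"Authorization": "token " + api_key}


if __name__ == "__main__":
    print(listen_url(16000, 1))
    print(auth_headers("YOUR_DEEPGRAM_API_KEY"))
```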

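The sender() function sends audio to the WebSocket. The call to audio_queue.get() removes and returns an item from the queue. If the queue is empty, it waits until an item is available.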
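The waiting behavior of get() is easy to see in isolation. In this small sketch, the 0.05 second timeout is an arbitrary value chosen only to show that get() on an empty queue does not return on its own.

```python
import asyncio


async def demo():
    queue = asyncio.Queue()

    # get() on an empty queue waits; wait_for gives up after the timeout.
    try:
        await asyncio.wait_for(queue.get(), timeout=0.05)
        waited = False
    except asyncio.TimeoutError:
        waited = True

    # Once an item is available, get() returns it and removes it from the queue.
    queue.put_nowait(b"chunk")
    item = await queue.get()
    return waited, item, queue.empty()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This is why sender() can sit in a while True loop without spinning: it simply sleeps inside get() until the callback delivers more audio.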

The receiver() function receives the transcript, parses the JSON response, and prints the transcript to the console.
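The parsing step can be pulled into a small function that is easy to test on its own. The sketch below assumes that some messages may arrive without a channel key or with an empty alternatives list, and returns an empty string in those cases rather than raising. The extract_transcript() name and the sample payloads are hypothetical; only the channel, alternatives, and transcript keys come from the code in this post.

```python
import json


def extract_transcript(raw: str) -> str:
    # Parse one WebSocket message and return its transcript, or "" if absent.
    msg = json.loads(raw)
    alternatives = msg.get("channel", {}).get("alternatives", [])
    if not alternatives:
        return ""
    return alternatives[0].get("transcript", "")


if __name__ == "__main__":
    # Hypothetical payloads shaped like the fields receiver() reads.
    with_text = json.dumps(
        {"channel": {"alternatives": [{"transcript": "one burger please"}]}}
    )
    without_channel = json.dumps({"type": "Metadata"})

    print(repr(extract_transcript(with_text)))
    print(repr(extract_transcript(without_channel)))
```

Inside receiver(), the loop body would then reduce to calling extract_transcript(msg) and printing the result when it is not empty.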

Finally, we run the program with asyncio.run(run()) inside the main guard.

async def process():
   extra_headers = {
       'Authorization': 'token ' + DEEPGRAM_API_KEY
   }

   async with websockets.connect('wss://api.deepgram.com/v1/listen?encoding=linear16&sample_rate=16000&channels=1', extra_headers = extra_headers) as ws:
       async def sender(ws): # sends audio to websocket
           try:
               while True:
                   # Wait for the next buffered chunk, then forward it.
                   data = await audio_queue.get()
                   await ws.send(data)
           except Exception as e:
               print('Error while sending: ' + str(e))
               raise

       async def receiver(ws): # receives the transcript
           async for msg in ws:
               msg = json.loads(msg)
               transcript = msg['channel']['alternatives'][0]['transcript']

               if transcript:
                   print(f'Transcript = {transcript}')

       await asyncio.gather(sender(ws), receiver(ws))



async def run():
   await asyncio.gather(microphone(),process())

if __name__ == '__main__':
   asyncio.run(run())

Conclusion

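We hope you enjoyed this short project. If you need help with the tutorial or with running the code, don't hesitate to reach out to us. The best place to start is our GitHub Discussions.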