Imagine a fast-food restaurant using a speech-to-text API to take orders in real time. The challenge is that customers will start speaking, and sending audio data, before the WebSocket connection is open. We need a way to capture that audio and then, once the WebSocket is open, transcribe what the customer says until they finish their order.
One solution is to use a buffer, or queue, to store the audio data until the WebSocket connects. In Python, we can implement this buffer with a queue (here, an asyncio.Queue): we add audio data to the queue before the WebSocket connection is established, and we keep pulling from the buffer during speech-to-text transcription once the connection is made.
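As a minimal, standalone sketch of the idea (not part of the Deepgram implementation below; the chunk values, the capture() and send_when_connected() names, and the simulated connection delay are all made up for illustration), audio chunks are pushed into an asyncio.Queue as soon as they arrive and are drained only once the connection is ready:
import asyncio

audio_buffer = asyncio.Queue()

async def capture(chunks):
    # Audio starts arriving immediately, before any connection exists
    for chunk in chunks:
        audio_buffer.put_nowait(chunk)
        await asyncio.sleep(0.05)

async def send_when_connected():
    await asyncio.sleep(0.5)  # stand-in for waiting on the WebSocket handshake
    while not audio_buffer.empty():
        chunk = await audio_buffer.get()
        print(f"sending buffered chunk: {chunk!r}")

async def main():
    await asyncio.gather(capture([b"one", b"large", b"fries"]), send_when_connected())

asyncio.run(main())
Nothing is lost while the connection is being set up; the queue simply holds the chunks until the sender is ready to drain them.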
In the next section, we'll walk through an implementation of this solution using Python and the Deepgram speech-to-text API.
Using a Buffer in Python to Store Audio Data for Speech-to-Text Transcription
To run this code, you'll need a few things:
- A Deepgram API key from Deepgram
- The following packages, installed with pip:
pip install deepgram-sdk
pip install pyaudio
Below is the solution implemented in Python, followed by a quick explanation of the code:
import pyaudio
import asyncio
import websockets
import os
import json

DEEPGRAM_API_KEY = "YOUR_DEEPGRAM_API_KEY"

# Microphone settings: 16-bit PCM, mono, 16 kHz, in 8000-frame chunks
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 8000

# The buffer: audio chunks wait here until the sender pulls them off
audio_queue = asyncio.Queue()

def callback(input_data, frame_count, time_info, status_flags):
    # Called by PyAudio for each captured chunk; enqueue it without blocking
    audio_queue.put_nowait(input_data)
    return (input_data, pyaudio.paContinue)

async def microphone():
    audio = pyaudio.PyAudio()
    stream = audio.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
        stream_callback=callback
    )
    stream.start_stream()
    while stream.is_active():
        await asyncio.sleep(0.1)
    stream.stop_stream()
    stream.close()

async def process():
    extra_headers = {
        'Authorization': 'token ' + DEEPGRAM_API_KEY
    }
    async with websockets.connect(
        'wss://api.deepgram.com/v1/listen?encoding=linear16&sample_rate=16000&channels=1',
        extra_headers=extra_headers
    ) as ws:

        async def sender(ws):  # sends audio to the WebSocket
            try:
                while True:
                    data = await audio_queue.get()
                    await ws.send(data)
            except Exception as e:
                print('Error while sending:', str(e))
                raise

        async def receiver(ws):  # receives and prints the transcript
            async for msg in ws:
                msg = json.loads(msg)
                transcript = msg['channel']['alternatives'][0]['transcript']
                if transcript:
                    print(f'Transcript = {transcript}')

        await asyncio.gather(sender(ws), receiver(ws))

async def run():
    await asyncio.gather(microphone(), process())

if __name__ == '__main__':
    asyncio.run(run())
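To try it out (the filename here is just an example), save the listing as transcribe_live.py, replace YOUR_DEEPGRAM_API_KEY with your own key, and run:
python transcribe_live.py
Speak into your microphone, and each non-empty transcript Deepgram returns should be printed to the console.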
Explaining the Python Code for Using a Buffer with Speech-to-Text Transcription
Because we're working with Python's asyncio, we need to create a callback function in the form PyAudio expects. This callback puts items into the queue without blocking.
def callback(input_data, frame_count, time_info, status_flags):
    audio_queue.put_nowait(input_data)
    return (input_data, pyaudio.paContinue)
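One caveat: PyAudio invokes stream_callback on its own audio thread, while asyncio.Queue is not thread-safe. If you ever run into loop-related errors from the callback, a more defensive variant (just a sketch; it assumes a module-level loop variable that run() sets to the running event loop via asyncio.get_running_loop()) hands the chunk over to the event loop's thread instead:
loop = None  # assumed to be set in run(), e.g. loop = asyncio.get_running_loop()

def callback(input_data, frame_count, time_info, status_flags):
    # Schedule the put on the event loop's thread rather than calling it from PyAudio's thread
    if loop is not None:
        loop.call_soon_threadsafe(audio_queue.put_nowait, input_data)
    return (input_data, pyaudio.paContinue)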
We define the microphone() function, which creates a PyAudio stream and passes our callback as stream_callback. We then start the stream and loop while it remains active.
async def microphone():
    audio = pyaudio.PyAudio()
    stream = audio.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
        stream_callback=callback
    )
    stream.start_stream()
    while stream.is_active():
        await asyncio.sleep(0.1)
    stream.stop_stream()
    stream.close()
Next, we define an outer function called process(), which handles authorization with Deepgram. We create a context manager with async with websockets.connect to connect to the Deepgram WebSocket server.
The sender() function sends the audio to the WebSocket. The buffer call audio_queue.get() removes and returns an item from the queue; if the queue is empty, it waits until an item is available.
The receiver() function receives the transcripts, parses the JSON response, and prints each transcript to the console.
Finally, we run the program with asyncio.run(run()) under main.
async def process():
    extra_headers = {
        'Authorization': 'token ' + DEEPGRAM_API_KEY
    }
    async with websockets.connect(
        'wss://api.deepgram.com/v1/listen?encoding=linear16&sample_rate=16000&channels=1',
        extra_headers=extra_headers
    ) as ws:

        async def sender(ws):  # sends audio to the WebSocket
            try:
                while True:
                    data = await audio_queue.get()
                    await ws.send(data)
            except Exception as e:
                print('Error while sending:', str(e))
                raise

        async def receiver(ws):  # receives the transcript
            async for msg in ws:
                msg = json.loads(msg)
                transcript = msg['channel']['alternatives'][0]['transcript']
                if transcript:
                    print(f'Transcript = {transcript}')

        await asyncio.gather(sender(ws), receiver(ws))

async def run():
    await asyncio.gather(microphone(), process())

if __name__ == '__main__':
    asyncio.run(run())
Conclusion
We hope you enjoyed this short project. If you need help with the tutorial or with running the code, don't hesitate to reach out to us. The best place to start is our GitHub Discussions.