Merge pull request #2181 from pipecat-ai/pk/fix-aws-nova-sonic-pipeline-freeze

Fix a pipeline freeze when using AWS Nova Sonic. The freeze occurs if…
This commit is contained in:
kompfner
2025-07-10 16:30:55 -04:00
committed by GitHub
2 changed files with 52 additions and 41 deletions

View File

@@ -5,6 +5,14 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Fixed
- Fix a pipeline freeze when using AWS Nova Sonic, which would occur if the
user started early, while the bot was still working through
`trigger_assistant_response()`.
## [0.0.75] - 2025-07-08
### Added

View File

@@ -474,7 +474,6 @@ class AWSNovaSonicLLMService(LLMService):
# If we need to, send assistant response trigger (depends on self._connected_time)
if self._triggering_assistant_response:
await self._send_assistant_response_trigger()
self._triggering_assistant_response = False
async def _disconnect(self):
try:
@@ -1105,7 +1104,6 @@ class AWSNovaSonicLLMService(LLMService):
# Send the trigger audio, if we're fully connected and set up
if self._connected_time is not None:
await self._send_assistant_response_trigger()
self._triggering_assistant_response = False
async def _send_assistant_response_trigger(self):
if (
@@ -1113,46 +1111,51 @@ class AWSNovaSonicLLMService(LLMService):
): # should never happen
return
logger.debug("Sending assistant response trigger...")
try:
logger.debug("Sending assistant response trigger...")
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
chunk_size = int(
chunk_duration
* self._params.input_sample_rate
* self._params.input_channel_count
* (self._params.input_sample_size / 8)
) # e.g. 0.02 seconds of 16-bit (2-byte) PCM mono audio at 16kHz is 640 bytes
chunk_duration = 0.02 # what we might get from InputAudioRawFrame
chunk_size = int(
chunk_duration
* self._params.input_sample_rate
* self._params.input_channel_count
* (self._params.input_sample_size / 8)
) # e.g. 0.02 seconds of 16-bit (2-byte) PCM mono audio at 16kHz is 640 bytes
# Lead with a bit of blank audio, if needed.
# It seems like the LLM can't quite "hear" the first little bit of audio sent on a
# connection.
current_time = time.time()
max_blank_audio_duration = 0.5
blank_audio_duration = (
max_blank_audio_duration - (current_time - self._connected_time)
if self._connected_time is not None
and (current_time - self._connected_time) < max_blank_audio_duration
else None
)
if blank_audio_duration:
logger.debug(
f"Leading assistant response trigger with {blank_audio_duration}s of blank audio"
# Lead with a bit of blank audio, if needed.
# It seems like the LLM can't quite "hear" the first little bit of audio sent on a
# connection.
current_time = time.time()
max_blank_audio_duration = 0.5
blank_audio_duration = (
max_blank_audio_duration - (current_time - self._connected_time)
if self._connected_time is not None
and (current_time - self._connected_time) < max_blank_audio_duration
else None
)
blank_audio_chunk = b"\x00" * chunk_size
num_chunks = int(blank_audio_duration / chunk_duration)
for _ in range(num_chunks):
await self._send_user_audio_event(blank_audio_chunk)
await asyncio.sleep(chunk_duration)
if blank_audio_duration:
logger.debug(
f"Leading assistant response trigger with {blank_audio_duration}s of blank audio"
)
blank_audio_chunk = b"\x00" * chunk_size
num_chunks = int(blank_audio_duration / chunk_duration)
for _ in range(num_chunks):
await self._send_user_audio_event(blank_audio_chunk)
await asyncio.sleep(chunk_duration)
# Send trigger audio
# NOTE: this audio *will* be transcribed and eventually make it into the context. That's OK:
# if we ever need to seed this service again with context it would make sense to include it
# since the instruction (i.e. the "wait for the trigger" instruction) will be part of the
# context as well.
audio_chunks = [
self._assistant_response_trigger_audio[i : i + chunk_size]
for i in range(0, len(self._assistant_response_trigger_audio), chunk_size)
]
for chunk in audio_chunks:
await self._send_user_audio_event(chunk)
await asyncio.sleep(chunk_duration)
# Send trigger audio
# NOTE: this audio *will* be transcribed and eventually make it into the context. That's OK:
# if we ever need to seed this service again with context it would make sense to include it
# since the instruction (i.e. the "wait for the trigger" instruction) will be part of the
# context as well.
audio_chunks = [
self._assistant_response_trigger_audio[i : i + chunk_size]
for i in range(0, len(self._assistant_response_trigger_audio), chunk_size)
]
for chunk in audio_chunks:
await self._send_user_audio_event(chunk)
await asyncio.sleep(chunk_duration)
finally:
# We need to clean up in case sending the trigger was cancelled, e.g. in the case of a user interruption.
# (An asyncio.CancelledError would be raised in that case.)
self._triggering_assistant_response = False