diff --git a/CHANGELOG.md b/CHANGELOG.md index 065a5ced6..787be8afd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added new `sample_rate` constructor parameter to `TavusVideoService` to allow changing the output sample rate. +- Added new `NeuphonicTTSService`. + (see https://neuphonic.com) + - Added new `UltravoxSTTService`. (see https://github.com/fixie-ai/ultravox) @@ -269,6 +272,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add foundational example `07w-interruptible-fal.py`, showing `FalSTTService`. +- Added a new Ultravox example + `examples/foundational/07u-interruptible-ultravox.py`. + +- Added new Neuphonic examples + `examples/foundational/07v-interruptible-neuphonic.py` and + `examples/foundational/07v-interruptible-neuphonic-http.py`. + - Added a new example `examples/foundational/36-user-email-gathering.py` to show how to gather user emails. The example uses's Cartesia's `` tags and Rime `spell()` function to spell out the emails for confirmation. @@ -367,6 +377,9 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general")) ### Fixed +- Fixed an issue that would cause undesired interruptions via + `EmulateUserStartedSpeakingFrame`. + - Fixed a `GoogleLLMService` that was causing an exception when sending inline audio in some cases. @@ -383,10 +396,6 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general")) - Fixed `match_endofsentence` support for ellipses. -- Fixed an issue that would cause undesired interruptions via - `EmulateUserStartedSpeakingFrame` when only interim transcriptions (i.e. no - final transcriptions) where received. - - Fixed an issue where `EndTaskFrame` was not triggering `on_client_disconnected` or closing the WebSocket in FastAPI. diff --git a/examples/foundational/07v-interruptible-ultravox.py b/examples/foundational/07u-interruptible-ultravox.py similarity index 100% rename from examples/foundational/07v-interruptible-ultravox.py rename to examples/foundational/07u-interruptible-ultravox.py diff --git a/examples/foundational/07u-interruptible-neuphonic-http.py b/examples/foundational/07v-interruptible-neuphonic-http.py similarity index 100% rename from examples/foundational/07u-interruptible-neuphonic-http.py rename to examples/foundational/07v-interruptible-neuphonic-http.py diff --git a/examples/foundational/07u-interruptible-neuphonic.py b/examples/foundational/07v-interruptible-neuphonic.py similarity index 100% rename from examples/foundational/07u-interruptible-neuphonic.py rename to examples/foundational/07v-interruptible-neuphonic.py diff --git a/pyproject.toml b/pyproject.toml index 87a34f929..4e300bd91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ Website = "https://pipecat.ai" anthropic = [ "anthropic~=0.49.0" ] assemblyai = [ "assemblyai~=0.37.0" ] aws = [ "boto3~=1.37.16" ] -azure = [ "azure-cognitiveservices-speech~=1.43.0"] +azure = [ "azure-cognitiveservices-speech~=1.42.0"] canonical = [ "aiofiles~=24.1.0" ] cartesia = [ "cartesia~=1.4.0", "websockets~=13.1" ] neuphonic = [ "pyneuphonic~=1.5.13", "websockets~=13.1" ] diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index 9267af364..75435a214 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -5,7 +5,6 @@ # import asyncio -import time from abc import abstractmethod from typing import Dict, List @@ -222,17 +221,15 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): self, context: OpenAILLMContext, aggregation_timeout: float = 1.0, - bot_interruption_timeout: float = 5.0, **kwargs, ): super().__init__(context=context, role="user", **kwargs) self._aggregation_timeout = aggregation_timeout - self._bot_interruption_timeout = bot_interruption_timeout self._seen_interim_results = False self._user_speaking = False - self._last_user_speaking_time = 0 self._emulating_vad = False + self._waiting_for_aggregation = False self._aggregation_event = asyncio.Event() self._aggregation_task = None @@ -240,6 +237,7 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): def reset(self): super().reset() self._seen_interim_results = False + self._waiting_for_aggregation = False async def handle_aggregation(self, aggregation: str): self._context.add_message({"role": self.role, "content": self._aggregation}) @@ -285,14 +283,11 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): # Reset the aggregation. Reset it before pushing it down, otherwise # if the tasks gets cancelled we won't be able to clear things up. - self._aggregation = "" + self.reset() frame = OpenAILLMContextFrame(self._context) await self.push_frame(frame) - # Reset our accumulator state. - self.reset() - async def _start(self, frame: StartFrame): self._create_aggregation_task() @@ -303,12 +298,14 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): await self._cancel_aggregation_task() async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame): - self._last_user_speaking_time = time.time() self._user_speaking = True + self._waiting_for_aggregation = True async def _handle_user_stopped_speaking(self, _: UserStoppedSpeakingFrame): - self._last_user_speaking_time = time.time() self._user_speaking = False + # We just stopped speaking. Let's see if there's some aggregation to + # push. If the last thing we saw is an interim transcription, let's wait + # pushing the aggregation as we will probably get a final transcription. if not self._seen_interim_results: await self.push_aggregation() @@ -361,18 +358,13 @@ class LLMUserContextAggregator(LLMContextResponseAggregator): frame we might want to interrupt the bot. """ - if not self._user_speaking: - diff_time = time.time() - self._last_user_speaking_time - if diff_time > self._bot_interruption_timeout: - # If we reach this case we received a transcription but VAD was - # not able to detect voice (e.g. when you whisper a short - # utterance). So, we need to emulate VAD (i.e. user - # start/stopped speaking). - await self.push_frame(EmulateUserStartedSpeakingFrame(), FrameDirection.UPSTREAM) - self._emulating_vad = True - - # Reset time so we don't interrupt again right away. - self._last_user_speaking_time = time.time() + if not self._user_speaking and not self._waiting_for_aggregation: + # If we reach this case we received a transcription but VAD was not + # able to detect voice (e.g. when you whisper a short + # utterance). So, we need to emulate VAD (i.e. user start/stopped + # speaking). + await self.push_frame(EmulateUserStartedSpeakingFrame(), FrameDirection.UPSTREAM) + self._emulating_vad = True class LLMAssistantContextAggregator(LLMContextResponseAggregator): @@ -554,14 +546,11 @@ class LLMUserResponseAggregator(LLMUserContextAggregator): # Reset the aggregation. Reset it before pushing it down, otherwise # if the tasks gets cancelled we won't be able to clear things up. - self._aggregation = "" + self.reset() frame = LLMMessagesFrame(self._context.messages) await self.push_frame(frame) - # Reset our accumulator state. - self.reset() - class LLMAssistantResponseAggregator(LLMAssistantContextAggregator): def __init__(self, messages: List[dict] = [], **kwargs): @@ -573,10 +562,7 @@ class LLMAssistantResponseAggregator(LLMAssistantContextAggregator): # Reset the aggregation. Reset it before pushing it down, otherwise # if the tasks gets cancelled we won't be able to clear things up. - self._aggregation = "" + self.reset() frame = LLMMessagesFrame(self._context.messages) await self.push_frame(frame) - - # Reset our accumulator state. - self.reset() diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 782ad1333..971dfe066 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -152,6 +152,7 @@ class BaseInputTransport(FrameProcessor): async def _handle_user_interruption(self, frame: Frame): if isinstance(frame, UserStartedSpeakingFrame): logger.debug("User started speaking") + await self.push_frame(frame) # Make sure we notify about interruptions quickly out-of-band. if self.interruptions_allowed: await self._start_interruption() @@ -161,12 +162,11 @@ class BaseInputTransport(FrameProcessor): await self.push_frame(StartInterruptionFrame()) elif isinstance(frame, UserStoppedSpeakingFrame): logger.debug("User stopped speaking") + await self.push_frame(frame) if self.interruptions_allowed: await self._stop_interruption() await self.push_frame(StopInterruptionFrame()) - await self.push_frame(frame) - # # Audio input # diff --git a/src/pipecat/transports/services/livekit.py b/src/pipecat/transports/services/livekit.py index 149ca4b7c..8ce5c885c 100644 --- a/src/pipecat/transports/services/livekit.py +++ b/src/pipecat/transports/services/livekit.py @@ -599,13 +599,6 @@ class LiveKitTransport(BaseTransport): ) await self._output.send_message(frame) - async def cleanup(self): - if self._input: - await self._input.cleanup() - if self._output: - await self._output.cleanup() - await self._client.disconnect() - async def on_room_event(self, event): # Handle room events pass diff --git a/tests/test_context_aggregators.py b/tests/test_context_aggregators.py index 0c9b6d5e4..185725632 100644 --- a/tests/test_context_aggregators.py +++ b/tests/test_context_aggregators.py @@ -44,8 +44,6 @@ from pipecat.tests.utils import SleepFrame, run_test AGGREGATION_TIMEOUT = 0.1 AGGREGATION_SLEEP = 0.15 -BOT_INTERRUPTION_TIMEOUT = 0.2 -BOT_INTERRUPTION_SLEEP = 0.25 class BaseTestUserContextAggregator: @@ -388,14 +386,13 @@ class BaseTestUserContextAggregator: aggregator = self.AGGREGATOR_CLASS( context, aggregation_timeout=AGGREGATION_TIMEOUT, - bot_interruption_timeout=BOT_INTERRUPTION_TIMEOUT, ) frames_to_send = [ UserStartedSpeakingFrame(), InterimTranscriptionFrame(text="How ", user_id="cat", timestamp=""), SleepFrame(), UserStoppedSpeakingFrame(), - SleepFrame(BOT_INTERRUPTION_SLEEP), + SleepFrame(AGGREGATION_SLEEP), InterimTranscriptionFrame(text="are you?", user_id="cat", timestamp=""), TranscriptionFrame(text="How are you?", user_id="cat", timestamp=""), SleepFrame(sleep=AGGREGATION_SLEEP), @@ -405,12 +402,10 @@ class BaseTestUserContextAggregator: UserStoppedSpeakingFrame, *self.EXPECTED_CONTEXT_FRAMES, ] - expected_up_frames = [EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame] await run_test( aggregator, frames_to_send=frames_to_send, expected_down_frames=expected_down_frames, - expected_up_frames=expected_up_frames, ) self.check_message_content(context, 0, "How are you?")