diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py index 903f35f4e..66fcfb200 100644 --- a/examples/foundational/17-detect-user-idle.py +++ b/examples/foundational/17-detect-user-idle.py @@ -70,7 +70,7 @@ async def main(): async def user_idle_callback(user_idle: UserIdleProcessor): messages.append( {"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."}) - await user_idle.queue_frame(LLMMessagesFrame(messages)) + await user_idle.push_frame(LLMMessagesFrame(messages)) user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index ab0552578..379394120 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import sys from typing import List from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext diff --git a/src/pipecat/processors/async_frame_processor.py b/src/pipecat/processors/async_frame_processor.py deleted file mode 100644 index 28a27d255..000000000 --- a/src/pipecat/processors/async_frame_processor.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio - -from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class AsyncFrameProcessor(FrameProcessor): - - def __init__( - self, - *, - name: str | None = None, - loop: asyncio.AbstractEventLoop | None = None, - **kwargs): - super().__init__(name=name, loop=loop, **kwargs) - - self._create_push_task() - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartInterruptionFrame): - await self._handle_interruptions(frame) - - async def queue_frame( - self, - frame: Frame, - direction: FrameDirection = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def cleanup(self): - self._push_frame_task.cancel() - await self._push_frame_task - - async def _handle_interruptions(self, frame: Frame): - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - - def _create_push_task(self): - self._push_queue = asyncio.Queue() - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index dfdee7d40..72924776c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -11,12 +11,14 @@ from enum import Enum from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( + EndFrame, ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, - UserStoppedSpeakingFrame) + StopInterruptionFrame, + SystemFrame) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -88,6 +90,7 @@ class FrameProcessor: self, *, name: str | None = None, + sync: bool = True, loop: asyncio.AbstractEventLoop | None = None, **kwargs): self.id: int = obj_id() @@ -96,6 +99,7 @@ class FrameProcessor: self._prev: "FrameProcessor" | None = None self._next: "FrameProcessor" | None = None self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop() + self._sync = sync # Clock self._clock: BaseClock | None = None @@ -109,6 +113,14 @@ class FrameProcessor: # Metrics self._metrics = FrameProcessorMetrics(name=self.name) + # Every processor in Pipecat should only output frames from a single + # task. This avoid problems like audio overlapping. System frames are + # the exception to this rule. + # + # This create this task. + if not self._sync: + self.__create_push_task() + @property def interruptions_allowed(self): return self._allow_interruptions @@ -192,14 +204,38 @@ class FrameProcessor: self._enable_usage_metrics = frame.enable_usage_metrics self._report_only_initial_ttfb = frame.report_only_initial_ttfb elif isinstance(frame, StartInterruptionFrame): + await self._start_interruption() await self.stop_all_metrics() - elif isinstance(frame, UserStoppedSpeakingFrame): + elif isinstance(frame, StopInterruptionFrame): self._should_report_ttfb = True async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + if self._sync or isinstance(frame, SystemFrame): + await self.__internal_push_frame(frame, direction) + else: + await self.__push_queue.put((frame, direction)) + + # + # Handle interruptions + # + + async def _start_interruption(self): + if not self._sync: + # Cancel the task. This will stop pushing frames downstream. + self.__push_frame_task.cancel() + await self.__push_frame_task + + # Create a new queue and task. + self.__create_push_task() + + async def _stop_interruption(self): + # Nothing to do right now. + pass + + async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") @@ -210,5 +246,20 @@ class FrameProcessor: except Exception as e: logger.exception(f"Uncaught exception in {self}: {e}") + def __create_push_task(self): + self.__push_queue = asyncio.Queue() + self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler()) + + async def __push_frame_task_handler(self): + running = True + while running: + try: + (frame, direction) = await self.__push_queue.get() + await self.__internal_push_frame(frame, direction) + running = not isinstance(frame, EndFrame) + self.__push_queue.task_done() + except asyncio.CancelledError: + break + def __str__(self): return self.name diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index dd28e7252..d32f0f640 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -272,8 +272,9 @@ class RTVIProcessor(FrameProcessor): def __init__(self, *, config: RTVIConfig = RTVIConfig(config=[]), - params: RTVIProcessorParams = RTVIProcessorParams()): - super().__init__() + params: RTVIProcessorParams = RTVIProcessorParams(), + **kwargs): + super().__init__(sync=False, **kwargs) self._config = config self._params = params @@ -286,9 +287,6 @@ class RTVIProcessor(FrameProcessor): self._registered_actions: Dict[str, RTVIAction] = {} self._registered_services: Dict[str, RTVIService] = {} - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - self._push_queue = asyncio.Queue() - self._message_task = self.get_event_loop().create_task(self._message_task_handler()) self._message_queue = asyncio.Queue() @@ -335,12 +333,6 @@ class RTVIProcessor(FrameProcessor): message = RTVILLMFunctionCallStartMessage(data=fn) await self._push_transport_message(message, exclude_none=False) - async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): - if isinstance(frame, SystemFrame): - await super().push_frame(frame, direction) - else: - await self._internal_push_frame(frame, direction) - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -394,30 +386,10 @@ class RTVIProcessor(FrameProcessor): # processing EndFrames. self._message_task.cancel() await self._message_task - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._message_task.cancel() await self._message_task - self._push_frame_task.cancel() - await self._push_frame_task - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await super().push_frame(frame, direction) - self._push_queue.task_done() - running = not isinstance(frame, EndFrame) - except asyncio.CancelledError: - break async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True): frame = TransportMessageFrame( diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index eef0d56cc..5f0ee089b 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -41,7 +41,7 @@ class GStreamerPipelineSource(FrameProcessor): clock_sync: bool = True def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._out_params = out_params @@ -62,10 +62,6 @@ class GStreamerPipelineSource(FrameProcessor): bus.add_signal_watch() bus.connect("message", self._on_gstreamer_message) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -80,60 +76,28 @@ class GStreamerPipelineSource(FrameProcessor): elif isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._stop(frame) # Other frames else: - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) async def _start(self, frame: StartFrame): self._player.set_state(Gst.State.PLAYING) async def _stop(self, frame: EndFrame): self._player.set_state(Gst.State.NULL) - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._player.set_state(Gst.State.NULL) - # Cancel all the tasks and wait for them to finish. - self._push_frame_task.cancel() - await self._push_frame_task # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break - - # - # GStreaner + # GStreamer # def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message): @@ -221,7 +185,7 @@ class GStreamerPipelineSource(FrameProcessor): frame = AudioRawFrame(audio=info.data, sample_rate=self._out_params.audio_sample_rate, num_channels=self._out_params.audio_channels) - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -232,6 +196,6 @@ class GStreamerPipelineSource(FrameProcessor): image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 40304a5c6..42b81517e 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -8,19 +8,14 @@ import asyncio from typing import Awaitable, Callable, List -from pipecat.frames.frames import Frame, SystemFrame -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.frames.frames import Frame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class IdleFrameProcessor(AsyncFrameProcessor): +class IdleFrameProcessor(FrameProcessor): """This class waits to receive any frame or list of desired frames within a given timeout. If the timeout is reached before receiving any of those frames the provided callback will be called. - - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -30,7 +25,7 @@ class IdleFrameProcessor(AsyncFrameProcessor): timeout: float, types: List[type] = [], **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -41,10 +36,7 @@ class IdleFrameProcessor(AsyncFrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # If we are not waiting for any specific frame set the event, otherwise # check if we have received one of the desired frames. @@ -55,7 +47,6 @@ class IdleFrameProcessor(AsyncFrameProcessor): if isinstance(frame, t): self._idle_event.set() - # If we are not waiting for any specific frame set the event, otherwise async def cleanup(self): self._idle_task.cancel() await self._idle_task diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 7deda2555..36c394a5d 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -11,21 +11,16 @@ from typing import Awaitable, Callable from pipecat.frames.frames import ( BotSpeakingFrame, Frame, - SystemFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class UserIdleProcessor(AsyncFrameProcessor): +class UserIdleProcessor(FrameProcessor): """This class is useful to check if the user is interacting with the bot within a given timeout. If the timeout is reached before any interaction occurred the provided callback will be called. - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -34,7 +29,7 @@ class UserIdleProcessor(AsyncFrameProcessor): callback: Callable[["UserIdleProcessor"], Awaitable[None]], timeout: float, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -46,10 +41,7 @@ class UserIdleProcessor(AsyncFrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # We shouldn't call the idle callback if the user or the bot are speaking. if isinstance(frame, UserStartedSpeakingFrame): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 7291e7db9..d7226dba3 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -32,7 +32,6 @@ from pipecat.frames.frames import ( UserImageRequestFrame, VisionImageRawFrame ) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transcriptions.language import Language from pipecat.utils.audio import calculate_audio_volume @@ -67,7 +66,7 @@ class AIService(FrameProcessor): elif isinstance(frame, EndFrame): await self.stop(frame) - async def process_generator(self, generator: AsyncGenerator[Frame, None]): + async def process_generator(self, generator: AsyncGenerator[Frame | None, None]): async for f in generator: if f: if isinstance(f, ErrorFrame): @@ -76,30 +75,6 @@ class AIService(FrameProcessor): await self.push_frame(f) -class AsyncAIService(AsyncFrameProcessor): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - async def start(self, frame: StartFrame): - pass - - async def stop(self, frame: EndFrame): - pass - - async def cancel(self, frame: CancelFrame): - pass - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartFrame): - await self.start(frame) - elif isinstance(frame, CancelFrame): - await self.cancel(frame) - elif isinstance(frame, EndFrame): - await self.stop(frame) - - class LLMService(AIService): """This class is a no-op but serves as a base class for LLM services.""" diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index c2f984b75..90674fcc4 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -18,13 +18,11 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, StartFrame, - SystemFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, URLImageRawFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService +from pipecat.services.ai_services import STTService, TTSService, ImageGenService from pipecat.services.openai import BaseOpenAILLMService from pipecat.utils.time import time_now_iso8601 @@ -126,7 +124,7 @@ class AzureTTSService(TTSService): logger.error(f"{self} error: {cancellation_details.error_details}") -class AzureSTTService(AsyncAIService): +class AzureSTTService(STTService): def __init__( self, *, @@ -149,15 +147,11 @@ class AzureSTTService(AsyncAIService): speech_config=speech_config, audio_config=audio_config) self._speech_recognizer.recognized.connect(self._on_handle_recognized) - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - self._audio_stream.write(frame.audio) - else: - await self._push_queue.put((frame, direction)) + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + self._audio_stream.write(audio) + await self.stop_processing_metrics() + yield None async def start(self, frame: StartFrame): await super().start(frame) @@ -176,7 +170,7 @@ class AzureSTTService(AsyncAIService): def _on_handle_recognized(self, event): if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: frame = TranscriptionFrame(event.result.text, "", time_now_iso8601()) - asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) class AzureImageGenServiceREST(ImageGenService): diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index d899d4bdb..708c3c511 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -161,8 +161,8 @@ class DeepgramSTTService(STTService): async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self.start_processing_metrics() await self._connection.send(audio) - yield None await self.stop_processing_metrics() + yield None async def _connect(self): if await self._connection.start(self._live_options): diff --git a/src/pipecat/services/gladia.py b/src/pipecat/services/gladia.py index 886300897..ead8f63dc 100644 --- a/src/pipecat/services/gladia.py +++ b/src/pipecat/services/gladia.py @@ -7,20 +7,17 @@ import base64 import json -from typing import Optional +from typing import AsyncGenerator, Optional from pydantic.main import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, - SystemFrame, TranscriptionFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService +from pipecat.services.ai_services import STTService from pipecat.utils.time import time_now_iso8601 from loguru import logger @@ -35,7 +32,7 @@ except ModuleNotFoundError as e: raise Exception(f"Missing module: {e}") -class GladiaSTTService(AsyncAIService): +class GladiaSTTService(STTService): class InputParams(BaseModel): sample_rate: Optional[int] = 16000 language: Optional[str] = "english" @@ -50,23 +47,13 @@ class GladiaSTTService(AsyncAIService): confidence: float = 0.5, params: InputParams = InputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._api_key = api_key self._url = url self._params = params self._confidence = confidence - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - await self._send_audio(frame) - else: - await self.queue_frame(frame, direction) - async def start(self, frame: StartFrame): await super().start(frame) self._websocket = await websockets.connect(self._url) @@ -81,6 +68,12 @@ class GladiaSTTService(AsyncAIService): await super().cancel(frame) await self._websocket.close() + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + await self._send_audio(audio) + await self.stop_processing_metrics() + yield None + async def _setup_gladia(self): configuration = { "x_gladia_key": self._api_key, @@ -92,9 +85,9 @@ class GladiaSTTService(AsyncAIService): await self._websocket.send(json.dumps(configuration)) - async def _send_audio(self, frame: AudioRawFrame): + async def _send_audio(self, audio: bytes): message = { - 'frames': base64.b64encode(frame.audio).decode("utf-8") + 'frames': base64.b64encode(audio).decode("utf-8") } await self._websocket.send(json.dumps(message)) @@ -113,6 +106,6 @@ class GladiaSTTService(AsyncAIService): transcript = utterance["transcription"] if confidence >= self._confidence: if type == "final": - await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) else: - await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 638e394a1..60f0cb7df 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -46,7 +46,7 @@ class LmntTTSService(AsyncTTSService): **kwargs): # Let TTSService produce TTSStoppedFrames after a short delay of # no activity. - super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs) + super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs) self._api_key = api_key self._voice_id = voice_id diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 3d1d0c4d7..8836fbd1e 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -31,16 +31,12 @@ from loguru import logger class BaseInputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params self._executor = ThreadPoolExecutor(max_workers=5) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create audio input queue and task if needed. if self._params.audio_in_enabled or self._params.vad_enabled: @@ -53,10 +49,6 @@ class BaseInputTransport(FrameProcessor): self._audio_task.cancel() await self._audio_task - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task - async def cancel(self, frame: CancelFrame): # Cancel all the tasks and wait for them to finish. @@ -64,9 +56,6 @@ class BaseInputTransport(FrameProcessor): self._audio_task.cancel() await self._audio_task - self._push_frame_task.cancel() - await self._push_frame_task - def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer @@ -86,11 +75,8 @@ class BaseInputTransport(FrameProcessor): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, BotInterruptionFrame): - await self._handle_interruptions(frame, False) - elif isinstance(frame, StartInterruptionFrame): + logger.debug("Bot interruption") await self._start_interruption() - elif isinstance(frame, StopInterruptionFrame): - await self._stop_interruption() # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -98,12 +84,12 @@ class BaseInputTransport(FrameProcessor): elif isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.stop(frame) elif isinstance(frame, VADParamsUpdateFrame): vad_analyzer = self.vad_analyzer() @@ -111,73 +97,28 @@ class BaseInputTransport(FrameProcessor): vad_analyzer.set_params(frame.params) # Other frames else: - await self._internal_push_frame(frame, direction) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(frame, direction) # # Handle interruptions # - async def _start_interruption(self): - if not self.interruptions_allowed: - return - - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task) to stop everything, specially at the output - # transport. - await self.push_frame(StartInterruptionFrame()) - # Create a new queue and task. - self._create_push_task() - - async def _stop_interruption(self): - if not self.interruptions_allowed: - return - - await self.push_frame(StopInterruptionFrame()) - - async def _handle_interruptions(self, frame: Frame, push_frame: bool): + async def _handle_interruptions(self, frame: Frame): if self.interruptions_allowed: - # Make sure we notify about interruptions quickly out-of-band - if isinstance(frame, BotInterruptionFrame): - logger.debug("Bot interruption") - await self._start_interruption() - elif isinstance(frame, UserStartedSpeakingFrame): + # Make sure we notify about interruptions quickly out-of-band. + if isinstance(frame, UserStartedSpeakingFrame): logger.debug("User started speaking") await self._start_interruption() + # Push an out-of-band frame (i.e. not using the ordered push + # frame task) to stop everything, specially at the output + # transport. + await self.push_frame(StartInterruptionFrame()) elif isinstance(frame, UserStoppedSpeakingFrame): logger.debug("User stopped speaking") await self._stop_interruption() + await self.push_frame(StopInterruptionFrame()) - if push_frame: - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio input @@ -201,7 +142,7 @@ class BaseInputTransport(FrameProcessor): frame = UserStoppedSpeakingFrame() if frame: - await self._handle_interruptions(frame, True) + await self._handle_interruptions(frame) vad_state = new_vad_state return vad_state @@ -222,7 +163,7 @@ class BaseInputTransport(FrameProcessor): # Push audio downstream if passthrough. if audio_passthrough: - await self._internal_push_frame(frame) + await self.push_frame(frame) self._audio_in_queue.task_done() except asyncio.CancelledError: diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index a24c9f4d2..bc683721a 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -43,7 +43,7 @@ from pipecat.utils.time import nanoseconds_to_seconds class BaseOutputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params @@ -70,10 +70,6 @@ class BaseOutputTransport(FrameProcessor): # generating frames upstream while, for example, the audio is playing. self._create_sink_tasks() - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create camera output queue and task if needed. if self._params.camera_out_enabled: @@ -95,9 +91,8 @@ class BaseOutputTransport(FrameProcessor): self._audio_out_task.cancel() await self._audio_out_task - # Wait for the push frame and sink tasks to finish. They will finish when - # the EndFrame is actually processed. - await self._push_frame_task + # Wait for the sink task to finish. They will finish when the EndFrame + # is actually processed. await self._sink_task async def cancel(self, frame: CancelFrame): @@ -107,9 +102,6 @@ class BaseOutputTransport(FrameProcessor): self._camera_out_task.cancel() await self._camera_out_task - self._push_frame_task.cancel() - await self._push_frame_task - self._sink_task.cancel() await self._sink_task @@ -182,10 +174,6 @@ class BaseOutputTransport(FrameProcessor): await self._sink_clock_task # Create sink tasks. self._create_sink_tasks() - # Stop push task. - self._push_frame_task.cancel() - await self._push_frame_task - self._create_push_task() # Let's send a bot stopped speaking if we have to. if self._bot_speaking: await self._bot_stopped_speaking() @@ -227,7 +215,7 @@ class BaseOutputTransport(FrameProcessor): async def _sink_frame_handler(self, frame: Frame): if isinstance(frame, AudioRawFrame): await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) elif isinstance(frame, ImageRawFrame): await self._set_camera_image(frame) @@ -237,12 +225,12 @@ class BaseOutputTransport(FrameProcessor): await self.send_message(frame) elif isinstance(frame, TTSStartedFrame): await self._bot_started_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) elif isinstance(frame, TTSStoppedFrame): await self._bot_stopped_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_task_handler(self): running = True @@ -261,7 +249,7 @@ class BaseOutputTransport(FrameProcessor): # TODO(aleix): For now we just process TextFrame. But we should process # audio and video as well. if isinstance(frame, TextFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_clock_task_handler(self): running = True @@ -293,38 +281,12 @@ class BaseOutputTransport(FrameProcessor): async def _bot_started_speaking(self): logger.debug("Bot started speaking") self._bot_speaking = True - await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) + await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) async def _bot_stopped_speaking(self): logger.debug("Bot stopped speaking") self._bot_speaking = False - await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) # # Camera out @@ -408,7 +370,7 @@ class BaseOutputTransport(FrameProcessor): try: frame = await self._audio_out_queue.get() await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) except asyncio.CancelledError: break diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 08ca9719a..819950d72 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -98,9 +98,9 @@ class WebsocketServerInputTransport(BaseInputTransport): continue if isinstance(frame, AudioRawFrame): - await self.push_audio_frame(frame) + await self.queue_audio_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) # Notify disconnection await self._callbacks.on_client_disconnected(websocket) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7cf330b9e..210cc7341 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -625,11 +625,11 @@ class DailyInputTransport(BaseInputTransport): # async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def push_app_message(self, message: Any, sender: str): frame = DailyTransportMessageFrame(message=message, participant_id=sender) - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio in @@ -692,7 +692,7 @@ class DailyInputTransport(BaseInputTransport): image=buffer, size=size, format=format) - await self._internal_push_frame(frame) + await self.push_frame(frame) self._video_renderers[participant_id]["timestamp"] = curr_time