From 337f048864d154dc3a70e4ae38eb3fc54f69abbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 30 Aug 2024 16:24:04 -0700 Subject: [PATCH] introduce synchronous and asynchronous frame processors Pipecat has a pipeline-based architecture. The pipeline consists of frame processors linked to each other. The elements travelling across the pipeline are called frames. To have a deterministic behavior the frames travelling through the pipeline should always be ordered, except system frames which are out-of-band frames. To achieve that, each frame processor should only output frames from a single task. There are synchronous and asynchronous frame processors. The synchronous processors push output frames from the same task that they receive input frames, and therefore only pushing frames from one task. Asynchrnous frame processors can have internal tasks to perform things asynchrnously (e.g. receiving data from a websocket) but they also have a single task where they push frames from. --- examples/foundational/17-detect-user-idle.py | 2 +- .../processors/aggregators/llm_response.py | 1 - .../processors/async_frame_processor.py | 64 ------------- src/pipecat/processors/frame_processor.py | 55 ++++++++++- src/pipecat/processors/frameworks/rtvi.py | 34 +------ .../processors/gstreamer/pipeline_source.py | 50 ++-------- .../processors/idle_frame_processor.py | 19 +--- src/pipecat/processors/user_idle_processor.py | 16 +--- src/pipecat/services/ai_services.py | 27 +----- src/pipecat/services/azure.py | 22 ++--- src/pipecat/services/deepgram.py | 2 +- src/pipecat/services/gladia.py | 35 +++---- src/pipecat/services/lmnt.py | 2 +- src/pipecat/transports/base_input.py | 91 ++++--------------- src/pipecat/transports/base_output.py | 60 +++--------- .../transports/network/websocket_server.py | 4 +- src/pipecat/transports/services/daily.py | 6 +- 17 files changed, 130 insertions(+), 360 deletions(-) delete mode 100644 src/pipecat/processors/async_frame_processor.py diff --git a/examples/foundational/17-detect-user-idle.py b/examples/foundational/17-detect-user-idle.py index 903f35f4e..66fcfb200 100644 --- a/examples/foundational/17-detect-user-idle.py +++ b/examples/foundational/17-detect-user-idle.py @@ -70,7 +70,7 @@ async def main(): async def user_idle_callback(user_idle: UserIdleProcessor): messages.append( {"role": "system", "content": "Ask the user if they are still there and try to prompt for some input, but be short."}) - await user_idle.queue_frame(LLMMessagesFrame(messages)) + await user_idle.push_frame(LLMMessagesFrame(messages)) user_idle = UserIdleProcessor(callback=user_idle_callback, timeout=5.0) diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index ab0552578..379394120 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -4,7 +4,6 @@ # SPDX-License-Identifier: BSD 2-Clause License # -import sys from typing import List from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame, OpenAILLMContext diff --git a/src/pipecat/processors/async_frame_processor.py b/src/pipecat/processors/async_frame_processor.py deleted file mode 100644 index 28a27d255..000000000 --- a/src/pipecat/processors/async_frame_processor.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Copyright (c) 2024, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio - -from pipecat.frames.frames import EndFrame, Frame, StartInterruptionFrame -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor - - -class AsyncFrameProcessor(FrameProcessor): - - def __init__( - self, - *, - name: str | None = None, - loop: asyncio.AbstractEventLoop | None = None, - **kwargs): - super().__init__(name=name, loop=loop, **kwargs) - - self._create_push_task() - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartInterruptionFrame): - await self._handle_interruptions(frame) - - async def queue_frame( - self, - frame: Frame, - direction: FrameDirection = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def cleanup(self): - self._push_frame_task.cancel() - await self._push_frame_task - - async def _handle_interruptions(self, frame: Frame): - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task). - await self.push_frame(frame) - # Create a new queue and task. - self._create_push_task() - - def _create_push_task(self): - self._push_queue = asyncio.Queue() - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index dfdee7d40..72924776c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -11,12 +11,14 @@ from enum import Enum from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( + EndFrame, ErrorFrame, Frame, MetricsFrame, StartFrame, StartInterruptionFrame, - UserStoppedSpeakingFrame) + StopInterruptionFrame, + SystemFrame) from pipecat.utils.utils import obj_count, obj_id from loguru import logger @@ -88,6 +90,7 @@ class FrameProcessor: self, *, name: str | None = None, + sync: bool = True, loop: asyncio.AbstractEventLoop | None = None, **kwargs): self.id: int = obj_id() @@ -96,6 +99,7 @@ class FrameProcessor: self._prev: "FrameProcessor" | None = None self._next: "FrameProcessor" | None = None self._loop: asyncio.AbstractEventLoop = loop or asyncio.get_running_loop() + self._sync = sync # Clock self._clock: BaseClock | None = None @@ -109,6 +113,14 @@ class FrameProcessor: # Metrics self._metrics = FrameProcessorMetrics(name=self.name) + # Every processor in Pipecat should only output frames from a single + # task. This avoid problems like audio overlapping. System frames are + # the exception to this rule. + # + # This create this task. + if not self._sync: + self.__create_push_task() + @property def interruptions_allowed(self): return self._allow_interruptions @@ -192,14 +204,38 @@ class FrameProcessor: self._enable_usage_metrics = frame.enable_usage_metrics self._report_only_initial_ttfb = frame.report_only_initial_ttfb elif isinstance(frame, StartInterruptionFrame): + await self._start_interruption() await self.stop_all_metrics() - elif isinstance(frame, UserStoppedSpeakingFrame): + elif isinstance(frame, StopInterruptionFrame): self._should_report_ttfb = True async def push_error(self, error: ErrorFrame): await self.push_frame(error, FrameDirection.UPSTREAM) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): + if self._sync or isinstance(frame, SystemFrame): + await self.__internal_push_frame(frame, direction) + else: + await self.__push_queue.put((frame, direction)) + + # + # Handle interruptions + # + + async def _start_interruption(self): + if not self._sync: + # Cancel the task. This will stop pushing frames downstream. + self.__push_frame_task.cancel() + await self.__push_frame_task + + # Create a new queue and task. + self.__create_push_task() + + async def _stop_interruption(self): + # Nothing to do right now. + pass + + async def __internal_push_frame(self, frame: Frame, direction: FrameDirection): try: if direction == FrameDirection.DOWNSTREAM and self._next: logger.trace(f"Pushing {frame} from {self} to {self._next}") @@ -210,5 +246,20 @@ class FrameProcessor: except Exception as e: logger.exception(f"Uncaught exception in {self}: {e}") + def __create_push_task(self): + self.__push_queue = asyncio.Queue() + self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler()) + + async def __push_frame_task_handler(self): + running = True + while running: + try: + (frame, direction) = await self.__push_queue.get() + await self.__internal_push_frame(frame, direction) + running = not isinstance(frame, EndFrame) + self.__push_queue.task_done() + except asyncio.CancelledError: + break + def __str__(self): return self.name diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index dd28e7252..d32f0f640 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -272,8 +272,9 @@ class RTVIProcessor(FrameProcessor): def __init__(self, *, config: RTVIConfig = RTVIConfig(config=[]), - params: RTVIProcessorParams = RTVIProcessorParams()): - super().__init__() + params: RTVIProcessorParams = RTVIProcessorParams(), + **kwargs): + super().__init__(sync=False, **kwargs) self._config = config self._params = params @@ -286,9 +287,6 @@ class RTVIProcessor(FrameProcessor): self._registered_actions: Dict[str, RTVIAction] = {} self._registered_services: Dict[str, RTVIService] = {} - self._push_frame_task = self.get_event_loop().create_task(self._push_frame_task_handler()) - self._push_queue = asyncio.Queue() - self._message_task = self.get_event_loop().create_task(self._message_task_handler()) self._message_queue = asyncio.Queue() @@ -335,12 +333,6 @@ class RTVIProcessor(FrameProcessor): message = RTVILLMFunctionCallStartMessage(data=fn) await self._push_transport_message(message, exclude_none=False) - async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): - if isinstance(frame, SystemFrame): - await super().push_frame(frame, direction) - else: - await self._internal_push_frame(frame, direction) - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -394,30 +386,10 @@ class RTVIProcessor(FrameProcessor): # processing EndFrames. self._message_task.cancel() await self._message_task - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._message_task.cancel() await self._message_task - self._push_frame_task.cancel() - await self._push_frame_task - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await super().push_frame(frame, direction) - self._push_queue.task_done() - running = not isinstance(frame, EndFrame) - except asyncio.CancelledError: - break async def _push_transport_message(self, model: BaseModel, exclude_none: bool = True): frame = TransportMessageFrame( diff --git a/src/pipecat/processors/gstreamer/pipeline_source.py b/src/pipecat/processors/gstreamer/pipeline_source.py index eef0d56cc..5f0ee089b 100644 --- a/src/pipecat/processors/gstreamer/pipeline_source.py +++ b/src/pipecat/processors/gstreamer/pipeline_source.py @@ -41,7 +41,7 @@ class GStreamerPipelineSource(FrameProcessor): clock_sync: bool = True def __init__(self, *, pipeline: str, out_params: OutputParams = OutputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._out_params = out_params @@ -62,10 +62,6 @@ class GStreamerPipelineSource(FrameProcessor): bus.add_signal_watch() bus.connect("message", self._on_gstreamer_message) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) @@ -80,60 +76,28 @@ class GStreamerPipelineSource(FrameProcessor): elif isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self._stop(frame) # Other frames else: - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) async def _start(self, frame: StartFrame): self._player.set_state(Gst.State.PLAYING) async def _stop(self, frame: EndFrame): self._player.set_state(Gst.State.NULL) - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task async def _cancel(self, frame: CancelFrame): self._player.set_state(Gst.State.NULL) - # Cancel all the tasks and wait for them to finish. - self._push_frame_task.cancel() - await self._push_frame_task # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break - - # - # GStreaner + # GStreamer # def _on_gstreamer_message(self, bus: Gst.Bus, message: Gst.Message): @@ -221,7 +185,7 @@ class GStreamerPipelineSource(FrameProcessor): frame = AudioRawFrame(audio=info.data, sample_rate=self._out_params.audio_sample_rate, num_channels=self._out_params.audio_channels) - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK @@ -232,6 +196,6 @@ class GStreamerPipelineSource(FrameProcessor): image=info.data, size=(self._out_params.video_width, self._out_params.video_height), format="RGB") - asyncio.run_coroutine_threadsafe(self._internal_push_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) buffer.unmap(info) return Gst.FlowReturn.OK diff --git a/src/pipecat/processors/idle_frame_processor.py b/src/pipecat/processors/idle_frame_processor.py index 40304a5c6..42b81517e 100644 --- a/src/pipecat/processors/idle_frame_processor.py +++ b/src/pipecat/processors/idle_frame_processor.py @@ -8,19 +8,14 @@ import asyncio from typing import Awaitable, Callable, List -from pipecat.frames.frames import Frame, SystemFrame -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.frames.frames import Frame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class IdleFrameProcessor(AsyncFrameProcessor): +class IdleFrameProcessor(FrameProcessor): """This class waits to receive any frame or list of desired frames within a given timeout. If the timeout is reached before receiving any of those frames the provided callback will be called. - - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -30,7 +25,7 @@ class IdleFrameProcessor(AsyncFrameProcessor): timeout: float, types: List[type] = [], **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -41,10 +36,7 @@ class IdleFrameProcessor(AsyncFrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # If we are not waiting for any specific frame set the event, otherwise # check if we have received one of the desired frames. @@ -55,7 +47,6 @@ class IdleFrameProcessor(AsyncFrameProcessor): if isinstance(frame, t): self._idle_event.set() - # If we are not waiting for any specific frame set the event, otherwise async def cleanup(self): self._idle_task.cancel() await self._idle_task diff --git a/src/pipecat/processors/user_idle_processor.py b/src/pipecat/processors/user_idle_processor.py index 7deda2555..36c394a5d 100644 --- a/src/pipecat/processors/user_idle_processor.py +++ b/src/pipecat/processors/user_idle_processor.py @@ -11,21 +11,16 @@ from typing import Awaitable, Callable from pipecat.frames.frames import ( BotSpeakingFrame, Frame, - SystemFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor -from pipecat.processors.frame_processor import FrameDirection +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -class UserIdleProcessor(AsyncFrameProcessor): +class UserIdleProcessor(FrameProcessor): """This class is useful to check if the user is interacting with the bot within a given timeout. If the timeout is reached before any interaction occurred the provided callback will be called. - The callback can then be used to push frames downstream by using - `queue_frame()` (or `push_frame()` for system frames). - """ def __init__( @@ -34,7 +29,7 @@ class UserIdleProcessor(AsyncFrameProcessor): callback: Callable[["UserIdleProcessor"], Awaitable[None]], timeout: float, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._callback = callback self._timeout = timeout @@ -46,10 +41,7 @@ class UserIdleProcessor(AsyncFrameProcessor): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - else: - await self.queue_frame(frame, direction) + await self.push_frame(frame, direction) # We shouldn't call the idle callback if the user or the bot are speaking. if isinstance(frame, UserStartedSpeakingFrame): diff --git a/src/pipecat/services/ai_services.py b/src/pipecat/services/ai_services.py index 7291e7db9..d7226dba3 100644 --- a/src/pipecat/services/ai_services.py +++ b/src/pipecat/services/ai_services.py @@ -32,7 +32,6 @@ from pipecat.frames.frames import ( UserImageRequestFrame, VisionImageRawFrame ) -from pipecat.processors.async_frame_processor import AsyncFrameProcessor from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.transcriptions.language import Language from pipecat.utils.audio import calculate_audio_volume @@ -67,7 +66,7 @@ class AIService(FrameProcessor): elif isinstance(frame, EndFrame): await self.stop(frame) - async def process_generator(self, generator: AsyncGenerator[Frame, None]): + async def process_generator(self, generator: AsyncGenerator[Frame | None, None]): async for f in generator: if f: if isinstance(f, ErrorFrame): @@ -76,30 +75,6 @@ class AIService(FrameProcessor): await self.push_frame(f) -class AsyncAIService(AsyncFrameProcessor): - def __init__(self, **kwargs): - super().__init__(**kwargs) - - async def start(self, frame: StartFrame): - pass - - async def stop(self, frame: EndFrame): - pass - - async def cancel(self, frame: CancelFrame): - pass - - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, StartFrame): - await self.start(frame) - elif isinstance(frame, CancelFrame): - await self.cancel(frame) - elif isinstance(frame, EndFrame): - await self.stop(frame) - - class LLMService(AIService): """This class is a no-op but serves as a base class for LLM services.""" diff --git a/src/pipecat/services/azure.py b/src/pipecat/services/azure.py index c2f984b75..90674fcc4 100644 --- a/src/pipecat/services/azure.py +++ b/src/pipecat/services/azure.py @@ -18,13 +18,11 @@ from pipecat.frames.frames import ( ErrorFrame, Frame, StartFrame, - SystemFrame, TTSStartedFrame, TTSStoppedFrame, TranscriptionFrame, URLImageRawFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService, TTSService, ImageGenService +from pipecat.services.ai_services import STTService, TTSService, ImageGenService from pipecat.services.openai import BaseOpenAILLMService from pipecat.utils.time import time_now_iso8601 @@ -126,7 +124,7 @@ class AzureTTSService(TTSService): logger.error(f"{self} error: {cancellation_details.error_details}") -class AzureSTTService(AsyncAIService): +class AzureSTTService(STTService): def __init__( self, *, @@ -149,15 +147,11 @@ class AzureSTTService(AsyncAIService): speech_config=speech_config, audio_config=audio_config) self._speech_recognizer.recognized.connect(self._on_handle_recognized) - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - self._audio_stream.write(frame.audio) - else: - await self._push_queue.put((frame, direction)) + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + self._audio_stream.write(audio) + await self.stop_processing_metrics() + yield None async def start(self, frame: StartFrame): await super().start(frame) @@ -176,7 +170,7 @@ class AzureSTTService(AsyncAIService): def _on_handle_recognized(self, event): if event.result.reason == ResultReason.RecognizedSpeech and len(event.result.text) > 0: frame = TranscriptionFrame(event.result.text, "", time_now_iso8601()) - asyncio.run_coroutine_threadsafe(self.queue_frame(frame), self.get_event_loop()) + asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop()) class AzureImageGenServiceREST(ImageGenService): diff --git a/src/pipecat/services/deepgram.py b/src/pipecat/services/deepgram.py index d899d4bdb..708c3c511 100644 --- a/src/pipecat/services/deepgram.py +++ b/src/pipecat/services/deepgram.py @@ -161,8 +161,8 @@ class DeepgramSTTService(STTService): async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: await self.start_processing_metrics() await self._connection.send(audio) - yield None await self.stop_processing_metrics() + yield None async def _connect(self): if await self._connection.start(self._live_options): diff --git a/src/pipecat/services/gladia.py b/src/pipecat/services/gladia.py index 886300897..ead8f63dc 100644 --- a/src/pipecat/services/gladia.py +++ b/src/pipecat/services/gladia.py @@ -7,20 +7,17 @@ import base64 import json -from typing import Optional +from typing import AsyncGenerator, Optional from pydantic.main import BaseModel from pipecat.frames.frames import ( - AudioRawFrame, CancelFrame, EndFrame, Frame, InterimTranscriptionFrame, StartFrame, - SystemFrame, TranscriptionFrame) -from pipecat.processors.frame_processor import FrameDirection -from pipecat.services.ai_services import AsyncAIService +from pipecat.services.ai_services import STTService from pipecat.utils.time import time_now_iso8601 from loguru import logger @@ -35,7 +32,7 @@ except ModuleNotFoundError as e: raise Exception(f"Missing module: {e}") -class GladiaSTTService(AsyncAIService): +class GladiaSTTService(STTService): class InputParams(BaseModel): sample_rate: Optional[int] = 16000 language: Optional[str] = "english" @@ -50,23 +47,13 @@ class GladiaSTTService(AsyncAIService): confidence: float = 0.5, params: InputParams = InputParams(), **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._api_key = api_key self._url = url self._params = params self._confidence = confidence - async def process_frame(self, frame: Frame, direction: FrameDirection): - await super().process_frame(frame, direction) - - if isinstance(frame, SystemFrame): - await self.push_frame(frame, direction) - elif isinstance(frame, AudioRawFrame): - await self._send_audio(frame) - else: - await self.queue_frame(frame, direction) - async def start(self, frame: StartFrame): await super().start(frame) self._websocket = await websockets.connect(self._url) @@ -81,6 +68,12 @@ class GladiaSTTService(AsyncAIService): await super().cancel(frame) await self._websocket.close() + async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]: + await self.start_processing_metrics() + await self._send_audio(audio) + await self.stop_processing_metrics() + yield None + async def _setup_gladia(self): configuration = { "x_gladia_key": self._api_key, @@ -92,9 +85,9 @@ class GladiaSTTService(AsyncAIService): await self._websocket.send(json.dumps(configuration)) - async def _send_audio(self, frame: AudioRawFrame): + async def _send_audio(self, audio: bytes): message = { - 'frames': base64.b64encode(frame.audio).decode("utf-8") + 'frames': base64.b64encode(audio).decode("utf-8") } await self._websocket.send(json.dumps(message)) @@ -113,6 +106,6 @@ class GladiaSTTService(AsyncAIService): transcript = utterance["transcription"] if confidence >= self._confidence: if type == "final": - await self.queue_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(TranscriptionFrame(transcript, "", time_now_iso8601())) else: - await self.queue_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) + await self.push_frame(InterimTranscriptionFrame(transcript, "", time_now_iso8601())) diff --git a/src/pipecat/services/lmnt.py b/src/pipecat/services/lmnt.py index 638e394a1..60f0cb7df 100644 --- a/src/pipecat/services/lmnt.py +++ b/src/pipecat/services/lmnt.py @@ -46,7 +46,7 @@ class LmntTTSService(AsyncTTSService): **kwargs): # Let TTSService produce TTSStoppedFrames after a short delay of # no activity. - super().__init__(push_stop_frames=True, sample_rate=sample_rate, **kwargs) + super().__init__(sync=False, push_stop_frames=True, sample_rate=sample_rate, **kwargs) self._api_key = api_key self._voice_id = voice_id diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 3d1d0c4d7..8836fbd1e 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -31,16 +31,12 @@ from loguru import logger class BaseInputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params self._executor = ThreadPoolExecutor(max_workers=5) - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create audio input queue and task if needed. if self._params.audio_in_enabled or self._params.vad_enabled: @@ -53,10 +49,6 @@ class BaseInputTransport(FrameProcessor): self._audio_task.cancel() await self._audio_task - # Wait for the push frame task to finish. It will finish when the - # EndFrame is actually processed. - await self._push_frame_task - async def cancel(self, frame: CancelFrame): # Cancel all the tasks and wait for them to finish. @@ -64,9 +56,6 @@ class BaseInputTransport(FrameProcessor): self._audio_task.cancel() await self._audio_task - self._push_frame_task.cancel() - await self._push_frame_task - def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer @@ -86,11 +75,8 @@ class BaseInputTransport(FrameProcessor): await self.cancel(frame) await self.push_frame(frame, direction) elif isinstance(frame, BotInterruptionFrame): - await self._handle_interruptions(frame, False) - elif isinstance(frame, StartInterruptionFrame): + logger.debug("Bot interruption") await self._start_interruption() - elif isinstance(frame, StopInterruptionFrame): - await self._stop_interruption() # All other system frames elif isinstance(frame, SystemFrame): await self.push_frame(frame, direction) @@ -98,12 +84,12 @@ class BaseInputTransport(FrameProcessor): elif isinstance(frame, StartFrame): # Push StartFrame before start(), because we want StartFrame to be # processed by every processor before any other frame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.start(frame) elif isinstance(frame, EndFrame): # Push EndFrame before stop(), because stop() waits on the task to # finish and the task finishes when EndFrame is processed. - await self._internal_push_frame(frame, direction) + await self.push_frame(frame, direction) await self.stop(frame) elif isinstance(frame, VADParamsUpdateFrame): vad_analyzer = self.vad_analyzer() @@ -111,73 +97,28 @@ class BaseInputTransport(FrameProcessor): vad_analyzer.set_params(frame.params) # Other frames else: - await self._internal_push_frame(frame, direction) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(frame, direction) # # Handle interruptions # - async def _start_interruption(self): - if not self.interruptions_allowed: - return - - # Cancel the task. This will stop pushing frames downstream. - self._push_frame_task.cancel() - await self._push_frame_task - # Push an out-of-band frame (i.e. not using the ordered push - # frame task) to stop everything, specially at the output - # transport. - await self.push_frame(StartInterruptionFrame()) - # Create a new queue and task. - self._create_push_task() - - async def _stop_interruption(self): - if not self.interruptions_allowed: - return - - await self.push_frame(StopInterruptionFrame()) - - async def _handle_interruptions(self, frame: Frame, push_frame: bool): + async def _handle_interruptions(self, frame: Frame): if self.interruptions_allowed: - # Make sure we notify about interruptions quickly out-of-band - if isinstance(frame, BotInterruptionFrame): - logger.debug("Bot interruption") - await self._start_interruption() - elif isinstance(frame, UserStartedSpeakingFrame): + # Make sure we notify about interruptions quickly out-of-band. + if isinstance(frame, UserStartedSpeakingFrame): logger.debug("User started speaking") await self._start_interruption() + # Push an out-of-band frame (i.e. not using the ordered push + # frame task) to stop everything, specially at the output + # transport. + await self.push_frame(StartInterruptionFrame()) elif isinstance(frame, UserStoppedSpeakingFrame): logger.debug("User stopped speaking") await self._stop_interruption() + await self.push_frame(StopInterruptionFrame()) - if push_frame: - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio input @@ -201,7 +142,7 @@ class BaseInputTransport(FrameProcessor): frame = UserStoppedSpeakingFrame() if frame: - await self._handle_interruptions(frame, True) + await self._handle_interruptions(frame) vad_state = new_vad_state return vad_state @@ -222,7 +163,7 @@ class BaseInputTransport(FrameProcessor): # Push audio downstream if passthrough. if audio_passthrough: - await self._internal_push_frame(frame) + await self.push_frame(frame) self._audio_in_queue.task_done() except asyncio.CancelledError: diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index a24c9f4d2..bc683721a 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -43,7 +43,7 @@ from pipecat.utils.time import nanoseconds_to_seconds class BaseOutputTransport(FrameProcessor): def __init__(self, params: TransportParams, **kwargs): - super().__init__(**kwargs) + super().__init__(sync=False, **kwargs) self._params = params @@ -70,10 +70,6 @@ class BaseOutputTransport(FrameProcessor): # generating frames upstream while, for example, the audio is playing. self._create_sink_tasks() - # Create push frame task. This is the task that will push frames in - # order. We also guarantee that all frames are pushed in the same task. - self._create_push_task() - async def start(self, frame: StartFrame): # Create camera output queue and task if needed. if self._params.camera_out_enabled: @@ -95,9 +91,8 @@ class BaseOutputTransport(FrameProcessor): self._audio_out_task.cancel() await self._audio_out_task - # Wait for the push frame and sink tasks to finish. They will finish when - # the EndFrame is actually processed. - await self._push_frame_task + # Wait for the sink task to finish. They will finish when the EndFrame + # is actually processed. await self._sink_task async def cancel(self, frame: CancelFrame): @@ -107,9 +102,6 @@ class BaseOutputTransport(FrameProcessor): self._camera_out_task.cancel() await self._camera_out_task - self._push_frame_task.cancel() - await self._push_frame_task - self._sink_task.cancel() await self._sink_task @@ -182,10 +174,6 @@ class BaseOutputTransport(FrameProcessor): await self._sink_clock_task # Create sink tasks. self._create_sink_tasks() - # Stop push task. - self._push_frame_task.cancel() - await self._push_frame_task - self._create_push_task() # Let's send a bot stopped speaking if we have to. if self._bot_speaking: await self._bot_stopped_speaking() @@ -227,7 +215,7 @@ class BaseOutputTransport(FrameProcessor): async def _sink_frame_handler(self, frame: Frame): if isinstance(frame, AudioRawFrame): await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) elif isinstance(frame, ImageRawFrame): await self._set_camera_image(frame) @@ -237,12 +225,12 @@ class BaseOutputTransport(FrameProcessor): await self.send_message(frame) elif isinstance(frame, TTSStartedFrame): await self._bot_started_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) elif isinstance(frame, TTSStoppedFrame): await self._bot_stopped_speaking() - await self._internal_push_frame(frame) + await self.push_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_task_handler(self): running = True @@ -261,7 +249,7 @@ class BaseOutputTransport(FrameProcessor): # TODO(aleix): For now we just process TextFrame. But we should process # audio and video as well. if isinstance(frame, TextFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def _sink_clock_task_handler(self): running = True @@ -293,38 +281,12 @@ class BaseOutputTransport(FrameProcessor): async def _bot_started_speaking(self): logger.debug("Bot started speaking") self._bot_speaking = True - await self._internal_push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) + await self.push_frame(BotStartedSpeakingFrame(), FrameDirection.UPSTREAM) async def _bot_stopped_speaking(self): logger.debug("Bot stopped speaking") self._bot_speaking = False - await self._internal_push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) - - # - # Push frames task - # - - def _create_push_task(self): - loop = self.get_event_loop() - self._push_queue = asyncio.Queue() - self._push_frame_task = loop.create_task(self._push_frame_task_handler()) - - async def _internal_push_frame( - self, - frame: Frame | None, - direction: FrameDirection | None = FrameDirection.DOWNSTREAM): - await self._push_queue.put((frame, direction)) - - async def _push_frame_task_handler(self): - running = True - while running: - try: - (frame, direction) = await self._push_queue.get() - await self.push_frame(frame, direction) - running = not isinstance(frame, EndFrame) - self._push_queue.task_done() - except asyncio.CancelledError: - break + await self.push_frame(BotStoppedSpeakingFrame(), FrameDirection.UPSTREAM) # # Camera out @@ -408,7 +370,7 @@ class BaseOutputTransport(FrameProcessor): try: frame = await self._audio_out_queue.get() await self.write_raw_audio_frames(frame.audio) - await self._internal_push_frame(frame) + await self.push_frame(frame) await self.push_frame(BotSpeakingFrame(), FrameDirection.UPSTREAM) except asyncio.CancelledError: break diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index 08ca9719a..819950d72 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -98,9 +98,9 @@ class WebsocketServerInputTransport(BaseInputTransport): continue if isinstance(frame, AudioRawFrame): - await self.push_audio_frame(frame) + await self.queue_audio_frame(frame) else: - await self._internal_push_frame(frame) + await self.push_frame(frame) # Notify disconnection await self._callbacks.on_client_disconnected(websocket) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 7cf330b9e..210cc7341 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -625,11 +625,11 @@ class DailyInputTransport(BaseInputTransport): # async def push_transcription_frame(self, frame: TranscriptionFrame | InterimTranscriptionFrame): - await self._internal_push_frame(frame) + await self.push_frame(frame) async def push_app_message(self, message: Any, sender: str): frame = DailyTransportMessageFrame(message=message, participant_id=sender) - await self._internal_push_frame(frame) + await self.push_frame(frame) # # Audio in @@ -692,7 +692,7 @@ class DailyInputTransport(BaseInputTransport): image=buffer, size=size, format=format) - await self._internal_push_frame(frame) + await self.push_frame(frame) self._video_renderers[participant_id]["timestamp"] = curr_time