diff --git a/CHANGELOG.md b/CHANGELOG.md index c7734e4d6..673980d13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `PipelineTask` now waits for `StartFrame` to reach the end of the pipeline + before pushing any other frames. + - Updated `CartesiaTTSService` and `CartesiaHttpTTSService` to align with Cartesia's changes for the `speed` parameter. It now takes only an enum of `slow`, `normal`, or `fast`. @@ -100,6 +103,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed an RTVI issue that was causing frames to be pushed before pipeline was + properly initialized. + - Fixed some `get_messages_for_logging()` that were returning a JSON string instead of a list. diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 5500dba62..23b8c5cc2 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -236,8 +236,13 @@ class PipelineTask(BasePipelineTask): # idle. self._idle_queue = asyncio.Queue() self._idle_monitor_task: Optional[asyncio.Task] = None + + # This event is used to indicate the StartFrame has been received at the + # end of the pipeline. + self._pipeline_start_event = asyncio.Event() + # This event is used to indicate a finalize frame (e.g. EndFrame, - # StopFrame) has been received in the down queue. + # StopFrame) has been received at the end of the pipeline. self._pipeline_end_event = asyncio.Event() # This is the final pipeline. It is composed of a source processor, @@ -516,8 +521,15 @@ class PipelineTask(BasePipelineTask): data.append(ProcessingMetricsData(processor=p.name, value=0.0)) return MetricsFrame(data=data) + async def _wait_for_pipeline_start(self, frame: Frame): + """Wait for the specified start frame to reach the end of the pipeline.""" + logger.debug(f"{self}: Starting. Waiting for {frame} to reach the end of the pipeline...") + await self._pipeline_start_event.wait() + self._pipeline_start_event.clear() + logger.debug(f"{self}: {frame} reached the end of the pipeline, pipeline is now ready.") + async def _wait_for_pipeline_end(self, frame: Frame): - """Wait for the pipeline to signal completion.""" + """Wait for the specified frame to reach the end of the pipeline.""" async def wait_for_cancel(): try: @@ -532,13 +544,13 @@ class PipelineTask(BasePipelineTask): finally: await self._call_event_handler("on_pipeline_cancelled", frame) - logger.debug(f"{self}: waiting for {frame} to reach the end of the pipeline...") + logger.debug(f"{self}: Closing. Waiting for {frame} to reach the end of the pipeline...") if isinstance(frame, CancelFrame): await wait_for_cancel() else: await self._pipeline_end_event.wait() - logger.debug(f"{self}: {frame} reached the end of the pipeline.") + logger.debug(f"{self}: {frame} reached the end of the pipeline, pipeline is closing.") self._pipeline_end_event.clear() @@ -591,6 +603,9 @@ class PipelineTask(BasePipelineTask): start_frame.metadata = self._params.start_metadata await self._pipeline.queue_frame(start_frame) + # Wait for the pipeline to be started before pushing any other frame. + await self._wait_for_pipeline_start(start_frame) + if self._params.enable_metrics and self._params.send_initial_empty_metrics: await self._pipeline.queue_frame(self._initial_metrics_frame()) @@ -657,6 +672,8 @@ class PipelineTask(BasePipelineTask): # Start heartbeat tasks now that StartFrame has been processed # by all processors in the pipeline self._maybe_start_heartbeat_tasks() + + self._pipeline_start_event.set() elif isinstance(frame, EndFrame): await self._call_event_handler("on_pipeline_ended", frame) self._pipeline_end_event.set()