Merge pull request #2526 from pipecat-ai/aleix/pipeline-task-wait-for-startframe

PipelineTask: wait for StartFrame to reach end of pipeline
This commit is contained in:
Aleix Conchillo Flaqué
2025-08-27 15:57:10 -07:00
committed by GitHub
2 changed files with 27 additions and 4 deletions

View File

@@ -87,6 +87,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- `PipelineTask` now waits for `StartFrame` to reach the end of the pipeline
before pushing any other frames.
- Updated `CartesiaTTSService` and `CartesiaHttpTTSService` to align with
Cartesia's changes for the `speed` parameter. It now takes only an enum of
`slow`, `normal`, or `fast`.
@@ -100,6 +103,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an RTVI issue that was causing frames to be pushed before pipeline was
properly initialized.
- Fixed some `get_messages_for_logging()` that were returning a JSON string
instead of a list.

View File

@@ -236,8 +236,13 @@ class PipelineTask(BasePipelineTask):
# idle.
self._idle_queue = asyncio.Queue()
self._idle_monitor_task: Optional[asyncio.Task] = None
# This event is used to indicate the StartFrame has been received at the
# end of the pipeline.
self._pipeline_start_event = asyncio.Event()
# This event is used to indicate a finalize frame (e.g. EndFrame,
# StopFrame) has been received in the down queue.
# StopFrame) has been received at the end of the pipeline.
self._pipeline_end_event = asyncio.Event()
# This is the final pipeline. It is composed of a source processor,
@@ -516,8 +521,15 @@ class PipelineTask(BasePipelineTask):
data.append(ProcessingMetricsData(processor=p.name, value=0.0))
return MetricsFrame(data=data)
async def _wait_for_pipeline_start(self, frame: Frame):
"""Wait for the specified start frame to reach the end of the pipeline."""
logger.debug(f"{self}: Starting. Waiting for {frame} to reach the end of the pipeline...")
await self._pipeline_start_event.wait()
self._pipeline_start_event.clear()
logger.debug(f"{self}: {frame} reached the end of the pipeline, pipeline is now ready.")
async def _wait_for_pipeline_end(self, frame: Frame):
"""Wait for the pipeline to signal completion."""
"""Wait for the specified frame to reach the end of the pipeline."""
async def wait_for_cancel():
try:
@@ -532,13 +544,13 @@ class PipelineTask(BasePipelineTask):
finally:
await self._call_event_handler("on_pipeline_cancelled", frame)
logger.debug(f"{self}: waiting for {frame} to reach the end of the pipeline...")
logger.debug(f"{self}: Closing. Waiting for {frame} to reach the end of the pipeline...")
if isinstance(frame, CancelFrame):
await wait_for_cancel()
else:
await self._pipeline_end_event.wait()
logger.debug(f"{self}: {frame} reached the end of the pipeline.")
logger.debug(f"{self}: {frame} reached the end of the pipeline, pipeline is closing.")
self._pipeline_end_event.clear()
@@ -591,6 +603,9 @@ class PipelineTask(BasePipelineTask):
start_frame.metadata = self._params.start_metadata
await self._pipeline.queue_frame(start_frame)
# Wait for the pipeline to be started before pushing any other frame.
await self._wait_for_pipeline_start(start_frame)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
await self._pipeline.queue_frame(self._initial_metrics_frame())
@@ -657,6 +672,8 @@ class PipelineTask(BasePipelineTask):
# Start heartbeat tasks now that StartFrame has been processed
# by all processors in the pipeline
self._maybe_start_heartbeat_tasks()
self._pipeline_start_event.set()
elif isinstance(frame, EndFrame):
await self._call_event_handler("on_pipeline_ended", frame)
self._pipeline_end_event.set()