Merge pull request #1282 from pipecat-ai/aleix/pipelinetask-observers-constructor

PipelineTask: pass observers in contructor parameter
This commit is contained in:
Aleix Conchillo Flaqué
2025-02-24 21:29:46 -08:00
committed by GitHub
8 changed files with 29 additions and 13 deletions

View File

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added a new `PipelineTask` parameter `observers` that replaces the previous
`PipelineParams.observers`.
- Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or
disable checking for frame processors' dangling tasks when the Pipeline
finishes running.
@@ -45,6 +48,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
```
### Deprecated
- `PipelineParams.observers` is now deprecated, you the new `PipelineTask`
parameter `observers`.
### Removed
- Remove `TransportParams.audio_out_is_live` since it was not being used at all.

View File

@@ -92,10 +92,8 @@ async def main():
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
observers=[rtvi.observer()],
),
params=PipelineParams(allow_interruptions=True),
observers=[rtvi.observer()],
)
@rtvi.event_handler("on_client_ready")

View File

@@ -140,10 +140,8 @@ async def main():
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
observers=[GoogleRTVIObserver(rtvi)],
),
PipelineParams(allow_interruptions=True),
observers=[GoogleRTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")

View File

@@ -176,8 +176,8 @@ async def main():
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)

View File

@@ -202,8 +202,8 @@ async def main():
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)

View File

@@ -187,8 +187,8 @@ async def main():
allow_interruptions=False, # We don't want to interrupt the translator bot
enable_metrics=True,
enable_usage_metrics=True,
observers=[RTVIObserver(rtvi)],
),
observers=[RTVIObserver(rtvi)],
)
@transport.event_handler("on_first_participant_joined")

View File

@@ -122,6 +122,7 @@ class PipelineTask(BaseTask):
Args:
pipeline: The pipeline to execute.
params: Configuration parameters for the pipeline.
observers: List of observers for monitoring pipeline execution.
clock: Clock implementation for timing operations.
check_dangling_tasks: Whether to check for processors' tasks finishing properly.
"""
@@ -130,6 +131,7 @@ class PipelineTask(BaseTask):
self,
pipeline: BasePipeline,
params: PipelineParams = PipelineParams(),
observers: List[BaseObserver] = [],
clock: BaseClock = SystemClock(),
check_dangling_tasks: bool = True,
):
@@ -140,6 +142,16 @@ class PipelineTask(BaseTask):
self._clock = clock
self._params = params
self._check_dangling_tasks = check_dangling_tasks
if self._params.observers:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Field 'observers' is deprecated, use the 'observers' parameter instead.",
DeprecationWarning,
)
observers = self._params.observers
self._finished = False
# This queue receives frames coming from the pipeline upstream.
@@ -163,7 +175,7 @@ class PipelineTask(BaseTask):
self._task_manager = TaskManager()
self._observer = TaskObserver(observers=params.observers, task_manager=self._task_manager)
self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)
@property
def id(self) -> int:

View File

@@ -111,8 +111,8 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase):
params=PipelineParams(
enable_heartbeats=True,
heartbeats_period_secs=0.2,
observers=[heartbeats_observer],
),
observers=[heartbeats_observer],
)
task.set_event_loop(asyncio.get_event_loop())