diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d8e18746..865f75676 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added a new `PipelineTask` parameter `observers` that replaces the previous + `PipelineParams.observers`. + - Added a new `PipelineTask` parameter `check_dangling_tasks` to enable or disable checking for frame processors' dangling tasks when the Pipeline finishes running. @@ -45,6 +48,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general")) ``` +### Deprecated + +- `PipelineParams.observers` is now deprecated, you the new `PipelineTask` + parameter `observers`. + ### Removed - Remove `TransportParams.audio_out_is_live` since it was not being used at all. diff --git a/examples/instant-voice/server/src/single_bot.py b/examples/instant-voice/server/src/single_bot.py index e1e9df68a..55ba20377 100644 --- a/examples/instant-voice/server/src/single_bot.py +++ b/examples/instant-voice/server/src/single_bot.py @@ -92,10 +92,8 @@ async def main(): task = PipelineTask( pipeline, - params=PipelineParams( - allow_interruptions=True, - observers=[rtvi.observer()], - ), + params=PipelineParams(allow_interruptions=True), + observers=[rtvi.observer()], ) @rtvi.event_handler("on_client_ready") diff --git a/examples/news-chatbot/server/news_bot.py b/examples/news-chatbot/server/news_bot.py index 7aed4c3b9..a34b0c3d0 100644 --- a/examples/news-chatbot/server/news_bot.py +++ b/examples/news-chatbot/server/news_bot.py @@ -140,10 +140,8 @@ async def main(): task = PipelineTask( pipeline, - PipelineParams( - allow_interruptions=True, - observers=[GoogleRTVIObserver(rtvi)], - ), + PipelineParams(allow_interruptions=True), + observers=[GoogleRTVIObserver(rtvi)], ) @rtvi.event_handler("on_client_ready") diff --git a/examples/simple-chatbot/server/bot-gemini.py b/examples/simple-chatbot/server/bot-gemini.py index ecf38f159..f80a9770e 100644 --- a/examples/simple-chatbot/server/bot-gemini.py +++ b/examples/simple-chatbot/server/bot-gemini.py @@ -176,8 +176,8 @@ async def main(): allow_interruptions=True, enable_metrics=True, enable_usage_metrics=True, - observers=[RTVIObserver(rtvi)], ), + observers=[RTVIObserver(rtvi)], ) await task.queue_frame(quiet_frame) diff --git a/examples/simple-chatbot/server/bot-openai.py b/examples/simple-chatbot/server/bot-openai.py index 51c421c7d..e1d7bb9a0 100644 --- a/examples/simple-chatbot/server/bot-openai.py +++ b/examples/simple-chatbot/server/bot-openai.py @@ -202,8 +202,8 @@ async def main(): allow_interruptions=True, enable_metrics=True, enable_usage_metrics=True, - observers=[RTVIObserver(rtvi)], ), + observers=[RTVIObserver(rtvi)], ) await task.queue_frame(quiet_frame) diff --git a/examples/translation-chatbot/bot.py b/examples/translation-chatbot/bot.py index 8d4ee851e..5839afb5d 100644 --- a/examples/translation-chatbot/bot.py +++ b/examples/translation-chatbot/bot.py @@ -187,8 +187,8 @@ async def main(): allow_interruptions=False, # We don't want to interrupt the translator bot enable_metrics=True, enable_usage_metrics=True, - observers=[RTVIObserver(rtvi)], ), + observers=[RTVIObserver(rtvi)], ) @transport.event_handler("on_first_participant_joined") diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 1813878e1..873fc60d6 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -122,6 +122,7 @@ class PipelineTask(BaseTask): Args: pipeline: The pipeline to execute. params: Configuration parameters for the pipeline. + observers: List of observers for monitoring pipeline execution. clock: Clock implementation for timing operations. check_dangling_tasks: Whether to check for processors' tasks finishing properly. """ @@ -130,6 +131,7 @@ class PipelineTask(BaseTask): self, pipeline: BasePipeline, params: PipelineParams = PipelineParams(), + observers: List[BaseObserver] = [], clock: BaseClock = SystemClock(), check_dangling_tasks: bool = True, ): @@ -140,6 +142,16 @@ class PipelineTask(BaseTask): self._clock = clock self._params = params self._check_dangling_tasks = check_dangling_tasks + if self._params.observers: + import warnings + + with warnings.catch_warnings(): + warnings.simplefilter("always") + warnings.warn( + "Field 'observers' is deprecated, use the 'observers' parameter instead.", + DeprecationWarning, + ) + observers = self._params.observers self._finished = False # This queue receives frames coming from the pipeline upstream. @@ -163,7 +175,7 @@ class PipelineTask(BaseTask): self._task_manager = TaskManager() - self._observer = TaskObserver(observers=params.observers, task_manager=self._task_manager) + self._observer = TaskObserver(observers=observers, task_manager=self._task_manager) @property def id(self) -> int: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 0aff922b2..07abef48e 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -111,8 +111,8 @@ class TestPipelineTask(unittest.IsolatedAsyncioTestCase): params=PipelineParams( enable_heartbeats=True, heartbeats_period_secs=0.2, - observers=[heartbeats_observer], ), + observers=[heartbeats_observer], ) task.set_event_loop(asyncio.get_event_loop())