allow enabling watchdog timers per frame processor or task

This commit is contained in:
Aleix Conchillo Flaqué
2025-06-25 16:34:45 -07:00
parent 4f032f5b96
commit ef1ade3a71
4 changed files with 39 additions and 18 deletions

View File

@@ -19,16 +19,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Introduce task watchdog timers. Watchdog timers are used to detect if a
Pipecat task is taking longer than expected (by default 5 seconds). Watchdog
timers are disabled by default and can be enabled by passing
timers are disabled by default and can be enabled globally by passing
`enable_watchdog_timers` argument to `PipelineTask` constructor. It is
possible to change the default watchdog timer timeout by using the
`watchdog_timeout` argument. You can also log how long it takes to reset the
watchdog timers which is done with the `enable_watchdog_logging`. You can
control these settings per each frame processor or even per task. That is, you
can set set `enable_watchdog_logging` and `watchdog_timeout` when creating any
frame processor through their constructor arguments or when you create a task
with `FrameProcessor.create_task()`. Note that watchdog timers only work with
Pipecat tasks and will not work if you use `asycio.create_task()` or similar.
control all these settings per each frame processor or even per task. That is,
you can set `enable_watchdog_timers`, `enable_watchdog_logging` and
`watchdog_timeout` when creating any frame processor through their constructor
arguments or when you create a task with `FrameProcessor.create_task()`. Note
that watchdog timers only work with Pipecat tasks and will not work if you use
`asycio.create_task()` or similar.
- Added `lexicon_names` parameter to `AWSPollyTTSService.InputParams`.

View File

@@ -55,6 +55,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
*,
name: Optional[str] = None,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
metrics: Optional[FrameProcessorMetrics] = None,
watchdog_timeout_secs: Optional[float] = None,
**kwargs,
@@ -64,11 +65,14 @@ class FrameProcessor(WatchdogReseter, BaseObject):
self._prev: Optional["FrameProcessor"] = None
self._next: Optional["FrameProcessor"] = None
# Enable watchdog timers for all tasks created by this frame processor.
self._enable_watchdog_timers = enable_watchdog_timers
# Enable watchdog logging for all tasks created by this frame processor.
self._enable_watchdog_logging = enable_watchdog_logging
# Allow this frame processor to control their tasks timeout.
self._watchdog_timeout = watchdog_timeout_secs
self._watchdog_timeout_secs = watchdog_timeout_secs
# Clock
self._clock: Optional[BaseClock] = None
@@ -194,6 +198,7 @@ class FrameProcessor(WatchdogReseter, BaseObject):
name: Optional[str] = None,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout_secs: Optional[float] = None,
) -> asyncio.Task:
if name:
@@ -208,8 +213,11 @@ class FrameProcessor(WatchdogReseter, BaseObject):
if enable_watchdog_logging
else self._enable_watchdog_logging
),
enable_watchdog_timers=(
enable_watchdog_timers if enable_watchdog_timers else self.watchdog_timers_enabled
),
watchdog_timeout=(
watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout
watchdog_timeout_secs if watchdog_timeout_secs else self._watchdog_timeout_secs
),
)
@@ -226,9 +234,13 @@ class FrameProcessor(WatchdogReseter, BaseObject):
self._clock = setup.clock
self._task_manager = setup.task_manager
self._observer = setup.observer
self._watchdog_timers_enabled = setup.watchdog_timers_enabled
self._watchdog_timers_enabled = (
self._enable_watchdog_timers
if self._enable_watchdog_timers
else setup.watchdog_timers_enabled
)
if self._metrics is not None:
await self._metrics.setup(self._task_manager, self.watchdog_timers_enabled)
await self._metrics.setup(self._task_manager, self._watchdog_timers_enabled)
async def cleanup(self):
await super().cleanup()
@@ -286,7 +298,8 @@ class FrameProcessor(WatchdogReseter, BaseObject):
async def resume_processing_frames(self):
logger.trace(f"{self}: resuming frame processing")
self.__input_event.set()
if self.__input_event:
self.__input_event.set()
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):

View File

@@ -524,7 +524,6 @@ class AudioContextWordTTSService(WebsocketWordTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._contexts_queue = asyncio.Queue()
self._contexts: Dict[str, asyncio.Queue] = {}
self._audio_context_task = None

View File

@@ -39,6 +39,7 @@ class BaseTaskManager(ABC):
name: str,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout: Optional[float] = None,
) -> asyncio.Task:
"""
@@ -51,6 +52,7 @@ class BaseTaskManager(ABC):
coroutine (Coroutine): The coroutine to be executed within the task.
name (str): The name to assign to the task for identification.
enable_watchdog_logging(bool): whether this task should log watchdog processing times.
enable_watchdog_timers(bool): whether this task should have a watchdog timer.
watchdog_timeout(float): watchdog timer timeout for this task.
Returns:
@@ -108,6 +110,7 @@ class TaskData:
task: asyncio.Task
watchdog_timer: asyncio.Event
enable_watchdog_logging: bool
enable_watchdog_timers: bool
watchdog_timeout: float
watchdog_task: Optional[asyncio.Task]
@@ -132,6 +135,7 @@ class TaskManager(BaseTaskManager):
name: str,
*,
enable_watchdog_logging: Optional[bool] = None,
enable_watchdog_timers: Optional[bool] = None,
watchdog_timeout: Optional[float] = None,
) -> asyncio.Task:
"""
@@ -144,6 +148,7 @@ class TaskManager(BaseTaskManager):
coroutine (Coroutine): The coroutine to be executed within the task.
name (str): The name to assign to the task for identification.
enable_watchdog_logging(bool): whether this task should log watchdog processing time.
enable_watchdog_timers(bool): whether this task should have a watchdog timer.
watchdog_timeout(float): watchdog timer timeout for this task.
Returns:
@@ -175,11 +180,16 @@ class TaskManager(BaseTaskManager):
if enable_watchdog_logging
else self._params.enable_watchdog_logging
),
enable_watchdog_timers=(
enable_watchdog_timers
if enable_watchdog_timers
else self._params.enable_watchdog_timers
),
watchdog_timeout=(
watchdog_timeout if watchdog_timeout else self._params.watchdog_timeout
),
watchdog_task=None,
)
),
)
logger.trace(f"{name}: task created")
return task
@@ -256,19 +266,17 @@ class TaskManager(BaseTaskManager):
will be logged indicating the task is stalling.
"""
if self._params and not self._params.enable_watchdog_timers:
return
name = task.get_name()
if name in self._tasks:
self._tasks[name].watchdog_timer.set()
if self._tasks[name].enable_watchdog_timers:
self._tasks[name].watchdog_timer.set()
else:
logger.warning(f"Unable to reset watchdog timer: task {name} does not exist")
def _add_task(self, task_data: TaskData):
name = task_data.task.get_name()
self._tasks[name] = task_data
if self._params and self._params.enable_watchdog_timers:
if self._params and task_data.enable_watchdog_timers:
watchdog_task = self.get_event_loop().create_task(
self._watchdog_task_handler(task_data)
)