Files
pipecat/examples/multi-worker/sensor-controller/sensor.py
Aleix Conchillo Flaqué b03247f360 Rename BaseTask → BaseWorker and reserve "task" for asyncio
Replaces every "task" identifier that referred to the BaseTask
abstraction with "worker". Asyncio task plumbing (asyncio.Task,
BaseTaskManager, TaskManager, create_task, cancel_task, etc.) stays
untouched. Highlights:

- Classes: BaseTask → BaseWorker, PipelineTask → PipelineWorker,
  LLMTask → LLMWorker, LLMContextTask → LLMContextWorker, TaskBus →
  WorkerBus, TaskRegistry → WorkerRegistry, TaskActivationArgs →
  WorkerActivationArgs, TaskReadyData → WorkerReadyData,
  TaskRegistryEntry → WorkerRegistryEntry, TaskObserver →
  WorkerObserver, all Bus*TaskMessage → Bus*WorkerMessage,
  BusAddTaskMessage.task field → worker, BusWorkerRegistryMessage.tasks
  field → workers.
- Methods/decorators: activate_task → activate_worker, deactivate_task
  → deactivate_worker, add_task → add_worker, watch_task →
  watch_worker, @task_ready → @worker_ready, setup_pipeline_task hook
  → setup_pipeline_worker.
- Params/fields: FrameProcessorSetup.pipeline_task and
  FunctionCallParams.pipeline_task → pipeline_worker. Parameter names
  like task_name → worker_name; spawn/run accept worker:.
- Files: pipeline/base_task.py → base_worker.py, pipeline/task.py →
  worker.py (plus a re-export shim at pipeline/task.py),
  task_observer.py → worker_observer.py, task_ready_decorator.py →
  worker_ready_decorator.py, pipecat.tasks → pipecat.workers,
  llm_task.py → llm_worker.py, llm_context_task.py →
  llm_context_worker.py, examples/multi-task → examples/multi-worker.

Back-compat:
- PipelineTask kept as a deprecated subclass of PipelineWorker that
  warns on construction.
- pipecat.pipeline.task re-exports PipelineWorker/PipelineTask/etc. so
  existing user imports keep working.
- FrameProcessor.pipeline_task kept as a deprecated property that
  forwards to pipeline_worker.

Local variables in examples that hold a worker (task = PipelineTask(...))
are renamed to worker = PipelineWorker(...). Asyncio-task locals
(runner_task, etc.) are preserved.
2026-05-21 19:07:13 -07:00

187 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Temperature sensor processors for the sensor-controller example.
Two custom :class:`FrameProcessor` subclasses that give the worker
pipeline real autonomous frame flow:
- :class:`SensorReader` simulates a thermometer. It runs an async tick
loop that advances ``current`` toward ``target`` with a first-order
lag plus Gaussian noise, and pushes a :class:`SensorReadingFrame` on
every tick. ``target`` and ``response_rate`` are mutable so the
worker's LLM can adjust them via tool calls.
- :class:`SensorStats` consumes the readings, maintains a rolling
window, and exposes ``current`` / ``min`` / ``max`` / ``avg`` /
``trend`` as properties. The worker LLM reads these directly when
answering the user.
"""
import random
import time
from collections import deque
from dataclasses import dataclass
from pipecat.frames.frames import DataFrame, Frame, StartFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@dataclass
class SensorReadingFrame(DataFrame):
"""A single temperature reading emitted by :class:`SensorReader`.
Parameters:
temperature: The reading in degrees Celsius.
timestamp: Unix timestamp when the reading was taken.
"""
temperature: float = 0.0
timestamp: float = 0.0
class SensorReader(FrameProcessor):
"""Simulated temperature sensor with adjustable target and response rate.
Each tick, ``current`` is updated as::
current += (target - current) * response_rate + gauss(0, noise_sigma)
This is a first-order lag toward ``target``. With ``response_rate=0.05``
and a 1s tick, the current reading reaches ~halfway to target in ~14s;
with ``response_rate=0.2`` it converges in ~510s.
"""
def __init__(
self,
*,
start_temp: float = 22.0,
sample_period_s: float = 1.0,
response_rate: float = 0.05,
noise_sigma: float = 0.1,
):
"""Initialize the sensor.
Args:
start_temp: Initial temperature and initial target (°C).
sample_period_s: Seconds between successive readings.
response_rate: Fraction of the gap toward target closed each tick
(clamped to ``[0.0, 1.0]``).
noise_sigma: Standard deviation of the Gaussian noise added to
each reading.
"""
super().__init__()
self._current = start_temp
self._target = start_temp
self._response_rate = max(0.0, min(1.0, response_rate))
self._noise_sigma = noise_sigma
self._sample_period_s = sample_period_s
self._tick_task = None
@property
def current(self) -> float:
"""The most recent temperature reading (°C)."""
return self._current
@property
def target(self) -> float:
"""The temperature the sensor is drifting toward (°C)."""
return self._target
@property
def response_rate(self) -> float:
"""Fraction of the target-current gap closed per tick."""
return self._response_rate
def set_target(self, value: float) -> None:
"""Set a new target temperature (°C)."""
self._target = value
def set_response_rate(self, rate: float) -> None:
"""Set how aggressively the sensor approaches the target.
Args:
rate: Fraction in ``[0.0, 1.0]``. Clamped to that range.
"""
self._response_rate = max(0.0, min(1.0, rate))
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame) and self._tick_task is None:
self._tick_task = self.create_task(self._tick_loop(), "ticker")
await self.push_frame(frame, direction)
async def cleanup(self) -> None:
if self._tick_task is not None:
await self.cancel_task(self._tick_task)
self._tick_task = None
await super().cleanup()
async def _tick_loop(self) -> None:
import asyncio
while True:
await asyncio.sleep(self._sample_period_s)
gap = self._target - self._current
self._current += gap * self._response_rate + random.gauss(0, self._noise_sigma)
await self.push_frame(
SensorReadingFrame(temperature=self._current, timestamp=time.time()),
FrameDirection.DOWNSTREAM,
)
class SensorStats(FrameProcessor):
"""Rolling-window statistics over :class:`SensorReadingFrame`s.
Consumes readings as they flow downstream and exposes rolling
``min`` / ``max`` / ``avg`` / ``trend`` as properties — the worker
LLM reads them directly when responding to the user.
"""
def __init__(self, window: int = 30):
"""Initialize the stats aggregator.
Args:
window: Number of recent readings to retain.
"""
super().__init__()
self._readings: deque[float] = deque(maxlen=window)
@property
def current(self) -> float:
"""The most recent reading, or 0.0 if none have been seen."""
return self._readings[-1] if self._readings else 0.0
@property
def min(self) -> float:
return min(self._readings) if self._readings else 0.0
@property
def max(self) -> float:
return max(self._readings) if self._readings else 0.0
@property
def avg(self) -> float:
return sum(self._readings) / len(self._readings) if self._readings else 0.0
@property
def trend(self) -> str:
"""``"rising"`` / ``"falling"`` / ``"stable"`` based on first vs. last half of the window."""
if len(self._readings) < 4:
return "stable"
half = len(self._readings) // 2
old_avg = sum(list(self._readings)[:half]) / half
new_avg = sum(list(self._readings)[half:]) / (len(self._readings) - half)
diff = new_avg - old_avg
if abs(diff) < 0.25:
return "stable"
return "rising" if diff > 0 else "falling"
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, SensorReadingFrame):
self._readings.append(frame.temperature)
await self.push_frame(frame, direction)