Compare commits

..

1 Commits

Author SHA1 Message Date
James Hush
45c790c7d5 feat: Add debug logging to Gladia STT service
- Log transcript processing including confidence checks
2025-09-18 11:51:53 +08:00
15 changed files with 112 additions and 319 deletions

View File

@@ -9,23 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `on_before_disconnect` synchronous event to `DailyTransport` and
`LiveKitTransport`.
- It is now possible to register synchronous event handlers. By default, all
event handlers are executed in a separate task. However, in some cases we want
to guarantee order of execution, for example, executing something before
disconnecting a transport.
```python
self._register_event_handler("on_event_name", sync=True)
```
- Added support for global location in `GoogleVertexLLMService`. The service now
supports both regional locations (e.g., "us-east4") and the "global" location
for Vertex AI endpoints. When using "global" location, the service will use
`aiplatform.googleapis.com` as the API host instead of the regional format.
- Added `on_pipeline_finished` event to `PipelineTask`. This event will get
fired when the pipeline is done running. This can be the result of a
`StopFrame`, `CancelFrame` or `EndFrame`.
@@ -38,8 +21,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated Silero VAD model to v6.
- Updated `livekit` to 1.0.13.
- `torch` and `torchaudio` are no longer required for running Smart Turn
@@ -55,28 +36,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- `GladiaSTTService`'s `confidence` arg is deprecated. `confidence` is no
longer needed to determine which transcription or translation frames to
emit.
- `PipelineTask` events `on_pipeline_stopped`, `on_pipeline_ended` and
`on_pipeline_cancelled` are now deprecated. Use `on_pipeline_finished`
instead.
### Fixed
- Fixed an issue where multiple handlers for an event would not run in parallel.
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
ID from the `on_dialin_connected` event, when not explicitly provided. Now
supports cold transfers (from incoming dial-in calls) by automatically
tracking session IDs from connection events.
- Fixed a memory leak in `SmallWebRTCTransport`. In `aiortc`, when you receive
a `MediaStreamTrack` (audio or video), frames are produced asynchronously. If
the code never consumes these frames, they are queued in memory, causing a
memory leak.
- Fixed an issue in `AsyncAITTSService`, where `TTSTextFrames` were not being
pushed.

View File

@@ -21,8 +21,6 @@
🧭 Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
🔍 Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
## 🧠 Why Pipecat?
- **Voice-first**: Integrates speech recognition, text-to-speech, and conversation handling

View File

@@ -106,7 +106,7 @@ tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
webrtc = [ "aiortc~=1.13.0", "opencv-python~=4.11.0.86" ]
webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
whisper = [ "faster-whisper~=1.1.1" ]

View File

@@ -1,12 +0,0 @@
#!/bin/bash
PID=$1
while true; do
# Clear the screen
clear
# Print the header + RSS in GB
ps -p "$PID" -o pid,comm,rss | \
awk 'NR==1 {print $0, "rss_GB"} NR>1 {printf "%s %s %s %.2f\n", $1,$2,$3,$3/1024/1024}'
sleep 1
done

View File

@@ -169,4 +169,11 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
service_switcher_filter_frame = ServiceSwitcher.ServiceSwitcherFilterFrame(
active_service=self.strategy.active_service
)
await super().process_frame(service_switcher_filter_frame, direction)
# Hack: we need access ParallelPipeline internals here to queue the
# frame in each of the pipelines.
# Why not just call super().process_frame(service_switcher_filter_frame, direction),
# you ask? Because that would also send this internal-only frame
# down the "main" pipeline instead of only down the individual
# branches of the parallel pipeline, which we want to avoid.
for p in self._pipelines:
await p.queue_frame(service_switcher_filter_frame, direction)

View File

@@ -568,17 +568,11 @@ class FrameProcessor(BaseObject):
"""Pause processing of queued frames."""
logger.trace(f"{self}: pausing frame processing")
self.__should_block_frames = True
# We should also unset the process event here, in case it was set immediately after an interruption
if self.__process_event:
self.__process_event.clear()
async def pause_processing_system_frames(self):
"""Pause processing of queued system frames."""
logger.trace(f"{self}: pausing system frame processing")
self.__should_block_system_frames = True
# We should also unset the input event here, in case it was set immediately after an interruption
if self.__input_event:
self.__input_event.clear()
async def resume_processing_frames(self):
"""Resume processing of queued frames."""

View File

@@ -78,6 +78,10 @@ from pipecat.runner.types import (
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
try:
import uvicorn
@@ -182,10 +186,6 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return

View File

@@ -13,7 +13,6 @@ supporting multiple languages, custom vocabulary, and various audio processing o
import asyncio
import base64
import json
import warnings
from typing import Any, AsyncGenerator, Dict, Literal, Optional
import aiohttp
@@ -174,6 +173,8 @@ class _InputParamsDescriptor:
"""Descriptor for backward compatibility with deprecation warning."""
def __get__(self, obj, objtype=None):
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
@@ -207,7 +208,7 @@ class GladiaSTTService(STTService):
api_key: str,
region: Literal["us-west", "eu-west"] | None = None,
url: str = "https://api.gladia.io/v2/live",
confidence: Optional[float] = None,
confidence: float = 0.5,
sample_rate: Optional[int] = None,
model: str = "solaria-1",
params: Optional[GladiaInputParams] = None,
@@ -223,11 +224,6 @@ class GladiaSTTService(STTService):
region: Region used to process audio. eu-west or us-west. Defaults to eu-west.
url: Gladia API URL. Defaults to "https://api.gladia.io/v2/live".
confidence: Minimum confidence threshold for transcriptions (0.0-1.0).
.. deprecated:: 0.0.86
The 'confidence' parameter is deprecated and will be removed in a future version.
No confidence threshold is applied.
sample_rate: Audio sample rate in Hz. If None, uses service default.
model: Model to use for transcription. Defaults to "solaria-1".
params: Additional configuration parameters for Gladia service.
@@ -240,6 +236,7 @@ class GladiaSTTService(STTService):
params = params or GladiaInputParams()
# Warn about deprecated language parameter if it's used
if params.language is not None:
with warnings.catch_warnings():
warnings.simplefilter("always")
@@ -250,20 +247,11 @@ class GladiaSTTService(STTService):
stacklevel=2,
)
if confidence:
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The 'confidence' parameter is deprecated and will be removed in a future version. "
"No confidence threshold is applied.",
DeprecationWarning,
stacklevel=2,
)
self._api_key = api_key
self._region = region
self._url = url
self.set_model_name(model)
self._confidence = confidence
self._params = params
self._websocket = None
self._receive_task = None
@@ -587,40 +575,48 @@ class GladiaSTTService(STTService):
elif content["type"] == "transcript":
utterance = content["data"]["utterance"]
confidence = utterance.get("confidence", 0)
language = utterance["language"]
transcript = utterance["text"]
is_final = content["data"]["is_final"]
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
if confidence >= self._confidence:
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
)
await self._handle_transcription(
transcript=transcript,
is_final=is_final,
language=language,
)
else:
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
)
)
await self._handle_transcription(
transcript=transcript,
is_final=is_final,
language=language,
)
else:
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
logger.debug(
f"{self} Skipping transcript due to low confidence: '{transcript}' (confidence: {confidence} < {self._confidence})"
)
elif content["type"] == "translation":
translated_utterance = content["data"]["translated_utterance"]
original_language = content["data"]["original_language"]
translated_language = translated_utterance["language"]
confidence = translated_utterance.get("confidence", 0)
translation = translated_utterance["text"]
if translated_language != original_language:
if translated_language != original_language and confidence >= self._confidence:
await self.push_frame(
TranslationFrame(
translation, "", time_now_iso8601(), translated_language

View File

@@ -83,23 +83,14 @@ class GoogleVertexLLMService(OpenAILLMService):
self._api_key = self._get_api_token(credentials, credentials_path)
super().__init__(
api_key=self._api_key,
base_url=base_url,
model=model,
params=params,
**kwargs,
api_key=self._api_key, base_url=base_url, model=model, params=params, **kwargs
)
@staticmethod
def _get_base_url(params: InputParams) -> str:
"""Construct the base URL for Vertex AI API."""
# Determine the correct API host based on location
if params.location == "global":
api_host = "aiplatform.googleapis.com"
else:
api_host = f"{params.location}-aiplatform.googleapis.com"
return (
f"https://{api_host}/v1/"
f"https://{params.location}-aiplatform.googleapis.com/v1/"
f"projects/{params.project_id}/locations/{params.location}/endpoints/openapi"
)
@@ -127,14 +118,12 @@ class GoogleVertexLLMService(OpenAILLMService):
if credentials:
# Parse and load credentials from JSON string
creds = service_account.Credentials.from_service_account_info(
json.loads(credentials),
scopes=["https://www.googleapis.com/auth/cloud-platform"],
json.loads(credentials), scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
elif credentials_path:
# Load credentials from JSON file
creds = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
credentials_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
else:
try:

View File

@@ -25,7 +25,6 @@ from pydantic import BaseModel
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
ControlFrame,
EndFrame,
ErrorFrame,
Frame,
@@ -42,7 +41,6 @@ from pipecat.frames.frames import (
UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
DataFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.transcriptions.language import Language
@@ -107,17 +105,6 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
participant_id: Optional[str] = None
@dataclass
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.
@@ -228,7 +215,6 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Called when the active speaker of the call has changed.
on_joined: Called when bot successfully joined a room.
on_left: Called when bot left a room.
on_before_leave: Called when bot is about to leave the room.
on_error: Called when an error occurs.
on_app_message: Called when receiving an app message.
on_call_state_updated: Called when call state changes.
@@ -258,7 +244,6 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_left: Callable[[], Awaitable[None]]
on_before_leave: Callable[[], Awaitable[None]]
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
@@ -374,7 +359,6 @@ class DailyTransportClient(EventHandler):
self._transcription_ids = []
self._transcription_status = None
self._dial_out_session_id: str = ""
self._dial_in_session_id: str = ""
self._joining = False
self._joined = False
@@ -735,9 +719,6 @@ class DailyTransportClient(EventHandler):
logger.info(f"Leaving {self._room_url}")
# Call callback before leaving.
await self._callbacks.on_before_leave()
if self._params.transcription_enabled:
await self.stop_transcription()
@@ -842,16 +823,6 @@ class DailyTransportClient(EventHandler):
Args:
settings: SIP call transfer settings.
"""
session_id = (
settings.get("sessionId") or self._dial_out_session_id or self._dial_in_session_id
)
if not session_id:
logger.error("Unable to transfer SIP call: 'sessionId' is not set")
return
# Update 'sessionId' field.
settings["sessionId"] = session_id
future = self._get_event_loop().create_future()
self._client.sip_call_transfer(settings, completion=completion_callback(future))
await future
@@ -1170,7 +1141,6 @@ class DailyTransportClient(EventHandler):
Args:
data: Dial-in connection data.
"""
self._dial_in_session_id = data["sessionId"] if "sessionId" in data else ""
self._call_event_callback(self._callbacks.on_dialin_connected, data)
def on_dialin_ready(self, sip_endpoint: str):
@@ -1187,9 +1157,6 @@ class DailyTransportClient(EventHandler):
Args:
data: Dial-in stop data.
"""
# Cleanup only if our session stopped.
if data.get("sessionId") == self._dial_in_session_id:
self._dial_in_session_id = ""
self._call_event_callback(self._callbacks.on_dialin_stopped, data)
def on_dialin_error(self, data: Any):
@@ -1198,9 +1165,6 @@ class DailyTransportClient(EventHandler):
Args:
data: Dial-in error data.
"""
# Cleanup only if our session errored out.
if data.get("sessionId") == self._dial_in_session_id:
self._dial_in_session_id = ""
self._call_event_callback(self._callbacks.on_dialin_error, data)
def on_dialin_warning(self, data: Any):
@@ -1235,7 +1199,7 @@ class DailyTransportClient(EventHandler):
data: Dial-out stop data.
"""
# Cleanup only if our session stopped.
if data.get("sessionId") == self._dial_out_session_id:
if data["sessionId"] == self._dial_out_session_id:
self._dial_out_session_id = ""
self._call_event_callback(self._callbacks.on_dialout_stopped, data)
@@ -1246,7 +1210,7 @@ class DailyTransportClient(EventHandler):
data: Dial-out error data.
"""
# Cleanup only if our session errored out.
if data.get("sessionId") == self._dial_out_session_id:
if data["sessionId"] == self._dial_out_session_id:
self._dial_out_session_id = ""
self._call_event_callback(self._callbacks.on_dialout_error, data)
@@ -1803,31 +1767,6 @@ class DailyOutputTransport(BaseOutputTransport):
# Leave the room.
await self._client.leave()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
logger.debug(f"Got a DailyUpdateRemoteParticipantsFrame: {frame}")
await self._client.update_remote_participants(frame.remote_participants)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
await self._client.update_remote_participants(frame.remote_participants)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
"""Send a transport message to participants.
@@ -1923,7 +1862,6 @@ class DailyTransport(BaseTransport):
on_active_speaker_changed=self._on_active_speaker_changed,
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=self._on_before_leave,
on_error=self._on_error,
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
@@ -1987,10 +1925,6 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error")
self._register_event_handler("on_before_disconnect", sync=True)
# Deprecated
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
#
# BaseTransport
@@ -2242,10 +2176,6 @@ class DailyTransport(BaseTransport):
"""Handle room left events."""
await self._call_event_handler("on_left")
async def _on_before_leave(self):
"""Handle before leave room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_error(self, error):
"""Handle error events and push error frames."""
await self._call_event_handler("on_error", error)
@@ -2385,7 +2315,7 @@ class DailyTransport(BaseTransport):
"""Handle participant updated events."""
await self._call_event_handler("on_participant_updated", participant)
async def _on_transcription_message(self, message: Mapping[str, Any]) -> None:
async def _on_transcription_message(self, message: Dict[str, Any]) -> None:
"""Handle transcription message events."""
await self._call_event_handler("on_transcription_message", message)

View File

@@ -114,7 +114,6 @@ class LiveKitCallbacks(BaseModel):
on_connected: Callable[[], Awaitable[None]]
on_disconnected: Callable[[], Awaitable[None]]
on_before_disconnect: Callable[[], Awaitable[None]]
on_participant_connected: Callable[[str], Awaitable[None]]
on_participant_disconnected: Callable[[str], Awaitable[None]]
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
@@ -283,7 +282,6 @@ class LiveKitTransportClient:
return
logger.info(f"Disconnecting from {self._room_name}")
await self._callbacks.on_before_disconnect()
await self.room.disconnect()
self._connected = False
logger.info(f"Disconnected from {self._room_name}")
@@ -920,7 +918,6 @@ class LiveKitTransport(BaseTransport):
callbacks = LiveKitCallbacks(
on_connected=self._on_connected,
on_disconnected=self._on_disconnected,
on_before_disconnect=self._on_before_disconnect,
on_participant_connected=self._on_participant_connected,
on_participant_disconnected=self._on_participant_disconnected,
on_audio_track_subscribed=self._on_audio_track_subscribed,
@@ -950,7 +947,6 @@ class LiveKitTransport(BaseTransport):
self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_before_disconnect", sync=True)
def input(self) -> LiveKitInputTransport:
"""Get the input transport for receiving media and events.
@@ -1045,10 +1041,6 @@ class LiveKitTransport(BaseTransport):
"""Handle room disconnected events."""
await self._call_event_handler("on_disconnected")
async def _on_before_disconnect(self):
"""Handle before disconnection room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_participant_connected(self, participant_id: str):
"""Handle participant connected events."""
await self._call_event_handler("on_participant_connected", participant_id)

View File

@@ -95,20 +95,15 @@ class SmallWebRTCTrack:
enable/disable control and frame discarding for audio and video streams.
"""
def __init__(self, receiver):
def __init__(self, track: MediaStreamTrack):
"""Initialize the WebRTC track wrapper.
Args:
receiver: The RemoteStreamTrack receiver instance.
track: The underlying MediaStreamTrack to wrap.
index: The index of the track in the transceiver (0 for mic, 1 for cam, 2 for screen)
"""
self._receiver = receiver
# Configuring the receiver for not consuming the track by default to prevent memory grow
self._receiver._enabled = False
self._track = receiver.track
self._track = track
self._enabled = True
self._last_recv_time: float = 0.0
self._idle_task: Optional[asyncio.Task] = None
self._idle_timeout: float = 2.0 # seconds before discarding old frames
def set_enabled(self, enabled: bool) -> None:
"""Enable or disable the track.
@@ -143,44 +138,13 @@ class SmallWebRTCTrack:
async def recv(self) -> Optional[Frame]:
"""Receive the next frame from the track.
Enables the internal receiving state and starts idle watcher.
Returns:
The next frame, except for video tracks, where it returns the frame only if the track is enabled, otherwise, returns None.
"""
self._receiver._enabled = True
self._last_recv_time = time.time()
# start idle watcher if not already running
if not self._idle_task or self._idle_task.done():
self._idle_task = asyncio.create_task(self._idle_watcher())
if not self._enabled and self._track.kind == "video":
return None
return await self._track.recv()
async def _idle_watcher(self):
"""Disable receiving if idle for more than _idle_timeout and monitor queue size."""
while self._receiver._enabled:
await asyncio.sleep(self._idle_timeout)
idle_duration = time.time() - self._last_recv_time
if idle_duration >= self._idle_timeout:
# discard old frames to prevent memory growth
logger.debug(
f"Disabling receiver for {self._track.kind} track after {idle_duration:.2f}s idle"
)
await self.discard_old_frames()
self._receiver._enabled = False
def stop(self):
"""Stop receiving frames from the track."""
self._receiver._enabled = False
if self._idle_task:
self._idle_task.cancel()
self._idle_task = None
if self._track:
self._track.stop()
def __getattr__(self, name):
"""Forward attribute access to the underlying track.
@@ -490,10 +454,6 @@ class SmallWebRTCConnection(BaseObject):
async def _close(self):
"""Close the peer connection and cleanup resources."""
for track in self._track_map.values():
if track:
track.stop()
self._track_map.clear()
if self._pc:
await self._pc.close()
self._message_queue.clear()
@@ -566,8 +526,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No audio transceiver is available")
return None
receiver = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver
audio_track = SmallWebRTCTrack(receiver) if receiver else None
track = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver.track
audio_track = SmallWebRTCTrack(track) if track else None
self._track_map[AUDIO_TRANSCEIVER_INDEX] = audio_track
return audio_track
@@ -588,8 +548,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No video transceiver is available")
return None
receiver = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver
video_track = SmallWebRTCTrack(receiver) if receiver else None
track = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver.track
video_track = SmallWebRTCTrack(track) if track else None
self._track_map[VIDEO_TRANSCEIVER_INDEX] = video_track
return video_track
@@ -610,8 +570,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No screen video transceiver is available")
return None
receiver = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver
video_track = SmallWebRTCTrack(receiver) if receiver else None
track = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver.track
video_track = SmallWebRTCTrack(track) if track else None
self._track_map[SCREEN_VIDEO_TRANSCEIVER_INDEX] = video_track
return video_track

View File

@@ -14,33 +14,13 @@ and async cleanup for all Pipecat components.
import asyncio
import inspect
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Optional
from loguru import logger
from pipecat.utils.utils import obj_count, obj_id
@dataclass
class EventHandler:
"""Data class to store event handlers information.
This data class stores the event name, a list of handlers to run for this
event, and whether these handlers will be executed in a task.
Attributes:
name (str): The name of the event handler.
handlers (List[Any]): A list of functions to be called when this event is triggered.
is_sync (bool): Indicates whether the functions are executed in a task.
"""
name: str
handlers: List[Any]
is_sync: bool
class BaseObject(ABC):
"""Abstract base class providing common functionality for Pipecat objects.
@@ -61,7 +41,7 @@ class BaseObject(ABC):
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
# Registered event handlers.
self._event_handlers: Dict[str, EventHandler] = {}
self._event_handlers: dict = {}
# Set of tasks being executed. When a task finishes running it gets
# automatically removed from the set. When we cleanup we wait for all
@@ -123,21 +103,18 @@ class BaseObject(ABC):
Can be sync or async.
"""
if event_name in self._event_handlers:
self._event_handlers[event_name].handlers.append(handler)
self._event_handlers[event_name].append(handler)
else:
logger.warning(f"Event handler {event_name} not registered")
def _register_event_handler(self, event_name: str, sync: bool = False):
def _register_event_handler(self, event_name: str):
"""Register an event handler type.
Args:
event_name: The name of the event type to register.
sync: Whether this event handler will be executed in a task.
"""
if event_name not in self._event_handlers:
self._event_handlers[event_name] = EventHandler(
name=event_name, handlers=[], is_sync=sync
)
self._event_handlers[event_name] = []
else:
logger.warning(f"Event handler {event_name} not registered")
@@ -149,43 +126,34 @@ class BaseObject(ABC):
*args: Positional arguments to pass to event handlers.
**kwargs: Keyword arguments to pass to event handlers.
"""
if event_name not in self._event_handlers:
# If we haven't registered an event handler, we don't need to do
# anything.
if not self._event_handlers.get(event_name):
return
event_handler = self._event_handlers[event_name]
# Create the task.
task = asyncio.create_task(self._run_task(event_name, *args, **kwargs))
for handler in event_handler.handlers:
if event_handler.is_sync:
# Just run the handler.
await self._run_handler(event_handler.name, handler, *args, **kwargs)
else:
# Create the task. Note that this is a task per each function
# handler. Users can register to an event handler multiple
# times.
task = asyncio.create_task(
self._run_handler(event_handler.name, handler, *args, **kwargs)
)
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
async def _run_handler(self, event_name: str, handler, *args, **kwargs):
async def _run_task(self, event_name: str, *args, **kwargs):
"""Execute all handlers for an event.
Args:
event_name: The event name for this handler.
handler: The handler function to run.
event_name: The name of the event being handled.
*args: Positional arguments to pass to handlers.
**kwargs: Keyword arguments to pass to handlers.
"""
try:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")

44
uv.lock generated
View File

@@ -215,7 +215,7 @@ wheels = [
[[package]]
name = "aiortc"
version = "1.13.0"
version = "1.11.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aioice" },
@@ -227,9 +227,15 @@ dependencies = [
{ name = "pylibsrtp" },
{ name = "pyopenssl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894, upload-time = "2025-05-27T03:23:59.017Z" }
sdist = { url = "https://files.pythonhosted.org/packages/91/60/7bb59c28c6e65e5d74258d392f531f555f12ab519b0f467ffd6b76650c20/aiortc-1.11.0.tar.gz", hash = "sha256:50b9d86f6cba87d95ce7c6b051949208b48f8062b231837aed8f049045f11a28", size = 1179206, upload-time = "2025-03-28T10:00:50.327Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910, upload-time = "2025-05-27T03:23:57.344Z" },
{ url = "https://files.pythonhosted.org/packages/17/34/5c34707ce58ca0fd3b157a3b478255a8445950bf2b87f048864eb7233f5f/aiortc-1.11.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:018b0d623c6b88b9cd4bd3b700dece943731d081c50fef1b866a43f6b46a7343", size = 1218501, upload-time = "2025-03-28T10:00:39.44Z" },
{ url = "https://files.pythonhosted.org/packages/1b/d7/cc1d483097f2ae605e07e9f7af004c473da5756af25149823de2047eb991/aiortc-1.11.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:0bdd6477ac9227e9fd80ca079d6614b5b0b45c1887f214e67cddc7fde2692d95", size = 898901, upload-time = "2025-03-28T10:00:41.709Z" },
{ url = "https://files.pythonhosted.org/packages/00/64/caf7e7b3c49d492ba79256638644812d66ca68dcfa8e27307fd58f564555/aiortc-1.11.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bc311672d25091061eaa9c3fe1adbb7f2ef677c6fabd2cffdff8c724c1f81ce7", size = 1750429, upload-time = "2025-03-28T10:00:43.802Z" },
{ url = "https://files.pythonhosted.org/packages/11/12/3e37c16de90ead788e45bfe10fe6fea66711919d2bf3826f663779824de0/aiortc-1.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f57c5804135d357291f25de65faf7a844d7595c6eb12493e0a304f4d5c34d660", size = 1867914, upload-time = "2025-03-28T10:00:45.049Z" },
{ url = "https://files.pythonhosted.org/packages/aa/a9/f0a32b3966e8bc8cf4faea558b6e40171eacfc04b14e8b077bebc6ec57e3/aiortc-1.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:43ff9f5c2a5d657fbb4ab8c9b4e4c9d2967753e03c4539eb1dd82014816ef6a0", size = 1893742, upload-time = "2025-03-28T10:00:46.393Z" },
{ url = "https://files.pythonhosted.org/packages/a5/c5/57f997af08ceca5e78a5f23e4cb93445236eff39af0c9940495ae7069de4/aiortc-1.11.0-cp39-abi3-win32.whl", hash = "sha256:5e10a50ca6df3abc32811e1c84fe131b7d20d3e5349f521ca430683ca9a96c70", size = 923160, upload-time = "2025-03-28T10:00:47.578Z" },
{ url = "https://files.pythonhosted.org/packages/b2/ce/7f969694b950f673d7bf5ec697608366bd585ff741760e107e3eff55b131/aiortc-1.11.0-cp39-abi3-win_amd64.whl", hash = "sha256:67debf5ce89fb12c64b4be24e70809b29f1bb0e635914760d0c2e1193955ff62", size = 1009541, upload-time = "2025-03-28T10:00:49.09Z" },
]
[[package]]
@@ -3154,12 +3160,12 @@ name = "mlx-lm"
version = "0.27.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jinja2", marker = "sys_platform == 'darwin'" },
{ name = "mlx", marker = "sys_platform == 'darwin'" },
{ name = "numpy", marker = "sys_platform == 'darwin'" },
{ name = "protobuf", marker = "sys_platform == 'darwin'" },
{ name = "pyyaml", marker = "sys_platform == 'darwin'" },
{ name = "transformers", marker = "sys_platform == 'darwin'" },
{ name = "jinja2" },
{ name = "mlx" },
{ name = "numpy" },
{ name = "protobuf" },
{ name = "pyyaml" },
{ name = "transformers" },
]
sdist = { url = "https://files.pythonhosted.org/packages/41/77/e8d3a82658a2070bc392a583dd08c8d24088433e920eac4905bf882255ad/mlx_lm-0.27.1.tar.gz", hash = "sha256:36640fb64c909cfd9baddf37b16e7d3b94a1a141033e6b7ea7a0ef5a965fb4ae", size = 185170, upload-time = "2025-09-04T16:06:57.949Z" }
wheels = [
@@ -3652,7 +3658,7 @@ name = "nvidia-cudnn-cu12"
version = "9.5.1.17"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cublas-cu12" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/78/4535c9c7f859a64781e43c969a3a7e84c54634e319a996d43ef32ce46f83/nvidia_cudnn_cu12-9.5.1.17-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:30ac3869f6db17d170e0e556dd6cc5eee02647abc31ca856634d5a40f82c15b2", size = 570988386, upload-time = "2024-10-25T19:54:26.39Z" },
@@ -3663,7 +3669,7 @@ name = "nvidia-cufft-cu12"
version = "11.3.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/8f/16/73727675941ab8e6ffd86ca3a4b7b47065edcca7a997920b831f8147c99d/nvidia_cufft_cu12-11.3.0.4-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:ccba62eb9cef5559abd5e0d54ceed2d9934030f51163df018532142a8ec533e5", size = 200221632, upload-time = "2024-11-20T17:41:32.357Z" },
@@ -3692,9 +3698,9 @@ name = "nvidia-cusolver-cu12"
version = "11.7.1.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cusparse-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cublas-cu12" },
{ name = "nvidia-cusparse-cu12" },
{ name = "nvidia-nvjitlink-cu12" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/f0/6e/c2cf12c9ff8b872e92b4a5740701e51ff17689c4d726fca91875b07f655d/nvidia_cusolver_cu12-11.7.1.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e9e49843a7707e42022babb9bcfa33c29857a93b88020c4e4434656a655b698c", size = 158229790, upload-time = "2024-11-20T17:43:43.211Z" },
@@ -3706,7 +3712,7 @@ name = "nvidia-cusparse-cu12"
version = "12.5.4.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/06/1e/b8b7c2f4099a37b96af5c9bb158632ea9e5d9d27d7391d7eb8fc45236674/nvidia_cusparse_cu12-12.5.4.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7556d9eca156e18184b94947ade0fba5bb47d69cec46bf8660fd2c71a4b48b73", size = 216561367, upload-time = "2024-11-20T17:44:54.824Z" },
@@ -4465,7 +4471,7 @@ requires-dist = [
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.13.0" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.11.0" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.49.0" },
{ name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = "~=0.2.1" },
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.0.2" },
@@ -7163,7 +7169,7 @@ name = "triton"
version = "3.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "setuptools", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "setuptools" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/04/d54d3a6d077c646624dc9461b0059e23fd5d30e0dbe67471e3654aec81f9/triton-3.3.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fad99beafc860501d7fcc1fb7045d9496cbe2c882b1674640304949165a916e7", size = 156441993, upload-time = "2025-04-09T20:27:25.107Z" },
@@ -7716,8 +7722,8 @@ name = "xformers"
version = "0.0.30"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "torch", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "numpy" },
{ name = "torch" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bf/f7/dd2269cce89fd1221947dd7cc3a60707ffe721ef55c1803ac3b1a1f7ae5c/xformers-0.0.30.tar.gz", hash = "sha256:a12bf3eb39e294cdbe8a7253ac9b665f41bac61d6d98df174e34ef7bdb6f2fc4", size = 10214139, upload-time = "2025-04-28T20:51:02.045Z" }
wheels = [