Compare commits

...

29 Commits

Author SHA1 Message Date
Chad Bailey
05863ded53 add remote participant updates to DailyTransport 2025-09-19 21:15:07 +00:00
chadbailey59
6ab4a48d8f Add remote participant updates to DailyTransport (#2694)
* add remote participant updates to DailyTransport

* cleanup

* cleanup

* ruff cleanup again
2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
89e0092159 BaseObject: run each handler for the same event in a separate task 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
0de31dab79 LiveKitTransport: added synchronous before_disconnect event 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
10ff93307d DailyTransport: added synchronous on_before_disconnect event 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
414c9e3bc8 BaseObject: allow synchronous event handlers 2025-09-19 21:15:07 +00:00
Chad Bailey
ba64f126a3 reset paused frame processors after interruption 2025-09-19 18:15:01 +00:00
Chad Bailey
d28e3881a7 add remote participant updates to DailyTransport 2025-09-19 18:11:06 +00:00
Mark Backman
7df7395dd1 Merge pull request #2692 from pipecat-ai/mb/lazy-load-smallwebrtc-request
Lazy load SmallWebRTCRequest, SmallWebRTCRequestHandler in runner
2025-09-19 10:43:43 -07:00
Mark Backman
0885bc9cdf Lazy load SmallWebRTCRequest, SmallWebRTCRequestHandler in runner 2025-09-19 13:28:01 -04:00
Aleix Conchillo Flaqué
0204f6a95d Merge pull request #2686 from pipecat-ai/aleix/silero-vad-v6
audio(vad): update Silero VAD model to v6
2025-09-18 20:31:10 -07:00
Mark Backman
b0bf653f04 Merge pull request #2679 from pipecat-ai/mb/gladia-remove-confidence
GladiaSTTService: deprecate confidence arg
2025-09-18 17:41:33 -07:00
Mark Backman
e8a676eb36 GladiaSTTService: deprecate confidence arg 2025-09-18 20:38:53 -04:00
Mark Backman
ca96eef1f3 Merge pull request #2680 from pipecat-ai/mb/dial-in-session-id
DailyTransport sip_call_transfer now automatically receives session_id
2025-09-18 17:36:51 -07:00
Mark Backman
8e1637d6c7 DailyTransport sip_call_transfer now automatically receives session_id 2025-09-18 20:34:14 -04:00
Filipi da Silva Fuchter
367200c0ad Merge pull request #2682 from pipecat-ai/filipi/smallwebrtc_leak
Smallwebrtc memory leak
2025-09-18 18:56:08 -03:00
Filipi Fuchter
766e1948a6 Mentioning the fix in the changelog. 2025-09-18 18:43:33 -03:00
Aleix Conchillo Flaqué
f369683b8b audio(vad): update Silero VAD model to v6 2025-09-18 14:06:37 -07:00
Aleix Conchillo Flaqué
461025d1cc Merge pull request #2684 from pipecat-ai/aleix/readme-whisker
README: add whisker debugger
2025-09-18 13:27:35 -07:00
Aleix Conchillo Flaqué
ac88706f38 README: add whisker debugger 2025-09-18 13:22:54 -07:00
Filipi Fuchter
93a89449b8 Adding warnings in case queue grows. 2025-09-18 16:43:57 -03:00
Filipi Fuchter
199bf72945 Preventing memory growth if we are not consuming the track. 2025-09-18 16:16:10 -03:00
Filipi Fuchter
d20e4125f6 Updating aiortc to the latest version. 2025-09-18 15:22:46 -03:00
Filipi Fuchter
c1baed642e Script to monitor memory usage. 2025-09-18 14:43:42 -03:00
Mark Backman
33ef68573f Merge pull request #2662 from pelguetat/fix-vertex-ai-global-location-support
feat: add support for global location in Vertex AI base URL
2025-09-18 10:25:10 -07:00
Pablo Elgueta
3c1b41df13 docs: add changelog entry for global location support
- Document the new global location support in GoogleVertexLLMService
- Explain the difference between regional and global API hosts
- Follow Keep a Changelog format
2025-09-18 17:39:03 +01:00
kompfner
fca4ecc73c Merge pull request #2675 from pipecat-ai/pk/service-switcher-logic-simplification
Simplify a bit of logic in `ServiceSwitcher`
2025-09-18 09:17:22 -04:00
Paul Kompfner
cfa333508b Simplify a bit of logic in ServiceSwitcher 2025-09-17 21:03:38 -04:00
Pablo Elgueta
69f0a75882 feat: add support for global location in Vertex AI base URL
- Update _get_base_url method to handle 'global' location case
- Use 'aiplatform.googleapis.com' for global locations
- Use '{location}-aiplatform.googleapis.com' for regional locations
- Maintains backward compatibility with existing regional endpoints
2025-09-16 10:28:22 -03:00
15 changed files with 320 additions and 108 deletions

View File

@@ -9,6 +9,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `on_before_disconnect` synchronous event to `DailyTransport` and
`LiveKitTransport`.
- It is now possible to register synchronous event handlers. By default, all
event handlers are executed in a separate task. However, in some cases we want
to guarantee order of execution, for example, executing something before
disconnecting a transport.
```python
self._register_event_handler("on_event_name", sync=True)
```
- Added support for global location in `GoogleVertexLLMService`. The service now
supports both regional locations (e.g., "us-east4") and the "global" location
for Vertex AI endpoints. When using "global" location, the service will use
`aiplatform.googleapis.com` as the API host instead of the regional format.
- Added `on_pipeline_finished` event to `PipelineTask`. This event will get
fired when the pipeline is done running. This can be the result of a
`StopFrame`, `CancelFrame` or `EndFrame`.
@@ -21,6 +38,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated Silero VAD model to v6.
- Updated `livekit` to 1.0.13.
- `torch` and `torchaudio` are no longer required for running Smart Turn
@@ -36,12 +55,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- `GladiaSTTService`'s `confidence` arg is deprecated. `confidence` is no
longer needed to determine which transcription or translation frames to
emit.
- `PipelineTask` events `on_pipeline_stopped`, `on_pipeline_ended` and
`on_pipeline_cancelled` are now deprecated. Use `on_pipeline_finished`
instead.
### Fixed
- Fixed an issue where multiple handlers for an event would not run in parallel.
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
ID from the `on_dialin_connected` event, when not explicitly provided. Now
supports cold transfers (from incoming dial-in calls) by automatically
tracking session IDs from connection events.
- Fixed a memory leak in `SmallWebRTCTransport`. In `aiortc`, when you receive
a `MediaStreamTrack` (audio or video), frames are produced asynchronously. If
the code never consumes these frames, they are queued in memory, causing a
memory leak.
- Fixed an issue in `AsyncAITTSService`, where `TTSTextFrames` were not being
pushed.

View File

@@ -21,6 +21,8 @@
🧭 Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
🔍 Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
## 🧠 Why Pipecat?
- **Voice-first**: Integrates speech recognition, text-to-speech, and conversation handling

View File

@@ -106,7 +106,7 @@ tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ]
webrtc = [ "aiortc~=1.13.0", "opencv-python~=4.11.0.86" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
whisper = [ "faster-whisper~=1.1.1" ]

12
scripts/mem-watch.sh Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
PID=$1
while true; do
# Clear the screen
clear
# Print the header + RSS in GB
ps -p "$PID" -o pid,comm,rss | \
awk 'NR==1 {print $0, "rss_GB"} NR>1 {printf "%s %s %s %.2f\n", $1,$2,$3,$3/1024/1024}'
sleep 1
done

View File

@@ -169,11 +169,4 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
service_switcher_filter_frame = ServiceSwitcher.ServiceSwitcherFilterFrame(
active_service=self.strategy.active_service
)
# Hack: we need access ParallelPipeline internals here to queue the
# frame in each of the pipelines.
# Why not just call super().process_frame(service_switcher_filter_frame, direction),
# you ask? Because that would also send this internal-only frame
# down the "main" pipeline instead of only down the individual
# branches of the parallel pipeline, which we want to avoid.
for p in self._pipelines:
await p.queue_frame(service_switcher_filter_frame, direction)
await super().process_frame(service_switcher_filter_frame, direction)

View File

@@ -568,11 +568,17 @@ class FrameProcessor(BaseObject):
"""Pause processing of queued frames."""
logger.trace(f"{self}: pausing frame processing")
self.__should_block_frames = True
# We should also unset the process event here, in case it was set immediately after an interruption
if self.__process_event:
self.__process_event.clear()
async def pause_processing_system_frames(self):
"""Pause processing of queued system frames."""
logger.trace(f"{self}: pausing system frame processing")
self.__should_block_system_frames = True
# We should also unset the input event here, in case it was set immediately after an interruption
if self.__input_event:
self.__input_event.clear()
async def resume_processing_frames(self):
"""Resume processing of queued frames."""

View File

@@ -78,10 +78,6 @@ from pipecat.runner.types import (
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
try:
import uvicorn
@@ -186,6 +182,10 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return

View File

@@ -13,6 +13,7 @@ supporting multiple languages, custom vocabulary, and various audio processing o
import asyncio
import base64
import json
import warnings
from typing import Any, AsyncGenerator, Dict, Literal, Optional
import aiohttp
@@ -173,8 +174,6 @@ class _InputParamsDescriptor:
"""Descriptor for backward compatibility with deprecation warning."""
def __get__(self, obj, objtype=None):
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
@@ -208,7 +207,7 @@ class GladiaSTTService(STTService):
api_key: str,
region: Literal["us-west", "eu-west"] | None = None,
url: str = "https://api.gladia.io/v2/live",
confidence: float = 0.5,
confidence: Optional[float] = None,
sample_rate: Optional[int] = None,
model: str = "solaria-1",
params: Optional[GladiaInputParams] = None,
@@ -224,6 +223,11 @@ class GladiaSTTService(STTService):
region: Region used to process audio. eu-west or us-west. Defaults to eu-west.
url: Gladia API URL. Defaults to "https://api.gladia.io/v2/live".
confidence: Minimum confidence threshold for transcriptions (0.0-1.0).
.. deprecated:: 0.0.86
The 'confidence' parameter is deprecated and will be removed in a future version.
No confidence threshold is applied.
sample_rate: Audio sample rate in Hz. If None, uses service default.
model: Model to use for transcription. Defaults to "solaria-1".
params: Additional configuration parameters for Gladia service.
@@ -236,7 +240,6 @@ class GladiaSTTService(STTService):
params = params or GladiaInputParams()
# Warn about deprecated language parameter if it's used
if params.language is not None:
with warnings.catch_warnings():
warnings.simplefilter("always")
@@ -247,11 +250,20 @@ class GladiaSTTService(STTService):
stacklevel=2,
)
if confidence:
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"The 'confidence' parameter is deprecated and will be removed in a future version. "
"No confidence threshold is applied.",
DeprecationWarning,
stacklevel=2,
)
self._api_key = api_key
self._region = region
self._url = url
self.set_model_name(model)
self._confidence = confidence
self._params = params
self._websocket = None
self._receive_task = None
@@ -575,43 +587,40 @@ class GladiaSTTService(STTService):
elif content["type"] == "transcript":
utterance = content["data"]["utterance"]
confidence = utterance.get("confidence", 0)
language = utterance["language"]
transcript = utterance["text"]
is_final = content["data"]["is_final"]
if confidence >= self._confidence:
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
if is_final:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
await self._handle_transcription(
transcript=transcript,
is_final=is_final,
language=language,
)
else:
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
)
await self._handle_transcription(
transcript=transcript,
is_final=is_final,
language=language,
)
else:
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
language,
result=content,
)
)
elif content["type"] == "translation":
translated_utterance = content["data"]["translated_utterance"]
original_language = content["data"]["original_language"]
translated_language = translated_utterance["language"]
confidence = translated_utterance.get("confidence", 0)
translation = translated_utterance["text"]
if translated_language != original_language and confidence >= self._confidence:
if translated_language != original_language:
await self.push_frame(
TranslationFrame(
translation, "", time_now_iso8601(), translated_language

View File

@@ -83,14 +83,23 @@ class GoogleVertexLLMService(OpenAILLMService):
self._api_key = self._get_api_token(credentials, credentials_path)
super().__init__(
api_key=self._api_key, base_url=base_url, model=model, params=params, **kwargs
api_key=self._api_key,
base_url=base_url,
model=model,
params=params,
**kwargs,
)
@staticmethod
def _get_base_url(params: InputParams) -> str:
"""Construct the base URL for Vertex AI API."""
# Determine the correct API host based on location
if params.location == "global":
api_host = "aiplatform.googleapis.com"
else:
api_host = f"{params.location}-aiplatform.googleapis.com"
return (
f"https://{params.location}-aiplatform.googleapis.com/v1/"
f"https://{api_host}/v1/"
f"projects/{params.project_id}/locations/{params.location}/endpoints/openapi"
)
@@ -118,12 +127,14 @@ class GoogleVertexLLMService(OpenAILLMService):
if credentials:
# Parse and load credentials from JSON string
creds = service_account.Credentials.from_service_account_info(
json.loads(credentials), scopes=["https://www.googleapis.com/auth/cloud-platform"]
json.loads(credentials),
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
elif credentials_path:
# Load credentials from JSON file
creds = service_account.Credentials.from_service_account_file(
credentials_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
else:
try:

View File

@@ -25,6 +25,7 @@ from pydantic import BaseModel
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
ControlFrame,
EndFrame,
ErrorFrame,
Frame,
@@ -41,6 +42,7 @@ from pipecat.frames.frames import (
UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
DataFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.transcriptions.language import Language
@@ -105,6 +107,17 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
participant_id: Optional[str] = None
@dataclass
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.
@@ -215,6 +228,7 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Called when the active speaker of the call has changed.
on_joined: Called when bot successfully joined a room.
on_left: Called when bot left a room.
on_before_leave: Called when bot is about to leave the room.
on_error: Called when an error occurs.
on_app_message: Called when receiving an app message.
on_call_state_updated: Called when call state changes.
@@ -244,6 +258,7 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_left: Callable[[], Awaitable[None]]
on_before_leave: Callable[[], Awaitable[None]]
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
@@ -359,6 +374,7 @@ class DailyTransportClient(EventHandler):
self._transcription_ids = []
self._transcription_status = None
self._dial_out_session_id: str = ""
self._dial_in_session_id: str = ""
self._joining = False
self._joined = False
@@ -719,6 +735,9 @@ class DailyTransportClient(EventHandler):
logger.info(f"Leaving {self._room_url}")
# Call callback before leaving.
await self._callbacks.on_before_leave()
if self._params.transcription_enabled:
await self.stop_transcription()
@@ -823,6 +842,16 @@ class DailyTransportClient(EventHandler):
Args:
settings: SIP call transfer settings.
"""
session_id = (
settings.get("sessionId") or self._dial_out_session_id or self._dial_in_session_id
)
if not session_id:
logger.error("Unable to transfer SIP call: 'sessionId' is not set")
return
# Update 'sessionId' field.
settings["sessionId"] = session_id
future = self._get_event_loop().create_future()
self._client.sip_call_transfer(settings, completion=completion_callback(future))
await future
@@ -1141,6 +1170,7 @@ class DailyTransportClient(EventHandler):
Args:
data: Dial-in connection data.
"""
self._dial_in_session_id = data["sessionId"] if "sessionId" in data else ""
self._call_event_callback(self._callbacks.on_dialin_connected, data)
def on_dialin_ready(self, sip_endpoint: str):
@@ -1157,6 +1187,9 @@ class DailyTransportClient(EventHandler):
Args:
data: Dial-in stop data.
"""
# Cleanup only if our session stopped.
if data.get("sessionId") == self._dial_in_session_id:
self._dial_in_session_id = ""
self._call_event_callback(self._callbacks.on_dialin_stopped, data)
def on_dialin_error(self, data: Any):
@@ -1165,6 +1198,9 @@ class DailyTransportClient(EventHandler):
Args:
data: Dial-in error data.
"""
# Cleanup only if our session errored out.
if data.get("sessionId") == self._dial_in_session_id:
self._dial_in_session_id = ""
self._call_event_callback(self._callbacks.on_dialin_error, data)
def on_dialin_warning(self, data: Any):
@@ -1199,7 +1235,7 @@ class DailyTransportClient(EventHandler):
data: Dial-out stop data.
"""
# Cleanup only if our session stopped.
if data["sessionId"] == self._dial_out_session_id:
if data.get("sessionId") == self._dial_out_session_id:
self._dial_out_session_id = ""
self._call_event_callback(self._callbacks.on_dialout_stopped, data)
@@ -1210,7 +1246,7 @@ class DailyTransportClient(EventHandler):
data: Dial-out error data.
"""
# Cleanup only if our session errored out.
if data["sessionId"] == self._dial_out_session_id:
if data.get("sessionId") == self._dial_out_session_id:
self._dial_out_session_id = ""
self._call_event_callback(self._callbacks.on_dialout_error, data)
@@ -1767,6 +1803,31 @@ class DailyOutputTransport(BaseOutputTransport):
# Leave the room.
await self._client.leave()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
logger.debug(f"Got a DailyUpdateRemoteParticipantsFrame: {frame}")
await self._client.update_remote_participants(frame.remote_participants)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
await self._client.update_remote_participants(frame.remote_participants)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
"""Send a transport message to participants.
@@ -1862,6 +1923,7 @@ class DailyTransport(BaseTransport):
on_active_speaker_changed=self._on_active_speaker_changed,
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=self._on_before_leave,
on_error=self._on_error,
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
@@ -1925,6 +1987,10 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error")
self._register_event_handler("on_before_disconnect", sync=True)
# Deprecated
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
#
# BaseTransport
@@ -2176,6 +2242,10 @@ class DailyTransport(BaseTransport):
"""Handle room left events."""
await self._call_event_handler("on_left")
async def _on_before_leave(self):
"""Handle before leave room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_error(self, error):
"""Handle error events and push error frames."""
await self._call_event_handler("on_error", error)
@@ -2315,7 +2385,7 @@ class DailyTransport(BaseTransport):
"""Handle participant updated events."""
await self._call_event_handler("on_participant_updated", participant)
async def _on_transcription_message(self, message: Dict[str, Any]) -> None:
async def _on_transcription_message(self, message: Mapping[str, Any]) -> None:
"""Handle transcription message events."""
await self._call_event_handler("on_transcription_message", message)

View File

@@ -114,6 +114,7 @@ class LiveKitCallbacks(BaseModel):
on_connected: Callable[[], Awaitable[None]]
on_disconnected: Callable[[], Awaitable[None]]
on_before_disconnect: Callable[[], Awaitable[None]]
on_participant_connected: Callable[[str], Awaitable[None]]
on_participant_disconnected: Callable[[str], Awaitable[None]]
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
@@ -282,6 +283,7 @@ class LiveKitTransportClient:
return
logger.info(f"Disconnecting from {self._room_name}")
await self._callbacks.on_before_disconnect()
await self.room.disconnect()
self._connected = False
logger.info(f"Disconnected from {self._room_name}")
@@ -918,6 +920,7 @@ class LiveKitTransport(BaseTransport):
callbacks = LiveKitCallbacks(
on_connected=self._on_connected,
on_disconnected=self._on_disconnected,
on_before_disconnect=self._on_before_disconnect,
on_participant_connected=self._on_participant_connected,
on_participant_disconnected=self._on_participant_disconnected,
on_audio_track_subscribed=self._on_audio_track_subscribed,
@@ -947,6 +950,7 @@ class LiveKitTransport(BaseTransport):
self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_before_disconnect", sync=True)
def input(self) -> LiveKitInputTransport:
"""Get the input transport for receiving media and events.
@@ -1041,6 +1045,10 @@ class LiveKitTransport(BaseTransport):
"""Handle room disconnected events."""
await self._call_event_handler("on_disconnected")
async def _on_before_disconnect(self):
"""Handle before disconnection room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_participant_connected(self, participant_id: str):
"""Handle participant connected events."""
await self._call_event_handler("on_participant_connected", participant_id)

View File

@@ -95,15 +95,20 @@ class SmallWebRTCTrack:
enable/disable control and frame discarding for audio and video streams.
"""
def __init__(self, track: MediaStreamTrack):
def __init__(self, receiver):
"""Initialize the WebRTC track wrapper.
Args:
track: The underlying MediaStreamTrack to wrap.
index: The index of the track in the transceiver (0 for mic, 1 for cam, 2 for screen)
receiver: The RemoteStreamTrack receiver instance.
"""
self._track = track
self._receiver = receiver
# Configuring the receiver for not consuming the track by default to prevent memory grow
self._receiver._enabled = False
self._track = receiver.track
self._enabled = True
self._last_recv_time: float = 0.0
self._idle_task: Optional[asyncio.Task] = None
self._idle_timeout: float = 2.0 # seconds before discarding old frames
def set_enabled(self, enabled: bool) -> None:
"""Enable or disable the track.
@@ -138,13 +143,44 @@ class SmallWebRTCTrack:
async def recv(self) -> Optional[Frame]:
"""Receive the next frame from the track.
Enables the internal receiving state and starts idle watcher.
Returns:
The next frame, except for video tracks, where it returns the frame only if the track is enabled, otherwise, returns None.
"""
self._receiver._enabled = True
self._last_recv_time = time.time()
# start idle watcher if not already running
if not self._idle_task or self._idle_task.done():
self._idle_task = asyncio.create_task(self._idle_watcher())
if not self._enabled and self._track.kind == "video":
return None
return await self._track.recv()
async def _idle_watcher(self):
"""Disable receiving if idle for more than _idle_timeout and monitor queue size."""
while self._receiver._enabled:
await asyncio.sleep(self._idle_timeout)
idle_duration = time.time() - self._last_recv_time
if idle_duration >= self._idle_timeout:
# discard old frames to prevent memory growth
logger.debug(
f"Disabling receiver for {self._track.kind} track after {idle_duration:.2f}s idle"
)
await self.discard_old_frames()
self._receiver._enabled = False
def stop(self):
"""Stop receiving frames from the track."""
self._receiver._enabled = False
if self._idle_task:
self._idle_task.cancel()
self._idle_task = None
if self._track:
self._track.stop()
def __getattr__(self, name):
"""Forward attribute access to the underlying track.
@@ -454,6 +490,10 @@ class SmallWebRTCConnection(BaseObject):
async def _close(self):
"""Close the peer connection and cleanup resources."""
for track in self._track_map.values():
if track:
track.stop()
self._track_map.clear()
if self._pc:
await self._pc.close()
self._message_queue.clear()
@@ -526,8 +566,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No audio transceiver is available")
return None
track = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver.track
audio_track = SmallWebRTCTrack(track) if track else None
receiver = transceivers[AUDIO_TRANSCEIVER_INDEX].receiver
audio_track = SmallWebRTCTrack(receiver) if receiver else None
self._track_map[AUDIO_TRANSCEIVER_INDEX] = audio_track
return audio_track
@@ -548,8 +588,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No video transceiver is available")
return None
track = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver.track
video_track = SmallWebRTCTrack(track) if track else None
receiver = transceivers[VIDEO_TRANSCEIVER_INDEX].receiver
video_track = SmallWebRTCTrack(receiver) if receiver else None
self._track_map[VIDEO_TRANSCEIVER_INDEX] = video_track
return video_track
@@ -570,8 +610,8 @@ class SmallWebRTCConnection(BaseObject):
logger.warning("No screen video transceiver is available")
return None
track = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver.track
video_track = SmallWebRTCTrack(track) if track else None
receiver = transceivers[SCREEN_VIDEO_TRANSCEIVER_INDEX].receiver
video_track = SmallWebRTCTrack(receiver) if receiver else None
self._track_map[SCREEN_VIDEO_TRANSCEIVER_INDEX] = video_track
return video_track

View File

@@ -14,13 +14,33 @@ and async cleanup for all Pipecat components.
import asyncio
import inspect
from abc import ABC
from typing import Optional
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from loguru import logger
from pipecat.utils.utils import obj_count, obj_id
@dataclass
class EventHandler:
"""Data class to store event handlers information.
This data class stores the event name, a list of handlers to run for this
event, and whether these handlers will be executed in a task.
Attributes:
name (str): The name of the event handler.
handlers (List[Any]): A list of functions to be called when this event is triggered.
is_sync (bool): Indicates whether the functions are executed in a task.
"""
name: str
handlers: List[Any]
is_sync: bool
class BaseObject(ABC):
"""Abstract base class providing common functionality for Pipecat objects.
@@ -41,7 +61,7 @@ class BaseObject(ABC):
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
# Registered event handlers.
self._event_handlers: dict = {}
self._event_handlers: Dict[str, EventHandler] = {}
# Set of tasks being executed. When a task finishes running it gets
# automatically removed from the set. When we cleanup we wait for all
@@ -103,18 +123,21 @@ class BaseObject(ABC):
Can be sync or async.
"""
if event_name in self._event_handlers:
self._event_handlers[event_name].append(handler)
self._event_handlers[event_name].handlers.append(handler)
else:
logger.warning(f"Event handler {event_name} not registered")
def _register_event_handler(self, event_name: str):
def _register_event_handler(self, event_name: str, sync: bool = False):
"""Register an event handler type.
Args:
event_name: The name of the event type to register.
sync: Whether this event handler will be executed in a task.
"""
if event_name not in self._event_handlers:
self._event_handlers[event_name] = []
self._event_handlers[event_name] = EventHandler(
name=event_name, handlers=[], is_sync=sync
)
else:
logger.warning(f"Event handler {event_name} not registered")
@@ -126,34 +149,43 @@ class BaseObject(ABC):
*args: Positional arguments to pass to event handlers.
**kwargs: Keyword arguments to pass to event handlers.
"""
# If we haven't registered an event handler, we don't need to do
# anything.
if not self._event_handlers.get(event_name):
if event_name not in self._event_handlers:
return
# Create the task.
task = asyncio.create_task(self._run_task(event_name, *args, **kwargs))
event_handler = self._event_handlers[event_name]
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
for handler in event_handler.handlers:
if event_handler.is_sync:
# Just run the handler.
await self._run_handler(event_handler.name, handler, *args, **kwargs)
else:
# Create the task. Note that this is a task per each function
# handler. Users can register to an event handler multiple
# times.
task = asyncio.create_task(
self._run_handler(event_handler.name, handler, *args, **kwargs)
)
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
async def _run_task(self, event_name: str, *args, **kwargs):
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
async def _run_handler(self, event_name: str, handler, *args, **kwargs):
"""Execute all handlers for an event.
Args:
event_name: The name of the event being handled.
event_name: The event name for this handler.
handler: The handler function to run.
*args: Positional arguments to pass to handlers.
**kwargs: Keyword arguments to pass to handlers.
"""
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")

44
uv.lock generated
View File

@@ -215,7 +215,7 @@ wheels = [
[[package]]
name = "aiortc"
version = "1.11.0"
version = "1.13.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aioice" },
@@ -227,15 +227,9 @@ dependencies = [
{ name = "pylibsrtp" },
{ name = "pyopenssl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/60/7bb59c28c6e65e5d74258d392f531f555f12ab519b0f467ffd6b76650c20/aiortc-1.11.0.tar.gz", hash = "sha256:50b9d86f6cba87d95ce7c6b051949208b48f8062b231837aed8f049045f11a28", size = 1179206, upload-time = "2025-03-28T10:00:50.327Z" }
sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894, upload-time = "2025-05-27T03:23:59.017Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/17/34/5c34707ce58ca0fd3b157a3b478255a8445950bf2b87f048864eb7233f5f/aiortc-1.11.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:018b0d623c6b88b9cd4bd3b700dece943731d081c50fef1b866a43f6b46a7343", size = 1218501, upload-time = "2025-03-28T10:00:39.44Z" },
{ url = "https://files.pythonhosted.org/packages/1b/d7/cc1d483097f2ae605e07e9f7af004c473da5756af25149823de2047eb991/aiortc-1.11.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:0bdd6477ac9227e9fd80ca079d6614b5b0b45c1887f214e67cddc7fde2692d95", size = 898901, upload-time = "2025-03-28T10:00:41.709Z" },
{ url = "https://files.pythonhosted.org/packages/00/64/caf7e7b3c49d492ba79256638644812d66ca68dcfa8e27307fd58f564555/aiortc-1.11.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bc311672d25091061eaa9c3fe1adbb7f2ef677c6fabd2cffdff8c724c1f81ce7", size = 1750429, upload-time = "2025-03-28T10:00:43.802Z" },
{ url = "https://files.pythonhosted.org/packages/11/12/3e37c16de90ead788e45bfe10fe6fea66711919d2bf3826f663779824de0/aiortc-1.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f57c5804135d357291f25de65faf7a844d7595c6eb12493e0a304f4d5c34d660", size = 1867914, upload-time = "2025-03-28T10:00:45.049Z" },
{ url = "https://files.pythonhosted.org/packages/aa/a9/f0a32b3966e8bc8cf4faea558b6e40171eacfc04b14e8b077bebc6ec57e3/aiortc-1.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:43ff9f5c2a5d657fbb4ab8c9b4e4c9d2967753e03c4539eb1dd82014816ef6a0", size = 1893742, upload-time = "2025-03-28T10:00:46.393Z" },
{ url = "https://files.pythonhosted.org/packages/a5/c5/57f997af08ceca5e78a5f23e4cb93445236eff39af0c9940495ae7069de4/aiortc-1.11.0-cp39-abi3-win32.whl", hash = "sha256:5e10a50ca6df3abc32811e1c84fe131b7d20d3e5349f521ca430683ca9a96c70", size = 923160, upload-time = "2025-03-28T10:00:47.578Z" },
{ url = "https://files.pythonhosted.org/packages/b2/ce/7f969694b950f673d7bf5ec697608366bd585ff741760e107e3eff55b131/aiortc-1.11.0-cp39-abi3-win_amd64.whl", hash = "sha256:67debf5ce89fb12c64b4be24e70809b29f1bb0e635914760d0c2e1193955ff62", size = 1009541, upload-time = "2025-03-28T10:00:49.09Z" },
{ url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910, upload-time = "2025-05-27T03:23:57.344Z" },
]
[[package]]
@@ -3160,12 +3154,12 @@ name = "mlx-lm"
version = "0.27.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jinja2" },
{ name = "mlx" },
{ name = "numpy" },
{ name = "protobuf" },
{ name = "pyyaml" },
{ name = "transformers" },
{ name = "jinja2", marker = "sys_platform == 'darwin'" },
{ name = "mlx", marker = "sys_platform == 'darwin'" },
{ name = "numpy", marker = "sys_platform == 'darwin'" },
{ name = "protobuf", marker = "sys_platform == 'darwin'" },
{ name = "pyyaml", marker = "sys_platform == 'darwin'" },
{ name = "transformers", marker = "sys_platform == 'darwin'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/41/77/e8d3a82658a2070bc392a583dd08c8d24088433e920eac4905bf882255ad/mlx_lm-0.27.1.tar.gz", hash = "sha256:36640fb64c909cfd9baddf37b16e7d3b94a1a141033e6b7ea7a0ef5a965fb4ae", size = 185170, upload-time = "2025-09-04T16:06:57.949Z" }
wheels = [
@@ -3658,7 +3652,7 @@ name = "nvidia-cudnn-cu12"
version = "9.5.1.17"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12" },
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/78/4535c9c7f859a64781e43c969a3a7e84c54634e319a996d43ef32ce46f83/nvidia_cudnn_cu12-9.5.1.17-py3-none-manylinux_2_28_x86_64.whl", hash = "sha256:30ac3869f6db17d170e0e556dd6cc5eee02647abc31ca856634d5a40f82c15b2", size = 570988386, upload-time = "2024-10-25T19:54:26.39Z" },
@@ -3669,7 +3663,7 @@ name = "nvidia-cufft-cu12"
version = "11.3.0.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/8f/16/73727675941ab8e6ffd86ca3a4b7b47065edcca7a997920b831f8147c99d/nvidia_cufft_cu12-11.3.0.4-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:ccba62eb9cef5559abd5e0d54ceed2d9934030f51163df018532142a8ec533e5", size = 200221632, upload-time = "2024-11-20T17:41:32.357Z" },
@@ -3698,9 +3692,9 @@ name = "nvidia-cusolver-cu12"
version = "11.7.1.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12" },
{ name = "nvidia-cusparse-cu12" },
{ name = "nvidia-nvjitlink-cu12" },
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cusparse-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/f0/6e/c2cf12c9ff8b872e92b4a5740701e51ff17689c4d726fca91875b07f655d/nvidia_cusolver_cu12-11.7.1.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e9e49843a7707e42022babb9bcfa33c29857a93b88020c4e4434656a655b698c", size = 158229790, upload-time = "2024-11-20T17:43:43.211Z" },
@@ -3712,7 +3706,7 @@ name = "nvidia-cusparse-cu12"
version = "12.5.4.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/06/1e/b8b7c2f4099a37b96af5c9bb158632ea9e5d9d27d7391d7eb8fc45236674/nvidia_cusparse_cu12-12.5.4.2-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7556d9eca156e18184b94947ade0fba5bb47d69cec46bf8660fd2c71a4b48b73", size = 216561367, upload-time = "2024-11-20T17:44:54.824Z" },
@@ -4471,7 +4465,7 @@ requires-dist = [
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.11.0" },
{ name = "aiortc", marker = "extra == 'webrtc'", specifier = "~=1.13.0" },
{ name = "anthropic", marker = "extra == 'anthropic'", specifier = "~=0.49.0" },
{ name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = "~=0.2.1" },
{ name = "aws-sdk-bedrock-runtime", marker = "python_full_version >= '3.12' and extra == 'aws-nova-sonic'", specifier = "~=0.0.2" },
@@ -7169,7 +7163,7 @@ name = "triton"
version = "3.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "setuptools" },
{ name = "setuptools", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/04/d54d3a6d077c646624dc9461b0059e23fd5d30e0dbe67471e3654aec81f9/triton-3.3.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fad99beafc860501d7fcc1fb7045d9496cbe2c882b1674640304949165a916e7", size = 156441993, upload-time = "2025-04-09T20:27:25.107Z" },
@@ -7722,8 +7716,8 @@ name = "xformers"
version = "0.0.30"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
{ name = "torch" },
{ name = "numpy", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "torch", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bf/f7/dd2269cce89fd1221947dd7cc3a60707ffe721ef55c1803ac3b1a1f7ae5c/xformers-0.0.30.tar.gz", hash = "sha256:a12bf3eb39e294cdbe8a7253ac9b665f41bac61d6d98df174e34ef7bdb6f2fc4", size = 10214139, upload-time = "2025-04-28T20:51:02.045Z" }
wheels = [