Compare commits

...

8 Commits

Author SHA1 Message Date
Chad Bailey
05863ded53 add remote participant updates to DailyTransport 2025-09-19 21:15:07 +00:00
chadbailey59
6ab4a48d8f Add remote participant updates to DailyTransport (#2694)
* add remote participant updates to DailyTransport

* cleanup

* cleanup

* ruff cleanup again
2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
89e0092159 BaseObject: run each handler for the same event in a separate task 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
0de31dab79 LiveKitTransport: added synchronous before_disconnect event 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
10ff93307d DailyTransport: added synchronous on_before_disconnect event 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
414c9e3bc8 BaseObject: allow synchronous event handlers 2025-09-19 21:15:07 +00:00
Chad Bailey
ba64f126a3 reset paused frame processors after interruption 2025-09-19 18:15:01 +00:00
Chad Bailey
d28e3881a7 add remote participant updates to DailyTransport 2025-09-19 18:11:06 +00:00
5 changed files with 134 additions and 22 deletions

View File

@@ -9,6 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `on_before_disconnect` synchronous event to `DailyTransport` and
`LiveKitTransport`.
- It is now possible to register synchronous event handlers. By default, all
event handlers are executed in a separate task. However, in some cases we want
to guarantee order of execution, for example, executing something before
disconnecting a transport.
```python
self._register_event_handler("on_event_name", sync=True)
```
- Added support for global location in `GoogleVertexLLMService`. The service now
supports both regional locations (e.g., "us-east4") and the "global" location
for Vertex AI endpoints. When using "global" location, the service will use
@@ -53,6 +65,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue where multiple handlers for an event would not run in parallel.
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
ID from the `on_dialin_connected` event, when not explicitly provided. Now
supports cold transfers (from incoming dial-in calls) by automatically

View File

@@ -568,11 +568,17 @@ class FrameProcessor(BaseObject):
"""Pause processing of queued frames."""
logger.trace(f"{self}: pausing frame processing")
self.__should_block_frames = True
# We should also unset the process event here, in case it was set immediately after an interruption
if self.__process_event:
self.__process_event.clear()
async def pause_processing_system_frames(self):
"""Pause processing of queued system frames."""
logger.trace(f"{self}: pausing system frame processing")
self.__should_block_system_frames = True
# We should also unset the input event here, in case it was set immediately after an interruption
if self.__input_event:
self.__input_event.clear()
async def resume_processing_frames(self):
"""Resume processing of queued frames."""

View File

@@ -25,6 +25,7 @@ from pydantic import BaseModel
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
ControlFrame,
EndFrame,
ErrorFrame,
Frame,
@@ -41,6 +42,7 @@ from pipecat.frames.frames import (
UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
DataFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.transcriptions.language import Language
@@ -105,6 +107,17 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
participant_id: Optional[str] = None
@dataclass
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.
@@ -215,6 +228,7 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Called when the active speaker of the call has changed.
on_joined: Called when bot successfully joined a room.
on_left: Called when bot left a room.
on_before_leave: Called when bot is about to leave the room.
on_error: Called when an error occurs.
on_app_message: Called when receiving an app message.
on_call_state_updated: Called when call state changes.
@@ -244,6 +258,7 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_left: Callable[[], Awaitable[None]]
on_before_leave: Callable[[], Awaitable[None]]
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
@@ -720,6 +735,9 @@ class DailyTransportClient(EventHandler):
logger.info(f"Leaving {self._room_url}")
# Call callback before leaving.
await self._callbacks.on_before_leave()
if self._params.transcription_enabled:
await self.stop_transcription()
@@ -1785,6 +1803,31 @@ class DailyOutputTransport(BaseOutputTransport):
# Leave the room.
await self._client.leave()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
logger.debug(f"Got a DailyUpdateRemoteParticipantsFrame: {frame}")
await self._client.update_remote_participants(frame.remote_participants)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
await self._client.update_remote_participants(frame.remote_participants)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
"""Send a transport message to participants.
@@ -1880,6 +1923,7 @@ class DailyTransport(BaseTransport):
on_active_speaker_changed=self._on_active_speaker_changed,
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=self._on_before_leave,
on_error=self._on_error,
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
@@ -1943,6 +1987,10 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error")
self._register_event_handler("on_before_disconnect", sync=True)
# Deprecated
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
#
# BaseTransport
@@ -2194,6 +2242,10 @@ class DailyTransport(BaseTransport):
"""Handle room left events."""
await self._call_event_handler("on_left")
async def _on_before_leave(self):
"""Handle before leave room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_error(self, error):
"""Handle error events and push error frames."""
await self._call_event_handler("on_error", error)
@@ -2333,7 +2385,7 @@ class DailyTransport(BaseTransport):
"""Handle participant updated events."""
await self._call_event_handler("on_participant_updated", participant)
async def _on_transcription_message(self, message: Dict[str, Any]) -> None:
async def _on_transcription_message(self, message: Mapping[str, Any]) -> None:
"""Handle transcription message events."""
await self._call_event_handler("on_transcription_message", message)

View File

@@ -114,6 +114,7 @@ class LiveKitCallbacks(BaseModel):
on_connected: Callable[[], Awaitable[None]]
on_disconnected: Callable[[], Awaitable[None]]
on_before_disconnect: Callable[[], Awaitable[None]]
on_participant_connected: Callable[[str], Awaitable[None]]
on_participant_disconnected: Callable[[str], Awaitable[None]]
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
@@ -282,6 +283,7 @@ class LiveKitTransportClient:
return
logger.info(f"Disconnecting from {self._room_name}")
await self._callbacks.on_before_disconnect()
await self.room.disconnect()
self._connected = False
logger.info(f"Disconnected from {self._room_name}")
@@ -918,6 +920,7 @@ class LiveKitTransport(BaseTransport):
callbacks = LiveKitCallbacks(
on_connected=self._on_connected,
on_disconnected=self._on_disconnected,
on_before_disconnect=self._on_before_disconnect,
on_participant_connected=self._on_participant_connected,
on_participant_disconnected=self._on_participant_disconnected,
on_audio_track_subscribed=self._on_audio_track_subscribed,
@@ -947,6 +950,7 @@ class LiveKitTransport(BaseTransport):
self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_before_disconnect", sync=True)
def input(self) -> LiveKitInputTransport:
"""Get the input transport for receiving media and events.
@@ -1041,6 +1045,10 @@ class LiveKitTransport(BaseTransport):
"""Handle room disconnected events."""
await self._call_event_handler("on_disconnected")
async def _on_before_disconnect(self):
"""Handle before disconnection room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_participant_connected(self, participant_id: str):
"""Handle participant connected events."""
await self._call_event_handler("on_participant_connected", participant_id)

View File

@@ -14,13 +14,33 @@ and async cleanup for all Pipecat components.
import asyncio
import inspect
from abc import ABC
from typing import Optional
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from loguru import logger
from pipecat.utils.utils import obj_count, obj_id
@dataclass
class EventHandler:
"""Data class to store event handlers information.
This data class stores the event name, a list of handlers to run for this
event, and whether these handlers will be executed in a task.
Attributes:
name (str): The name of the event handler.
handlers (List[Any]): A list of functions to be called when this event is triggered.
is_sync (bool): Indicates whether the functions are executed in a task.
"""
name: str
handlers: List[Any]
is_sync: bool
class BaseObject(ABC):
"""Abstract base class providing common functionality for Pipecat objects.
@@ -41,7 +61,7 @@ class BaseObject(ABC):
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
# Registered event handlers.
self._event_handlers: dict = {}
self._event_handlers: Dict[str, EventHandler] = {}
# Set of tasks being executed. When a task finishes running it gets
# automatically removed from the set. When we cleanup we wait for all
@@ -103,18 +123,21 @@ class BaseObject(ABC):
Can be sync or async.
"""
if event_name in self._event_handlers:
self._event_handlers[event_name].append(handler)
self._event_handlers[event_name].handlers.append(handler)
else:
logger.warning(f"Event handler {event_name} not registered")
def _register_event_handler(self, event_name: str):
def _register_event_handler(self, event_name: str, sync: bool = False):
"""Register an event handler type.
Args:
event_name: The name of the event type to register.
sync: Whether this event handler will be executed in a task.
"""
if event_name not in self._event_handlers:
self._event_handlers[event_name] = []
self._event_handlers[event_name] = EventHandler(
name=event_name, handlers=[], is_sync=sync
)
else:
logger.warning(f"Event handler {event_name} not registered")
@@ -126,34 +149,43 @@ class BaseObject(ABC):
*args: Positional arguments to pass to event handlers.
**kwargs: Keyword arguments to pass to event handlers.
"""
# If we haven't registered an event handler, we don't need to do
# anything.
if not self._event_handlers.get(event_name):
if event_name not in self._event_handlers:
return
# Create the task.
task = asyncio.create_task(self._run_task(event_name, *args, **kwargs))
event_handler = self._event_handlers[event_name]
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
for handler in event_handler.handlers:
if event_handler.is_sync:
# Just run the handler.
await self._run_handler(event_handler.name, handler, *args, **kwargs)
else:
# Create the task. Note that this is a task per each function
# handler. Users can register to an event handler multiple
# times.
task = asyncio.create_task(
self._run_handler(event_handler.name, handler, *args, **kwargs)
)
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
async def _run_task(self, event_name: str, *args, **kwargs):
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
async def _run_handler(self, event_name: str, handler, *args, **kwargs):
"""Execute all handlers for an event.
Args:
event_name: The name of the event being handled.
event_name: The event name for this handler.
handler: The handler function to run.
*args: Positional arguments to pass to handlers.
**kwargs: Keyword arguments to pass to handlers.
"""
try:
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")