Compare commits

...

8 Commits

Author SHA1 Message Date
Chad Bailey
05863ded53 add remote participant updates to DailyTransport 2025-09-19 21:15:07 +00:00
chadbailey59
6ab4a48d8f Add remote participant updates to DailyTransport (#2694)
* add remote participant updates to DailyTransport

* cleanup

* cleanup

* ruff cleanup again
2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
89e0092159 BaseObject: run each handler for the same event in a separate task 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
0de31dab79 LiveKitTransport: added synchronous before_disconnect event 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
10ff93307d DailyTransport: added synchronous on_before_disconnect event 2025-09-19 21:15:07 +00:00
Aleix Conchillo Flaqué
414c9e3bc8 BaseObject: allow synchronous event handlers 2025-09-19 21:15:07 +00:00
Chad Bailey
ba64f126a3 reset paused frame processors after interruption 2025-09-19 18:15:01 +00:00
Chad Bailey
d28e3881a7 add remote participant updates to DailyTransport 2025-09-19 18:11:06 +00:00
5 changed files with 134 additions and 22 deletions

View File

@@ -9,6 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- Added `on_before_disconnect` synchronous event to `DailyTransport` and
`LiveKitTransport`.
- It is now possible to register synchronous event handlers. By default, all
event handlers are executed in a separate task. However, in some cases we want
to guarantee order of execution, for example, executing something before
disconnecting a transport.
```python
self._register_event_handler("on_event_name", sync=True)
```
- Added support for global location in `GoogleVertexLLMService`. The service now - Added support for global location in `GoogleVertexLLMService`. The service now
supports both regional locations (e.g., "us-east4") and the "global" location supports both regional locations (e.g., "us-east4") and the "global" location
for Vertex AI endpoints. When using "global" location, the service will use for Vertex AI endpoints. When using "global" location, the service will use
@@ -53,6 +65,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- Fixed an issue where multiple handlers for an event would not run in parallel.
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session - Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
ID from the `on_dialin_connected` event, when not explicitly provided. Now ID from the `on_dialin_connected` event, when not explicitly provided. Now
supports cold transfers (from incoming dial-in calls) by automatically supports cold transfers (from incoming dial-in calls) by automatically

View File

@@ -568,11 +568,17 @@ class FrameProcessor(BaseObject):
"""Pause processing of queued frames.""" """Pause processing of queued frames."""
logger.trace(f"{self}: pausing frame processing") logger.trace(f"{self}: pausing frame processing")
self.__should_block_frames = True self.__should_block_frames = True
# We should also unset the process event here, in case it was set immediately after an interruption
if self.__process_event:
self.__process_event.clear()
async def pause_processing_system_frames(self): async def pause_processing_system_frames(self):
"""Pause processing of queued system frames.""" """Pause processing of queued system frames."""
logger.trace(f"{self}: pausing system frame processing") logger.trace(f"{self}: pausing system frame processing")
self.__should_block_system_frames = True self.__should_block_system_frames = True
# We should also unset the input event here, in case it was set immediately after an interruption
if self.__input_event:
self.__input_event.clear()
async def resume_processing_frames(self): async def resume_processing_frames(self):
"""Resume processing of queued frames.""" """Resume processing of queued frames."""

View File

@@ -25,6 +25,7 @@ from pydantic import BaseModel
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import ( from pipecat.frames.frames import (
CancelFrame, CancelFrame,
ControlFrame,
EndFrame, EndFrame,
ErrorFrame, ErrorFrame,
Frame, Frame,
@@ -41,6 +42,7 @@ from pipecat.frames.frames import (
UserAudioRawFrame, UserAudioRawFrame,
UserImageRawFrame, UserImageRawFrame,
UserImageRequestFrame, UserImageRequestFrame,
DataFrame,
) )
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.transcriptions.language import Language from pipecat.transcriptions.language import Language
@@ -105,6 +107,17 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
participant_id: Optional[str] = None participant_id: Optional[str] = None
@dataclass
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
class WebRTCVADAnalyzer(VADAnalyzer): class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC. """Voice Activity Detection analyzer using WebRTC.
@@ -215,6 +228,7 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Called when the active speaker of the call has changed. on_active_speaker_changed: Called when the active speaker of the call has changed.
on_joined: Called when bot successfully joined a room. on_joined: Called when bot successfully joined a room.
on_left: Called when bot left a room. on_left: Called when bot left a room.
on_before_leave: Called when bot is about to leave the room.
on_error: Called when an error occurs. on_error: Called when an error occurs.
on_app_message: Called when receiving an app message. on_app_message: Called when receiving an app message.
on_call_state_updated: Called when call state changes. on_call_state_updated: Called when call state changes.
@@ -244,6 +258,7 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]] on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]] on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_left: Callable[[], Awaitable[None]] on_left: Callable[[], Awaitable[None]]
on_before_leave: Callable[[], Awaitable[None]]
on_error: Callable[[str], Awaitable[None]] on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]] on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]] on_call_state_updated: Callable[[str], Awaitable[None]]
@@ -720,6 +735,9 @@ class DailyTransportClient(EventHandler):
logger.info(f"Leaving {self._room_url}") logger.info(f"Leaving {self._room_url}")
# Call callback before leaving.
await self._callbacks.on_before_leave()
if self._params.transcription_enabled: if self._params.transcription_enabled:
await self.stop_transcription() await self.stop_transcription()
@@ -1785,6 +1803,31 @@ class DailyOutputTransport(BaseOutputTransport):
# Leave the room. # Leave the room.
await self._client.leave() await self._client.leave()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
logger.debug(f"Got a DailyUpdateRemoteParticipantsFrame: {frame}")
await self._client.update_remote_participants(frame.remote_participants)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
await self._client.update_remote_participants(frame.remote_participants)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame): async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
"""Send a transport message to participants. """Send a transport message to participants.
@@ -1880,6 +1923,7 @@ class DailyTransport(BaseTransport):
on_active_speaker_changed=self._on_active_speaker_changed, on_active_speaker_changed=self._on_active_speaker_changed,
on_joined=self._on_joined, on_joined=self._on_joined,
on_left=self._on_left, on_left=self._on_left,
on_before_leave=self._on_before_leave,
on_error=self._on_error, on_error=self._on_error,
on_app_message=self._on_app_message, on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated, on_call_state_updated=self._on_call_state_updated,
@@ -1943,6 +1987,10 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_recording_started") self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped") self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error") self._register_event_handler("on_recording_error")
self._register_event_handler("on_before_disconnect", sync=True)
# Deprecated
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
# #
# BaseTransport # BaseTransport
@@ -2194,6 +2242,10 @@ class DailyTransport(BaseTransport):
"""Handle room left events.""" """Handle room left events."""
await self._call_event_handler("on_left") await self._call_event_handler("on_left")
async def _on_before_leave(self):
"""Handle before leave room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_error(self, error): async def _on_error(self, error):
"""Handle error events and push error frames.""" """Handle error events and push error frames."""
await self._call_event_handler("on_error", error) await self._call_event_handler("on_error", error)
@@ -2333,7 +2385,7 @@ class DailyTransport(BaseTransport):
"""Handle participant updated events.""" """Handle participant updated events."""
await self._call_event_handler("on_participant_updated", participant) await self._call_event_handler("on_participant_updated", participant)
async def _on_transcription_message(self, message: Dict[str, Any]) -> None: async def _on_transcription_message(self, message: Mapping[str, Any]) -> None:
"""Handle transcription message events.""" """Handle transcription message events."""
await self._call_event_handler("on_transcription_message", message) await self._call_event_handler("on_transcription_message", message)

View File

@@ -114,6 +114,7 @@ class LiveKitCallbacks(BaseModel):
on_connected: Callable[[], Awaitable[None]] on_connected: Callable[[], Awaitable[None]]
on_disconnected: Callable[[], Awaitable[None]] on_disconnected: Callable[[], Awaitable[None]]
on_before_disconnect: Callable[[], Awaitable[None]]
on_participant_connected: Callable[[str], Awaitable[None]] on_participant_connected: Callable[[str], Awaitable[None]]
on_participant_disconnected: Callable[[str], Awaitable[None]] on_participant_disconnected: Callable[[str], Awaitable[None]]
on_audio_track_subscribed: Callable[[str], Awaitable[None]] on_audio_track_subscribed: Callable[[str], Awaitable[None]]
@@ -282,6 +283,7 @@ class LiveKitTransportClient:
return return
logger.info(f"Disconnecting from {self._room_name}") logger.info(f"Disconnecting from {self._room_name}")
await self._callbacks.on_before_disconnect()
await self.room.disconnect() await self.room.disconnect()
self._connected = False self._connected = False
logger.info(f"Disconnected from {self._room_name}") logger.info(f"Disconnected from {self._room_name}")
@@ -918,6 +920,7 @@ class LiveKitTransport(BaseTransport):
callbacks = LiveKitCallbacks( callbacks = LiveKitCallbacks(
on_connected=self._on_connected, on_connected=self._on_connected,
on_disconnected=self._on_disconnected, on_disconnected=self._on_disconnected,
on_before_disconnect=self._on_before_disconnect,
on_participant_connected=self._on_participant_connected, on_participant_connected=self._on_participant_connected,
on_participant_disconnected=self._on_participant_disconnected, on_participant_disconnected=self._on_participant_disconnected,
on_audio_track_subscribed=self._on_audio_track_subscribed, on_audio_track_subscribed=self._on_audio_track_subscribed,
@@ -947,6 +950,7 @@ class LiveKitTransport(BaseTransport):
self._register_event_handler("on_first_participant_joined") self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left") self._register_event_handler("on_participant_left")
self._register_event_handler("on_call_state_updated") self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_before_disconnect", sync=True)
def input(self) -> LiveKitInputTransport: def input(self) -> LiveKitInputTransport:
"""Get the input transport for receiving media and events. """Get the input transport for receiving media and events.
@@ -1041,6 +1045,10 @@ class LiveKitTransport(BaseTransport):
"""Handle room disconnected events.""" """Handle room disconnected events."""
await self._call_event_handler("on_disconnected") await self._call_event_handler("on_disconnected")
async def _on_before_disconnect(self):
"""Handle before disconnection room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_participant_connected(self, participant_id: str): async def _on_participant_connected(self, participant_id: str):
"""Handle participant connected events.""" """Handle participant connected events."""
await self._call_event_handler("on_participant_connected", participant_id) await self._call_event_handler("on_participant_connected", participant_id)

View File

@@ -14,13 +14,33 @@ and async cleanup for all Pipecat components.
import asyncio import asyncio
import inspect import inspect
from abc import ABC from abc import ABC
from typing import Optional from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from loguru import logger from loguru import logger
from pipecat.utils.utils import obj_count, obj_id from pipecat.utils.utils import obj_count, obj_id
@dataclass
class EventHandler:
"""Data class to store event handlers information.
This data class stores the event name, a list of handlers to run for this
event, and whether these handlers will be executed in a task.
Attributes:
name (str): The name of the event handler.
handlers (List[Any]): A list of functions to be called when this event is triggered.
is_sync (bool): Indicates whether the functions are executed in a task.
"""
name: str
handlers: List[Any]
is_sync: bool
class BaseObject(ABC): class BaseObject(ABC):
"""Abstract base class providing common functionality for Pipecat objects. """Abstract base class providing common functionality for Pipecat objects.
@@ -41,7 +61,7 @@ class BaseObject(ABC):
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}" self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
# Registered event handlers. # Registered event handlers.
self._event_handlers: dict = {} self._event_handlers: Dict[str, EventHandler] = {}
# Set of tasks being executed. When a task finishes running it gets # Set of tasks being executed. When a task finishes running it gets
# automatically removed from the set. When we cleanup we wait for all # automatically removed from the set. When we cleanup we wait for all
@@ -103,18 +123,21 @@ class BaseObject(ABC):
Can be sync or async. Can be sync or async.
""" """
if event_name in self._event_handlers: if event_name in self._event_handlers:
self._event_handlers[event_name].append(handler) self._event_handlers[event_name].handlers.append(handler)
else: else:
logger.warning(f"Event handler {event_name} not registered") logger.warning(f"Event handler {event_name} not registered")
def _register_event_handler(self, event_name: str): def _register_event_handler(self, event_name: str, sync: bool = False):
"""Register an event handler type. """Register an event handler type.
Args: Args:
event_name: The name of the event type to register. event_name: The name of the event type to register.
sync: Whether this event handler will be executed in a task.
""" """
if event_name not in self._event_handlers: if event_name not in self._event_handlers:
self._event_handlers[event_name] = [] self._event_handlers[event_name] = EventHandler(
name=event_name, handlers=[], is_sync=sync
)
else: else:
logger.warning(f"Event handler {event_name} not registered") logger.warning(f"Event handler {event_name} not registered")
@@ -126,34 +149,43 @@ class BaseObject(ABC):
*args: Positional arguments to pass to event handlers. *args: Positional arguments to pass to event handlers.
**kwargs: Keyword arguments to pass to event handlers. **kwargs: Keyword arguments to pass to event handlers.
""" """
# If we haven't registered an event handler, we don't need to do if event_name not in self._event_handlers:
# anything.
if not self._event_handlers.get(event_name):
return return
# Create the task. event_handler = self._event_handlers[event_name]
task = asyncio.create_task(self._run_task(event_name, *args, **kwargs))
# Add it to our list of event tasks. for handler in event_handler.handlers:
self._event_tasks.add((event_name, task)) if event_handler.is_sync:
# Just run the handler.
await self._run_handler(event_handler.name, handler, *args, **kwargs)
else:
# Create the task. Note that this is a task per each function
# handler. Users can register to an event handler multiple
# times.
task = asyncio.create_task(
self._run_handler(event_handler.name, handler, *args, **kwargs)
)
# Remove the task from the event tasks list when the task completes. # Add it to our list of event tasks.
task.add_done_callback(self._event_task_finished) self._event_tasks.add((event_name, task))
async def _run_task(self, event_name: str, *args, **kwargs): # Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
async def _run_handler(self, event_name: str, handler, *args, **kwargs):
"""Execute all handlers for an event. """Execute all handlers for an event.
Args: Args:
event_name: The name of the event being handled. event_name: The event name for this handler.
handler: The handler function to run.
*args: Positional arguments to pass to handlers. *args: Positional arguments to pass to handlers.
**kwargs: Keyword arguments to pass to handlers. **kwargs: Keyword arguments to pass to handlers.
""" """
try: try:
for handler in self._event_handlers[event_name]: if inspect.iscoroutinefunction(handler):
if inspect.iscoroutinefunction(handler): await handler(self, *args, **kwargs)
await handler(self, *args, **kwargs) else:
else: handler(self, *args, **kwargs)
handler(self, *args, **kwargs)
except Exception as e: except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}") logger.exception(f"Exception in event handler {event_name}: {e}")