Compare commits
8 Commits
hush/usage
...
cb/frame-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
05863ded53 | ||
|
|
6ab4a48d8f | ||
|
|
89e0092159 | ||
|
|
0de31dab79 | ||
|
|
10ff93307d | ||
|
|
414c9e3bc8 | ||
|
|
ba64f126a3 | ||
|
|
d28e3881a7 |
14
CHANGELOG.md
14
CHANGELOG.md
@@ -9,6 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
|
- Added `on_before_disconnect` synchronous event to `DailyTransport` and
|
||||||
|
`LiveKitTransport`.
|
||||||
|
|
||||||
|
- It is now possible to register synchronous event handlers. By default, all
|
||||||
|
event handlers are executed in a separate task. However, in some cases we want
|
||||||
|
to guarantee order of execution, for example, executing something before
|
||||||
|
disconnecting a transport.
|
||||||
|
|
||||||
|
```python
|
||||||
|
self._register_event_handler("on_event_name", sync=True)
|
||||||
|
```
|
||||||
|
|
||||||
- Added support for global location in `GoogleVertexLLMService`. The service now
|
- Added support for global location in `GoogleVertexLLMService`. The service now
|
||||||
supports both regional locations (e.g., "us-east4") and the "global" location
|
supports both regional locations (e.g., "us-east4") and the "global" location
|
||||||
for Vertex AI endpoints. When using "global" location, the service will use
|
for Vertex AI endpoints. When using "global" location, the service will use
|
||||||
@@ -53,6 +65,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
|
- Fixed an issue where multiple handlers for an event would not run in parallel.
|
||||||
|
|
||||||
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
|
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
|
||||||
ID from the `on_dialin_connected` event, when not explicitly provided. Now
|
ID from the `on_dialin_connected` event, when not explicitly provided. Now
|
||||||
supports cold transfers (from incoming dial-in calls) by automatically
|
supports cold transfers (from incoming dial-in calls) by automatically
|
||||||
|
|||||||
@@ -568,11 +568,17 @@ class FrameProcessor(BaseObject):
|
|||||||
"""Pause processing of queued frames."""
|
"""Pause processing of queued frames."""
|
||||||
logger.trace(f"{self}: pausing frame processing")
|
logger.trace(f"{self}: pausing frame processing")
|
||||||
self.__should_block_frames = True
|
self.__should_block_frames = True
|
||||||
|
# We should also unset the process event here, in case it was set immediately after an interruption
|
||||||
|
if self.__process_event:
|
||||||
|
self.__process_event.clear()
|
||||||
|
|
||||||
async def pause_processing_system_frames(self):
|
async def pause_processing_system_frames(self):
|
||||||
"""Pause processing of queued system frames."""
|
"""Pause processing of queued system frames."""
|
||||||
logger.trace(f"{self}: pausing system frame processing")
|
logger.trace(f"{self}: pausing system frame processing")
|
||||||
self.__should_block_system_frames = True
|
self.__should_block_system_frames = True
|
||||||
|
# We should also unset the input event here, in case it was set immediately after an interruption
|
||||||
|
if self.__input_event:
|
||||||
|
self.__input_event.clear()
|
||||||
|
|
||||||
async def resume_processing_frames(self):
|
async def resume_processing_frames(self):
|
||||||
"""Resume processing of queued frames."""
|
"""Resume processing of queued frames."""
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ from pydantic import BaseModel
|
|||||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
|
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
|
||||||
from pipecat.frames.frames import (
|
from pipecat.frames.frames import (
|
||||||
CancelFrame,
|
CancelFrame,
|
||||||
|
ControlFrame,
|
||||||
EndFrame,
|
EndFrame,
|
||||||
ErrorFrame,
|
ErrorFrame,
|
||||||
Frame,
|
Frame,
|
||||||
@@ -41,6 +42,7 @@ from pipecat.frames.frames import (
|
|||||||
UserAudioRawFrame,
|
UserAudioRawFrame,
|
||||||
UserImageRawFrame,
|
UserImageRawFrame,
|
||||||
UserImageRequestFrame,
|
UserImageRequestFrame,
|
||||||
|
DataFrame,
|
||||||
)
|
)
|
||||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
|
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
|
||||||
from pipecat.transcriptions.language import Language
|
from pipecat.transcriptions.language import Language
|
||||||
@@ -105,6 +107,17 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
|
|||||||
participant_id: Optional[str] = None
|
participant_id: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
|
||||||
|
"""Frame to update remote participants in Daily calls.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
|
||||||
|
"""
|
||||||
|
|
||||||
|
remote_participants: Mapping[str, Any] = None
|
||||||
|
|
||||||
|
|
||||||
class WebRTCVADAnalyzer(VADAnalyzer):
|
class WebRTCVADAnalyzer(VADAnalyzer):
|
||||||
"""Voice Activity Detection analyzer using WebRTC.
|
"""Voice Activity Detection analyzer using WebRTC.
|
||||||
|
|
||||||
@@ -215,6 +228,7 @@ class DailyCallbacks(BaseModel):
|
|||||||
on_active_speaker_changed: Called when the active speaker of the call has changed.
|
on_active_speaker_changed: Called when the active speaker of the call has changed.
|
||||||
on_joined: Called when bot successfully joined a room.
|
on_joined: Called when bot successfully joined a room.
|
||||||
on_left: Called when bot left a room.
|
on_left: Called when bot left a room.
|
||||||
|
on_before_leave: Called when bot is about to leave the room.
|
||||||
on_error: Called when an error occurs.
|
on_error: Called when an error occurs.
|
||||||
on_app_message: Called when receiving an app message.
|
on_app_message: Called when receiving an app message.
|
||||||
on_call_state_updated: Called when call state changes.
|
on_call_state_updated: Called when call state changes.
|
||||||
@@ -244,6 +258,7 @@ class DailyCallbacks(BaseModel):
|
|||||||
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
|
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||||
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
|
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
|
||||||
on_left: Callable[[], Awaitable[None]]
|
on_left: Callable[[], Awaitable[None]]
|
||||||
|
on_before_leave: Callable[[], Awaitable[None]]
|
||||||
on_error: Callable[[str], Awaitable[None]]
|
on_error: Callable[[str], Awaitable[None]]
|
||||||
on_app_message: Callable[[Any, str], Awaitable[None]]
|
on_app_message: Callable[[Any, str], Awaitable[None]]
|
||||||
on_call_state_updated: Callable[[str], Awaitable[None]]
|
on_call_state_updated: Callable[[str], Awaitable[None]]
|
||||||
@@ -720,6 +735,9 @@ class DailyTransportClient(EventHandler):
|
|||||||
|
|
||||||
logger.info(f"Leaving {self._room_url}")
|
logger.info(f"Leaving {self._room_url}")
|
||||||
|
|
||||||
|
# Call callback before leaving.
|
||||||
|
await self._callbacks.on_before_leave()
|
||||||
|
|
||||||
if self._params.transcription_enabled:
|
if self._params.transcription_enabled:
|
||||||
await self.stop_transcription()
|
await self.stop_transcription()
|
||||||
|
|
||||||
@@ -1785,6 +1803,31 @@ class DailyOutputTransport(BaseOutputTransport):
|
|||||||
# Leave the room.
|
# Leave the room.
|
||||||
await self._client.leave()
|
await self._client.leave()
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
"""Process outgoing frames, including transport messages.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
frame: The frame to process.
|
||||||
|
direction: The direction of frame flow in the pipeline.
|
||||||
|
"""
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
|
||||||
|
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
|
||||||
|
logger.debug(f"Got a DailyUpdateRemoteParticipantsFrame: {frame}")
|
||||||
|
await self._client.update_remote_participants(frame.remote_participants)
|
||||||
|
|
||||||
|
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||||
|
"""Process outgoing frames, including transport messages.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
frame: The frame to process.
|
||||||
|
direction: The direction of frame flow in the pipeline.
|
||||||
|
"""
|
||||||
|
await super().process_frame(frame, direction)
|
||||||
|
|
||||||
|
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
|
||||||
|
await self._client.update_remote_participants(frame.remote_participants)
|
||||||
|
|
||||||
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
|
||||||
"""Send a transport message to participants.
|
"""Send a transport message to participants.
|
||||||
|
|
||||||
@@ -1880,6 +1923,7 @@ class DailyTransport(BaseTransport):
|
|||||||
on_active_speaker_changed=self._on_active_speaker_changed,
|
on_active_speaker_changed=self._on_active_speaker_changed,
|
||||||
on_joined=self._on_joined,
|
on_joined=self._on_joined,
|
||||||
on_left=self._on_left,
|
on_left=self._on_left,
|
||||||
|
on_before_leave=self._on_before_leave,
|
||||||
on_error=self._on_error,
|
on_error=self._on_error,
|
||||||
on_app_message=self._on_app_message,
|
on_app_message=self._on_app_message,
|
||||||
on_call_state_updated=self._on_call_state_updated,
|
on_call_state_updated=self._on_call_state_updated,
|
||||||
@@ -1943,6 +1987,10 @@ class DailyTransport(BaseTransport):
|
|||||||
self._register_event_handler("on_recording_started")
|
self._register_event_handler("on_recording_started")
|
||||||
self._register_event_handler("on_recording_stopped")
|
self._register_event_handler("on_recording_stopped")
|
||||||
self._register_event_handler("on_recording_error")
|
self._register_event_handler("on_recording_error")
|
||||||
|
self._register_event_handler("on_before_disconnect", sync=True)
|
||||||
|
# Deprecated
|
||||||
|
self._register_event_handler("on_joined")
|
||||||
|
self._register_event_handler("on_left")
|
||||||
|
|
||||||
#
|
#
|
||||||
# BaseTransport
|
# BaseTransport
|
||||||
@@ -2194,6 +2242,10 @@ class DailyTransport(BaseTransport):
|
|||||||
"""Handle room left events."""
|
"""Handle room left events."""
|
||||||
await self._call_event_handler("on_left")
|
await self._call_event_handler("on_left")
|
||||||
|
|
||||||
|
async def _on_before_leave(self):
|
||||||
|
"""Handle before leave room events."""
|
||||||
|
await self._call_event_handler("on_before_disconnect")
|
||||||
|
|
||||||
async def _on_error(self, error):
|
async def _on_error(self, error):
|
||||||
"""Handle error events and push error frames."""
|
"""Handle error events and push error frames."""
|
||||||
await self._call_event_handler("on_error", error)
|
await self._call_event_handler("on_error", error)
|
||||||
@@ -2333,7 +2385,7 @@ class DailyTransport(BaseTransport):
|
|||||||
"""Handle participant updated events."""
|
"""Handle participant updated events."""
|
||||||
await self._call_event_handler("on_participant_updated", participant)
|
await self._call_event_handler("on_participant_updated", participant)
|
||||||
|
|
||||||
async def _on_transcription_message(self, message: Dict[str, Any]) -> None:
|
async def _on_transcription_message(self, message: Mapping[str, Any]) -> None:
|
||||||
"""Handle transcription message events."""
|
"""Handle transcription message events."""
|
||||||
await self._call_event_handler("on_transcription_message", message)
|
await self._call_event_handler("on_transcription_message", message)
|
||||||
|
|
||||||
|
|||||||
@@ -114,6 +114,7 @@ class LiveKitCallbacks(BaseModel):
|
|||||||
|
|
||||||
on_connected: Callable[[], Awaitable[None]]
|
on_connected: Callable[[], Awaitable[None]]
|
||||||
on_disconnected: Callable[[], Awaitable[None]]
|
on_disconnected: Callable[[], Awaitable[None]]
|
||||||
|
on_before_disconnect: Callable[[], Awaitable[None]]
|
||||||
on_participant_connected: Callable[[str], Awaitable[None]]
|
on_participant_connected: Callable[[str], Awaitable[None]]
|
||||||
on_participant_disconnected: Callable[[str], Awaitable[None]]
|
on_participant_disconnected: Callable[[str], Awaitable[None]]
|
||||||
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
|
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
|
||||||
@@ -282,6 +283,7 @@ class LiveKitTransportClient:
|
|||||||
return
|
return
|
||||||
|
|
||||||
logger.info(f"Disconnecting from {self._room_name}")
|
logger.info(f"Disconnecting from {self._room_name}")
|
||||||
|
await self._callbacks.on_before_disconnect()
|
||||||
await self.room.disconnect()
|
await self.room.disconnect()
|
||||||
self._connected = False
|
self._connected = False
|
||||||
logger.info(f"Disconnected from {self._room_name}")
|
logger.info(f"Disconnected from {self._room_name}")
|
||||||
@@ -918,6 +920,7 @@ class LiveKitTransport(BaseTransport):
|
|||||||
callbacks = LiveKitCallbacks(
|
callbacks = LiveKitCallbacks(
|
||||||
on_connected=self._on_connected,
|
on_connected=self._on_connected,
|
||||||
on_disconnected=self._on_disconnected,
|
on_disconnected=self._on_disconnected,
|
||||||
|
on_before_disconnect=self._on_before_disconnect,
|
||||||
on_participant_connected=self._on_participant_connected,
|
on_participant_connected=self._on_participant_connected,
|
||||||
on_participant_disconnected=self._on_participant_disconnected,
|
on_participant_disconnected=self._on_participant_disconnected,
|
||||||
on_audio_track_subscribed=self._on_audio_track_subscribed,
|
on_audio_track_subscribed=self._on_audio_track_subscribed,
|
||||||
@@ -947,6 +950,7 @@ class LiveKitTransport(BaseTransport):
|
|||||||
self._register_event_handler("on_first_participant_joined")
|
self._register_event_handler("on_first_participant_joined")
|
||||||
self._register_event_handler("on_participant_left")
|
self._register_event_handler("on_participant_left")
|
||||||
self._register_event_handler("on_call_state_updated")
|
self._register_event_handler("on_call_state_updated")
|
||||||
|
self._register_event_handler("on_before_disconnect", sync=True)
|
||||||
|
|
||||||
def input(self) -> LiveKitInputTransport:
|
def input(self) -> LiveKitInputTransport:
|
||||||
"""Get the input transport for receiving media and events.
|
"""Get the input transport for receiving media and events.
|
||||||
@@ -1041,6 +1045,10 @@ class LiveKitTransport(BaseTransport):
|
|||||||
"""Handle room disconnected events."""
|
"""Handle room disconnected events."""
|
||||||
await self._call_event_handler("on_disconnected")
|
await self._call_event_handler("on_disconnected")
|
||||||
|
|
||||||
|
async def _on_before_disconnect(self):
|
||||||
|
"""Handle before disconnection room events."""
|
||||||
|
await self._call_event_handler("on_before_disconnect")
|
||||||
|
|
||||||
async def _on_participant_connected(self, participant_id: str):
|
async def _on_participant_connected(self, participant_id: str):
|
||||||
"""Handle participant connected events."""
|
"""Handle participant connected events."""
|
||||||
await self._call_event_handler("on_participant_connected", participant_id)
|
await self._call_event_handler("on_participant_connected", participant_id)
|
||||||
|
|||||||
@@ -14,13 +14,33 @@ and async cleanup for all Pipecat components.
|
|||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
import inspect
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
from typing import Optional
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from pipecat.utils.utils import obj_count, obj_id
|
from pipecat.utils.utils import obj_count, obj_id
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EventHandler:
|
||||||
|
"""Data class to store event handlers information.
|
||||||
|
|
||||||
|
This data class stores the event name, a list of handlers to run for this
|
||||||
|
event, and whether these handlers will be executed in a task.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
name (str): The name of the event handler.
|
||||||
|
handlers (List[Any]): A list of functions to be called when this event is triggered.
|
||||||
|
is_sync (bool): Indicates whether the functions are executed in a task.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
handlers: List[Any]
|
||||||
|
is_sync: bool
|
||||||
|
|
||||||
|
|
||||||
class BaseObject(ABC):
|
class BaseObject(ABC):
|
||||||
"""Abstract base class providing common functionality for Pipecat objects.
|
"""Abstract base class providing common functionality for Pipecat objects.
|
||||||
|
|
||||||
@@ -41,7 +61,7 @@ class BaseObject(ABC):
|
|||||||
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
|
||||||
|
|
||||||
# Registered event handlers.
|
# Registered event handlers.
|
||||||
self._event_handlers: dict = {}
|
self._event_handlers: Dict[str, EventHandler] = {}
|
||||||
|
|
||||||
# Set of tasks being executed. When a task finishes running it gets
|
# Set of tasks being executed. When a task finishes running it gets
|
||||||
# automatically removed from the set. When we cleanup we wait for all
|
# automatically removed from the set. When we cleanup we wait for all
|
||||||
@@ -103,18 +123,21 @@ class BaseObject(ABC):
|
|||||||
Can be sync or async.
|
Can be sync or async.
|
||||||
"""
|
"""
|
||||||
if event_name in self._event_handlers:
|
if event_name in self._event_handlers:
|
||||||
self._event_handlers[event_name].append(handler)
|
self._event_handlers[event_name].handlers.append(handler)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Event handler {event_name} not registered")
|
logger.warning(f"Event handler {event_name} not registered")
|
||||||
|
|
||||||
def _register_event_handler(self, event_name: str):
|
def _register_event_handler(self, event_name: str, sync: bool = False):
|
||||||
"""Register an event handler type.
|
"""Register an event handler type.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event_name: The name of the event type to register.
|
event_name: The name of the event type to register.
|
||||||
|
sync: Whether this event handler will be executed in a task.
|
||||||
"""
|
"""
|
||||||
if event_name not in self._event_handlers:
|
if event_name not in self._event_handlers:
|
||||||
self._event_handlers[event_name] = []
|
self._event_handlers[event_name] = EventHandler(
|
||||||
|
name=event_name, handlers=[], is_sync=sync
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Event handler {event_name} not registered")
|
logger.warning(f"Event handler {event_name} not registered")
|
||||||
|
|
||||||
@@ -126,34 +149,43 @@ class BaseObject(ABC):
|
|||||||
*args: Positional arguments to pass to event handlers.
|
*args: Positional arguments to pass to event handlers.
|
||||||
**kwargs: Keyword arguments to pass to event handlers.
|
**kwargs: Keyword arguments to pass to event handlers.
|
||||||
"""
|
"""
|
||||||
# If we haven't registered an event handler, we don't need to do
|
if event_name not in self._event_handlers:
|
||||||
# anything.
|
|
||||||
if not self._event_handlers.get(event_name):
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# Create the task.
|
event_handler = self._event_handlers[event_name]
|
||||||
task = asyncio.create_task(self._run_task(event_name, *args, **kwargs))
|
|
||||||
|
|
||||||
# Add it to our list of event tasks.
|
for handler in event_handler.handlers:
|
||||||
self._event_tasks.add((event_name, task))
|
if event_handler.is_sync:
|
||||||
|
# Just run the handler.
|
||||||
|
await self._run_handler(event_handler.name, handler, *args, **kwargs)
|
||||||
|
else:
|
||||||
|
# Create the task. Note that this is a task per each function
|
||||||
|
# handler. Users can register to an event handler multiple
|
||||||
|
# times.
|
||||||
|
task = asyncio.create_task(
|
||||||
|
self._run_handler(event_handler.name, handler, *args, **kwargs)
|
||||||
|
)
|
||||||
|
|
||||||
# Remove the task from the event tasks list when the task completes.
|
# Add it to our list of event tasks.
|
||||||
task.add_done_callback(self._event_task_finished)
|
self._event_tasks.add((event_name, task))
|
||||||
|
|
||||||
async def _run_task(self, event_name: str, *args, **kwargs):
|
# Remove the task from the event tasks list when the task completes.
|
||||||
|
task.add_done_callback(self._event_task_finished)
|
||||||
|
|
||||||
|
async def _run_handler(self, event_name: str, handler, *args, **kwargs):
|
||||||
"""Execute all handlers for an event.
|
"""Execute all handlers for an event.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event_name: The name of the event being handled.
|
event_name: The event name for this handler.
|
||||||
|
handler: The handler function to run.
|
||||||
*args: Positional arguments to pass to handlers.
|
*args: Positional arguments to pass to handlers.
|
||||||
**kwargs: Keyword arguments to pass to handlers.
|
**kwargs: Keyword arguments to pass to handlers.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
for handler in self._event_handlers[event_name]:
|
if inspect.iscoroutinefunction(handler):
|
||||||
if inspect.iscoroutinefunction(handler):
|
await handler(self, *args, **kwargs)
|
||||||
await handler(self, *args, **kwargs)
|
else:
|
||||||
else:
|
handler(self, *args, **kwargs)
|
||||||
handler(self, *args, **kwargs)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"Exception in event handler {event_name}: {e}")
|
logger.exception(f"Exception in event handler {event_name}: {e}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user