Compare commits

..

1 Commits

Author SHA1 Message Date
Mark Backman
6d5969076e Add a currency test to test_utils_string unit test 2025-09-18 21:01:50 -04:00
8 changed files with 27 additions and 140 deletions

View File

@@ -9,18 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added `on_before_disconnect` synchronous event to `DailyTransport` and
`LiveKitTransport`.
- It is now possible to register synchronous event handlers. By default, all
event handlers are executed in a separate task. However, in some cases we want
to guarantee order of execution, for example, executing something before
disconnecting a transport.
```python
self._register_event_handler("on_event_name", sync=True)
```
- Added support for global location in `GoogleVertexLLMService`. The service now
supports both regional locations (e.g., "us-east4") and the "global" location
for Vertex AI endpoints. When using "global" location, the service will use
@@ -38,8 +26,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Updated Silero VAD model to v6.
- Updated `livekit` to 1.0.13.
- `torch` and `torchaudio` are no longer required for running Smart Turn
@@ -65,8 +51,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue where multiple handlers for an event would not run in parallel.
- Fixed `DailyTransport.sip_call_transfer()` to automatically use the session
ID from the `on_dialin_connected` event, when not explicitly provided. Now
supports cold transfers (from incoming dial-in calls) by automatically

View File

@@ -568,17 +568,11 @@ class FrameProcessor(BaseObject):
"""Pause processing of queued frames."""
logger.trace(f"{self}: pausing frame processing")
self.__should_block_frames = True
# We should also unset the process event here, in case it was set immediately after an interruption
if self.__process_event:
self.__process_event.clear()
async def pause_processing_system_frames(self):
"""Pause processing of queued system frames."""
logger.trace(f"{self}: pausing system frame processing")
self.__should_block_system_frames = True
# We should also unset the input event here, in case it was set immediately after an interruption
if self.__input_event:
self.__input_event.clear()
async def resume_processing_frames(self):
"""Resume processing of queued frames."""

View File

@@ -78,6 +78,10 @@ from pipecat.runner.types import (
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
try:
import uvicorn
@@ -182,10 +186,6 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return

View File

@@ -25,7 +25,6 @@ from pydantic import BaseModel
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams
from pipecat.frames.frames import (
CancelFrame,
ControlFrame,
EndFrame,
ErrorFrame,
Frame,
@@ -42,7 +41,6 @@ from pipecat.frames.frames import (
UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
DataFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessorSetup
from pipecat.transcriptions.language import Language
@@ -107,17 +105,6 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
participant_id: Optional[str] = None
@dataclass
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
Parameters:
remote_participants: See https://reference-python.daily.co/api_reference.html#daily.CallClient.update_remote_participants.
"""
remote_participants: Mapping[str, Any] = None
class WebRTCVADAnalyzer(VADAnalyzer):
"""Voice Activity Detection analyzer using WebRTC.
@@ -228,7 +215,6 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Called when the active speaker of the call has changed.
on_joined: Called when bot successfully joined a room.
on_left: Called when bot left a room.
on_before_leave: Called when bot is about to leave the room.
on_error: Called when an error occurs.
on_app_message: Called when receiving an app message.
on_call_state_updated: Called when call state changes.
@@ -258,7 +244,6 @@ class DailyCallbacks(BaseModel):
on_active_speaker_changed: Callable[[Mapping[str, Any]], Awaitable[None]]
on_joined: Callable[[Mapping[str, Any]], Awaitable[None]]
on_left: Callable[[], Awaitable[None]]
on_before_leave: Callable[[], Awaitable[None]]
on_error: Callable[[str], Awaitable[None]]
on_app_message: Callable[[Any, str], Awaitable[None]]
on_call_state_updated: Callable[[str], Awaitable[None]]
@@ -735,9 +720,6 @@ class DailyTransportClient(EventHandler):
logger.info(f"Leaving {self._room_url}")
# Call callback before leaving.
await self._callbacks.on_before_leave()
if self._params.transcription_enabled:
await self.stop_transcription()
@@ -1803,31 +1785,6 @@ class DailyOutputTransport(BaseOutputTransport):
# Leave the room.
await self._client.leave()
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
logger.debug(f"Got a DailyUpdateRemoteParticipantsFrame: {frame}")
await self._client.update_remote_participants(frame.remote_participants)
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process outgoing frames, including transport messages.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
await self._client.update_remote_participants(frame.remote_participants)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
"""Send a transport message to participants.
@@ -1923,7 +1880,6 @@ class DailyTransport(BaseTransport):
on_active_speaker_changed=self._on_active_speaker_changed,
on_joined=self._on_joined,
on_left=self._on_left,
on_before_leave=self._on_before_leave,
on_error=self._on_error,
on_app_message=self._on_app_message,
on_call_state_updated=self._on_call_state_updated,
@@ -1987,10 +1943,6 @@ class DailyTransport(BaseTransport):
self._register_event_handler("on_recording_started")
self._register_event_handler("on_recording_stopped")
self._register_event_handler("on_recording_error")
self._register_event_handler("on_before_disconnect", sync=True)
# Deprecated
self._register_event_handler("on_joined")
self._register_event_handler("on_left")
#
# BaseTransport
@@ -2242,10 +2194,6 @@ class DailyTransport(BaseTransport):
"""Handle room left events."""
await self._call_event_handler("on_left")
async def _on_before_leave(self):
"""Handle before leave room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_error(self, error):
"""Handle error events and push error frames."""
await self._call_event_handler("on_error", error)
@@ -2385,7 +2333,7 @@ class DailyTransport(BaseTransport):
"""Handle participant updated events."""
await self._call_event_handler("on_participant_updated", participant)
async def _on_transcription_message(self, message: Mapping[str, Any]) -> None:
async def _on_transcription_message(self, message: Dict[str, Any]) -> None:
"""Handle transcription message events."""
await self._call_event_handler("on_transcription_message", message)

View File

@@ -114,7 +114,6 @@ class LiveKitCallbacks(BaseModel):
on_connected: Callable[[], Awaitable[None]]
on_disconnected: Callable[[], Awaitable[None]]
on_before_disconnect: Callable[[], Awaitable[None]]
on_participant_connected: Callable[[str], Awaitable[None]]
on_participant_disconnected: Callable[[str], Awaitable[None]]
on_audio_track_subscribed: Callable[[str], Awaitable[None]]
@@ -283,7 +282,6 @@ class LiveKitTransportClient:
return
logger.info(f"Disconnecting from {self._room_name}")
await self._callbacks.on_before_disconnect()
await self.room.disconnect()
self._connected = False
logger.info(f"Disconnected from {self._room_name}")
@@ -920,7 +918,6 @@ class LiveKitTransport(BaseTransport):
callbacks = LiveKitCallbacks(
on_connected=self._on_connected,
on_disconnected=self._on_disconnected,
on_before_disconnect=self._on_before_disconnect,
on_participant_connected=self._on_participant_connected,
on_participant_disconnected=self._on_participant_disconnected,
on_audio_track_subscribed=self._on_audio_track_subscribed,
@@ -950,7 +947,6 @@ class LiveKitTransport(BaseTransport):
self._register_event_handler("on_first_participant_joined")
self._register_event_handler("on_participant_left")
self._register_event_handler("on_call_state_updated")
self._register_event_handler("on_before_disconnect", sync=True)
def input(self) -> LiveKitInputTransport:
"""Get the input transport for receiving media and events.
@@ -1045,10 +1041,6 @@ class LiveKitTransport(BaseTransport):
"""Handle room disconnected events."""
await self._call_event_handler("on_disconnected")
async def _on_before_disconnect(self):
"""Handle before disconnection room events."""
await self._call_event_handler("on_before_disconnect")
async def _on_participant_connected(self, participant_id: str):
"""Handle participant connected events."""
await self._call_event_handler("on_participant_connected", participant_id)

View File

@@ -14,33 +14,13 @@ and async cleanup for all Pipecat components.
import asyncio
import inspect
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from typing import Optional
from loguru import logger
from pipecat.utils.utils import obj_count, obj_id
@dataclass
class EventHandler:
"""Data class to store event handlers information.
This data class stores the event name, a list of handlers to run for this
event, and whether these handlers will be executed in a task.
Attributes:
name (str): The name of the event handler.
handlers (List[Any]): A list of functions to be called when this event is triggered.
is_sync (bool): Indicates whether the functions are executed in a task.
"""
name: str
handlers: List[Any]
is_sync: bool
class BaseObject(ABC):
"""Abstract base class providing common functionality for Pipecat objects.
@@ -61,7 +41,7 @@ class BaseObject(ABC):
self._name = name or f"{self.__class__.__name__}#{obj_count(self)}"
# Registered event handlers.
self._event_handlers: Dict[str, EventHandler] = {}
self._event_handlers: dict = {}
# Set of tasks being executed. When a task finishes running it gets
# automatically removed from the set. When we cleanup we wait for all
@@ -123,21 +103,18 @@ class BaseObject(ABC):
Can be sync or async.
"""
if event_name in self._event_handlers:
self._event_handlers[event_name].handlers.append(handler)
self._event_handlers[event_name].append(handler)
else:
logger.warning(f"Event handler {event_name} not registered")
def _register_event_handler(self, event_name: str, sync: bool = False):
def _register_event_handler(self, event_name: str):
"""Register an event handler type.
Args:
event_name: The name of the event type to register.
sync: Whether this event handler will be executed in a task.
"""
if event_name not in self._event_handlers:
self._event_handlers[event_name] = EventHandler(
name=event_name, handlers=[], is_sync=sync
)
self._event_handlers[event_name] = []
else:
logger.warning(f"Event handler {event_name} not registered")
@@ -149,43 +126,34 @@ class BaseObject(ABC):
*args: Positional arguments to pass to event handlers.
**kwargs: Keyword arguments to pass to event handlers.
"""
if event_name not in self._event_handlers:
# If we haven't registered an event handler, we don't need to do
# anything.
if not self._event_handlers.get(event_name):
return
event_handler = self._event_handlers[event_name]
# Create the task.
task = asyncio.create_task(self._run_task(event_name, *args, **kwargs))
for handler in event_handler.handlers:
if event_handler.is_sync:
# Just run the handler.
await self._run_handler(event_handler.name, handler, *args, **kwargs)
else:
# Create the task. Note that this is a task per each function
# handler. Users can register to an event handler multiple
# times.
task = asyncio.create_task(
self._run_handler(event_handler.name, handler, *args, **kwargs)
)
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
# Add it to our list of event tasks.
self._event_tasks.add((event_name, task))
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
# Remove the task from the event tasks list when the task completes.
task.add_done_callback(self._event_task_finished)
async def _run_handler(self, event_name: str, handler, *args, **kwargs):
async def _run_task(self, event_name: str, *args, **kwargs):
"""Execute all handlers for an event.
Args:
event_name: The event name for this handler.
handler: The handler function to run.
event_name: The name of the event being handled.
*args: Positional arguments to pass to handlers.
**kwargs: Keyword arguments to pass to handlers.
"""
try:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
for handler in self._event_handlers[event_name]:
if inspect.iscoroutinefunction(handler):
await handler(self, *args, **kwargs)
else:
handler(self, *args, **kwargs)
except Exception as e:
logger.exception(f"Exception in event handler {event_name}: {e}")

View File

@@ -31,6 +31,7 @@ class TestUtilsString(unittest.IsolatedAsyncioTestCase):
assert match_endofsentence("Valid scientific notation 1.23e4.") == 33
assert match_endofsentence("Valid scientific notation 0.e4.") == 31
assert match_endofsentence("It still early, it's 3:00 a.m.") == 30
assert match_endofsentence("That'll be $20.00.") == 18
assert not match_endofsentence("This is not a sentence")
assert not match_endofsentence("This is not a sentence,")
assert not match_endofsentence("This is not a sentence, ")