Compare commits

...

38 Commits

Author SHA1 Message Date
James Hush
45c790c7d5 feat: Add debug logging to Gladia STT service
- Log transcript processing including confidence checks
2025-09-18 11:51:53 +08:00
Mark Backman
9e7260393a Merge pull request #2671 from pipecat-ai/mb/fix-asyncai-ttstextframe
fix: AsyncAITTSService wasn't pushing TTSTextFrames
2025-09-17 14:06:41 -07:00
Mark Backman
073b585c52 fix: AsyncAITTSService wasn't pushing TTSTextFrames 2025-09-17 16:54:18 -04:00
Aleix Conchillo Flaqué
81c2e51bec Merge pull request #2669 from pipecat-ai/aleix/interruption-task-frame-wait-fixes
interruption task frame wait fixes
2025-09-17 13:47:57 -07:00
Aleix Conchillo Flaqué
42344125b1 tests: add unit tests for push_interruption_task_frame_and_wait() 2025-09-17 13:38:22 -07:00
Aleix Conchillo Flaqué
db5bcfaa51 FrameProcessor: fix push_interruption_task_frame_and_wait() 2025-09-17 13:38:21 -07:00
kompfner
615239b7d2 Merge pull request #2646 from pipecat-ai/pk/service-switcher-unit-tests
`ServiceSwitcher` unit tests (ended up being much more than that)
2025-09-17 16:30:18 -04:00
Paul Kompfner
27f1e9dd69 Update CHANGELOG with a description of the recently-fixed ServiceSwitcher bugs 2025-09-17 16:27:12 -04:00
Paul Kompfner
bd760deff2 Update comment with more detail for posterity 2025-09-17 16:19:31 -04:00
Paul Kompfner
8bc3c89140 Fix a bug preventing usage of multiple ServiceSwitchers in a pipeline 2025-09-17 16:09:18 -04:00
Paul Kompfner
2cd2567a37 Add a unit tests validating that multiple ServiceSwitchers can be used in the same pipeline (currently failing) 2025-09-17 16:04:30 -04:00
Paul Kompfner
5b55988846 Denote a couple of variables are private with a leading underscore 2025-09-17 15:38:28 -04:00
Paul Kompfner
a12392182c Simplify, undoing the change allowing controlling ServiceSwitcher with immediate frames (SystemFrames). Service switcher frames are ControlFrames, which are easier to reason about. We can always build the immediate option later if needed (i.e. if there's sufficient user pull for it) 2025-09-17 15:35:02 -04:00
Paul Kompfner
b814b70e1e Allow controlling ServiceSwitcher with either immediate frames (SystemFrames) or in-order frames (ControlFrames).
Immediate is the "default", i.e. has the more obvious name (e.g. `ManuallySwitchServiceFrame` v `ManuallySwitchServiceControlFrame`), since that's *probably* what users will want to reach for. Also, the immediate frames are more likely to behave like what we had before the last few commits, where the service switch would always "jump the queue" by having an immediate effect once it hit the `ServiceSwitcher` in the pipeline, jumping ahead of frames in front of it destined for the service.
2025-09-17 15:35:02 -04:00
Paul Kompfner
a1f84e1b50 Remove extraneous unit tests 2025-09-17 15:35:02 -04:00
Paul Kompfner
0839b48da8 Fix an issue where the upstream ServiceSwitcherFilter wouldn't get updated with the current active service 2025-09-17 15:35:02 -04:00
Paul Kompfner
de51637b77 Update ServiceSwitcher so that ServiceSwitcherFrames (which might update the currently active service) are processed and have an effect at the expected time. We should be able to, for example, queue:
- A text frame
- A `ManuallySwitchServiceFrame` (which is a `ServiceSwitcherFrame`)
- Another text frame

And expect that the first text frame be handled by the initially active service and the second text frame be handled by the newly active one.

Previously, the `ManuallySwitchServiceFrame` would have an effect too early, causing both text frames to be handled by the newly active service. Why? Because the frame filtering condition was being updated *directly* by the `ServiceSwitcher`, which is upstream from the services it's switching between. It could therefore update the filters *before* the services received the prior frames.
2025-09-17 15:35:02 -04:00
Paul Kompfner
e1b1dc16ec Add unit tests for ServiceSwitcher 2025-09-17 15:35:02 -04:00
Mark Backman
1fe27eb0a2 Merge pull request #2660 from pipecat-ai/mb/fix-user-idle-processor-cancel-task
fix: clean up how UserIdleProcessor handles return False
2025-09-16 14:48:59 -07:00
Mark Backman
d7e1389497 fix: clean up how UserIdleProcessor handles return False 2025-09-16 17:44:06 -04:00
Aleix Conchillo Flaqué
8c7230aa8f Merge pull request #2668 from pipecat-ai/aleix/livekit-update
livekit package update
2025-09-16 14:43:18 -07:00
Aleix Conchillo Flaqué
2cf71239b0 examples(01b): use TTSSpeakFrame instead of TextFrame 2025-09-16 17:18:45 -04:00
Aleix Conchillo Flaqué
ec2c62e32b pyproject: update to livekit 1.0.13
Fixes #2643
2025-09-16 17:18:44 -04:00
Mark Backman
38ce85e9a0 Merge pull request #2667 from zytegalaxy/mcp-serverparameters-typefix
fix: replace `Tuple` type with `TypeAlias` for server params in MCP client
2025-09-16 14:14:59 -07:00
Mark Backman
2279e5a899 Merge pull request #2663 from pipecat-ai/mb/websockets-15
Add support for websockets 15.0
2025-09-16 14:08:36 -07:00
Mark Backman
cce6eb5d87 Merge pull request #2666 from pipecat-ai/mb/update-38b-local-turn-model
38b: Update bundled ONNX smart-turn model
2025-09-16 14:05:12 -07:00
mehrdad
c2b98ae557 fix(lint): fix space format issue 2025-09-16 13:44:15 -07:00
Filipi da Silva Fuchter
727eb12b16 Merge pull request #2648 from pipecat-ai/filipi/pcc_small_webrtc
Creating SmallWebRTCRequestHandler for managing peer connections.
2025-09-16 16:37:04 -03:00
mehrdad
ba96bd05d3 fix: replace Tuple type with TypeAlias for server params in MCP client 2025-09-16 11:44:25 -07:00
Mark Backman
8ead309f8d 38b: Update bundled ONNX smart-turn model 2025-09-16 13:17:14 -04:00
Mark Backman
fad0e55c64 Add websockets-base optional dependency and use for DRY pyproject.toml 2025-09-16 11:24:38 -04:00
Mark Backman
74b1af56a0 Update uv.lock 2025-09-16 11:21:49 -04:00
Mark Backman
6924850ec4 Add support for websockets 15.0 2025-09-16 11:21:49 -04:00
marcus-daily
dfe7815dc5 Smart Turn v3: removing torch and torchaudio deps 2025-09-16 16:02:41 +01:00
Filipi Fuchter
0a043154f2 Removing not used import. 2025-09-15 10:46:43 -03:00
Filipi Fuchter
5e322eba9e Supporting both single and multiple connection modes. 2025-09-15 10:43:46 -03:00
Filipi Fuchter
11d0c3d46d Refactoring SmallWebRTCRequestHandler. 2025-09-15 09:58:44 -03:00
Filipi Fuchter
95f72f6dce Creating SmallWebRTCRequestHandler for managing peer connections. 2025-09-12 18:15:24 -03:00
18 changed files with 2115 additions and 1373 deletions

View File

@@ -19,6 +19,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
...
```
### Changed
- Updated `livekit` to 1.0.13.
- `torch` and `torchaudio` are no longer required for running Smart Turn
locally. This avoids gigabytes of dependencies being installed.
- Updated `websockets` dependency to support version 15.0. Removed deprecated
usage of `ConnectionClosed.code` and `ConnectionClosed.reason` attributes in
`AWSTranscribeSTTService` for compatibility.
- Refactored `pyproject.toml` to reduce websockets dependency repetition using
self-referencing extras. All websockets-dependent services now reference a
shared `websockets-base` extra.
### Deprecated
- `PipelineTask` events `on_pipeline_stopped`, `on_pipeline_ended` and
@@ -27,6 +42,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed an issue in `AsyncAITTSService`, where `TTSTextFrames` were not being
pushed.
- Fixed an issue that would cause `push_interruption_task_frame_and_wait()` to
not wait if a previous interruption had already happened.
- Fixed a couple of bugs in `ServiceSwitcher`:
- Using multiple `ServiceSwitcher`s in a pipeline would result in an error.
- `ServiceSwitcherFrame`s (such as `ManuallySwitchServiceFrame`s) were having
an effect too early, essentially "jumping the queue" in terms of pipeline
frame ordering.
- Fixed a self-cancellation deadlock in `UserIdleProcessor` when returning
`False` from an idle callback. The task now terminates naturally instead of
attempting to cancel itself.
- Fixed an issue in `AudioBufferProcessor` where a recording is not created
when a bot speaks and user input is blocked.

View File

@@ -11,7 +11,7 @@ import sys
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -50,7 +50,7 @@ async def main():
async def on_first_participant_joined(transport, participant_id):
await asyncio.sleep(1)
await task.queue_frame(
TextFrame(
TTSSpeakFrame(
"Hello there! How are you doing today? Would you like to talk about the weather?"
)
)

View File

@@ -30,10 +30,6 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# To use this locally, set the environment variable LOCAL_SMART_TURN_MODEL_PATH
# to the Smart Turn v3 ONNX model file.
smart_turn_model_path = os.getenv("LOCAL_SMART_TURN_MODEL_PATH")
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -42,25 +38,19 @@ transport_params = {
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(
smart_turn_model_path=smart_turn_model_path, params=SmartTurnParams()
),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}

View File

@@ -47,32 +47,32 @@ Website = "https://pipecat.ai"
[project.optional-dependencies]
aic = [ "aic-sdk~=1.0.1" ]
anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "websockets>=13.1,<15.0" ]
asyncai = [ "websockets>=13.1,<15.0" ]
aws = [ "aioboto3~=15.0.0", "websockets>=13.1,<15.0" ]
assemblyai = [ "pipecat-ai[websockets-base]" ]
asyncai = [ "pipecat-ai[websockets-base]" ]
aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.0.2; python_version>='3.12'" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "websockets>=13.1,<15.0" ]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
cerebras = []
deepseek = []
daily = [ "daily-python~=0.19.9" ]
deepgram = [ "deepgram-sdk~=4.7.0" ]
elevenlabs = [ "websockets>=13.1,<15.0" ]
elevenlabs = [ "pipecat-ai[websockets-base]" ]
fal = [ "fal-client~=0.5.9" ]
fireworks = []
fish = [ "ormsgpack~=1.7.0", "websockets>=13.1,<15.0" ]
gladia = [ "websockets>=13.1,<15.0" ]
google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", "google-genai~=1.24.0", "websockets>=13.1,<15.0" ]
fish = [ "ormsgpack~=1.7.0", "pipecat-ai[websockets-base]" ]
gladia = [ "pipecat-ai[websockets-base]" ]
google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", "google-genai~=1.24.0", "pipecat-ai[websockets-base]" ]
grok = []
groq = [ "groq~=0.23.0" ]
gstreamer = [ "pygobject~=3.50.0" ]
heygen = [ "livekit>=0.22.0", "websockets>=13.1,<15.0" ]
heygen = [ "livekit>=1.0.13", "pipecat-ai[websockets-base]" ]
inworld = []
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
koala = [ "pvkoala~=2.0.3" ]
langchain = [ "langchain~=0.3.20", "langchain-community~=0.3.20", "langchain-openai~=0.3.9" ]
livekit = [ "livekit~=0.22.0", "livekit-api~=0.8.2", "tenacity>=8.2.3,<10.0.0" ]
lmnt = [ "websockets>=13.1,<15.0" ]
livekit = [ "livekit~=1.0.13", "livekit-api~=1.0.5", "tenacity>=8.2.3,<10.0.0" ]
lmnt = [ "pipecat-ai[websockets-base]" ]
local = [ "pyaudio~=0.2.14" ]
mcp = [ "mcp[cli]~=1.9.4" ]
mem0 = [ "mem0ai~=0.1.94" ]
@@ -80,26 +80,26 @@ mistral = []
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
moondream = [ "accelerate~=1.10.0", "einops~=0.8.0", "pyvips[binary]~=3.0.0", "timm~=1.0.13", "transformers>=4.48.0" ]
nim = []
neuphonic = [ "websockets>=13.1,<15.0" ]
neuphonic = [ "pipecat-ai[websockets-base]" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "websockets>=13.1,<15.0" ]
openai = [ "pipecat-ai[websockets-base]" ]
openpipe = [ "openpipe~=4.50.0" ]
openrouter = []
perplexity = []
playht = [ "websockets>=13.1,<15.0" ]
playht = [ "pipecat-ai[websockets-base]" ]
qwen = []
rime = [ "websockets>=13.1,<15.0" ]
rime = [ "pipecat-ai[websockets-base]" ]
riva = [ "nvidia-riva-client~=2.21.1" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<0.117.0", "pipecat-ai-small-webrtc-prebuilt>=1.0.0"]
sambanova = []
sarvam = [ "websockets>=13.1,<15.0" ]
sarvam = [ "pipecat-ai[websockets-base]" ]
sentry = [ "sentry-sdk~=2.23.1" ]
local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
local-smart-turn-v3 = [ "transformers", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3", "onnxruntime>=1.20.1, <2" ]
local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1, <2" ]
remote-smart-turn = []
silero = [ "onnxruntime>=1.20.1, <2" ]
simli = [ "simli-ai~=0.1.10"]
soniox = [ "websockets>=13.1,<15.0" ]
soniox = [ "pipecat-ai[websockets-base]" ]
soundfile = [ "soundfile~=0.13.0" ]
speechmatics = [ "speechmatics-rt>=0.4.0" ]
tavus=[]
@@ -107,7 +107,8 @@ together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
webrtc = [ "aiortc~=1.11.0", "opencv-python~=4.11.0.86" ]
websocket = [ "websockets>=13.1,<15.0", "fastapi>=0.115.6,<0.117.0" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
whisper = [ "faster-whisper~=1.1.1" ]
[dependency-groups]

View File

@@ -98,15 +98,15 @@ class LocalSmartTurnAnalyzerV3(BaseSmartTurn):
inputs = self._feature_extractor(
audio_array,
sampling_rate=16000,
return_tensors="pt",
return_tensors="np",
padding="max_length",
max_length=8 * 16000,
truncation=True,
do_normalize=True,
)
# Convert to numpy and ensure correct shape for ONNX
input_features = inputs.input_features.squeeze(0).numpy().astype(np.float32)
# Extract features and ensure correct shape for ONNX
input_features = inputs.input_features.squeeze(0).astype(np.float32)
input_features = np.expand_dims(input_features, axis=0) # Add batch dimension
# Run ONNX inference

View File

@@ -1604,7 +1604,7 @@ class MixerEnableFrame(MixerControlFrame):
@dataclass
class ServiceSwitcherFrame(ControlFrame):
"""A base class for frames that control ServiceSwitcher behavior."""
"""A base class for frames that affect ServiceSwitcher behavior."""
pass

View File

@@ -6,9 +6,15 @@
"""Service switcher for switching between different services at runtime, with different switching strategies."""
from dataclasses import dataclass
from typing import Any, Generic, List, Optional, Type, TypeVar
from pipecat.frames.frames import Frame, ManuallySwitchServiceFrame, ServiceSwitcherFrame
from pipecat.frames.frames import (
ControlFrame,
Frame,
ManuallySwitchServiceFrame,
ServiceSwitcherFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -22,19 +28,6 @@ class ServiceSwitcherStrategy:
self.services = services
self.active_service: Optional[FrameProcessor] = None
def is_active(self, service: FrameProcessor) -> bool:
"""Determine if the given service is the currently active one.
This method should be overridden by subclasses to implement specific logic.
Args:
service: The service to check.
Returns:
True if the given service is the active one, False otherwise.
"""
raise NotImplementedError("Subclasses must implement this method.")
def handle_frame(self, frame: ServiceSwitcherFrame, direction: FrameDirection):
"""Handle a frame that controls service switching.
@@ -60,17 +53,6 @@ class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy):
super().__init__(services)
self.active_service = services[0] if services else None
def is_active(self, service: FrameProcessor) -> bool:
"""Check if the given service is the currently active one.
Args:
service: The service to check.
Returns:
True if the given service is the active one, False otherwise.
"""
return service == self.active_service
def handle_frame(self, frame: ServiceSwitcherFrame, direction: FrameDirection):
"""Handle a frame that controls service switching.
@@ -79,20 +61,21 @@ class ServiceSwitcherStrategyManual(ServiceSwitcherStrategy):
direction: The direction of the frame (upstream or downstream).
"""
if isinstance(frame, ManuallySwitchServiceFrame):
self._set_active(frame.service)
self._set_active_if_available(frame.service)
else:
raise ValueError(f"Unsupported frame type: {type(frame)}")
def _set_active(self, service: FrameProcessor):
"""Set the active service to the given one.
def _set_active_if_available(self, service: FrameProcessor):
"""Set the active service to the given one, if it is in the list of available services.
If it's not in the list, the request is ignored, as it may have been
intended for another ServiceSwitcher in the pipeline.
Args:
service: The service to set as active.
"""
if service in self.services:
self.active_service = service
else:
raise ValueError(f"Service {service} is not in the list of available services.")
StrategyType = TypeVar("StrategyType", bound=ServiceSwitcherStrategy)
@@ -108,6 +91,43 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
self.services = services
self.strategy = strategy
class ServiceSwitcherFilter(FunctionFilter):
"""An internal filter that allows frames to pass through to the wrapped service only if it's the active service."""
def __init__(
self,
wrapped_service: FrameProcessor,
active_service: FrameProcessor,
direction: FrameDirection,
):
"""Initialize the service switcher filter with a strategy and direction."""
async def filter(_: Frame) -> bool:
return self._wrapped_service == self._active_service
super().__init__(filter, direction)
self._wrapped_service = wrapped_service
self._active_service = active_service
async def process_frame(self, frame, direction):
"""Process a frame through the filter, handling special internal filter-updating frames."""
if isinstance(frame, ServiceSwitcher.ServiceSwitcherFilterFrame):
self._active_service = frame.active_service
# Two ServiceSwitcherFilters "sandwich" a service. Push the
# frame only to update the other side of the sandwich, but
# otherwise don't let it leave the sandwich.
if direction == self._direction:
await self.push_frame(frame, direction)
return
await super().process_frame(frame, direction)
@dataclass
class ServiceSwitcherFilterFrame(ControlFrame):
"""An internal frame used by ServiceSwitcher to filter frames based on active service."""
active_service: FrameProcessor
@staticmethod
def _make_pipeline_definitions(
services: List[FrameProcessor], strategy: ServiceSwitcherStrategy
@@ -121,14 +141,18 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
def _make_pipeline_definition(
service: FrameProcessor, strategy: ServiceSwitcherStrategy
) -> Any:
async def filter(frame) -> bool:
_ = frame
return strategy.is_active(service)
return [
FunctionFilter(filter, direction=FrameDirection.DOWNSTREAM),
ServiceSwitcher.ServiceSwitcherFilter(
wrapped_service=service,
active_service=strategy.active_service,
direction=FrameDirection.DOWNSTREAM,
),
service,
FunctionFilter(filter, direction=FrameDirection.UPSTREAM),
ServiceSwitcher.ServiceSwitcherFilter(
wrapped_service=service,
active_service=strategy.active_service,
direction=FrameDirection.UPSTREAM,
),
]
async def process_frame(self, frame: Frame, direction: FrameDirection):
@@ -142,3 +166,14 @@ class ServiceSwitcher(ParallelPipeline, Generic[StrategyType]):
if isinstance(frame, ServiceSwitcherFrame):
self.strategy.handle_frame(frame, direction)
service_switcher_filter_frame = ServiceSwitcher.ServiceSwitcherFilterFrame(
active_service=self.strategy.active_service
)
# Hack: we need access ParallelPipeline internals here to queue the
# frame in each of the pipelines.
# Why not just call super().process_frame(service_switcher_filter_frame, direction),
# you ask? Because that would also send this internal-only frame
# down the "main" pipeline instead of only down the individual
# branches of the parallel pipeline, which we want to avoid.
for p in self._pipelines:
await p.queue_frame(service_switcher_filter_frame, direction)

View File

@@ -220,6 +220,11 @@ class FrameProcessor(BaseObject):
self.__process_event: Optional[asyncio.Event] = None
self.__process_frame_task: Optional[asyncio.Task] = None
# To interrupt a pipeline, we push an `InterruptionTaskFrame` upstream.
# Then we wait for the corresponding `InterruptionFrame` to travel from
# the start of the pipeline back to the processor that sent the
# `InterruptionTaskFrame`. This wait is handled using the following
# event.
self._wait_for_interruption = False
self._wait_interruption_event = asyncio.Event()
@@ -632,7 +637,9 @@ class FrameProcessor(BaseObject):
await self.__internal_push_frame(frame, direction)
if isinstance(frame, InterruptionFrame):
# If we are waiting for an interruption and we get an interruption, then
# we can unblock `push_interruption_task_frame_and_wait()`.
if self._wait_for_interruption and isinstance(frame, InterruptionFrame):
self._wait_interruption_event.set()
async def push_interruption_task_frame_and_wait(self):

View File

@@ -17,7 +17,6 @@ from pipecat.frames.frames import (
Frame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
@@ -185,15 +184,13 @@ class UserIdleProcessor(FrameProcessor):
Runs in a loop until cancelled or callback indicates completion.
"""
while True:
running = True
while running:
try:
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
except asyncio.TimeoutError:
if not self._interrupted:
self._retry_count += 1
should_continue = await self._callback(self, self._retry_count)
if not should_continue:
await self._stop()
break
running = await self._callback(self, self._retry_count)
finally:
self._idle_event.clear()

View File

@@ -70,7 +70,6 @@ import asyncio
import os
import sys
from contextlib import asynccontextmanager
from typing import Dict
from loguru import logger
@@ -79,6 +78,10 @@ from pipecat.runner.types import (
SmallWebRTCRunnerArguments,
WebSocketRunnerArguments,
)
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
try:
import uvicorn
@@ -187,9 +190,6 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
# Store connections by pc_id
pcs_map: Dict[str, SmallWebRTCConnection] = {}
# Mount the frontend
app.mount("/client", SmallWebRTCPrebuiltUI)
@@ -198,51 +198,33 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(
esp32_mode=esp32_mode, host=host
)
@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
"""Handle WebRTC offer requests and manage peer connections."""
pc_id = request.get("pc_id")
if pc_id and pc_id in pcs_map:
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request["sdp"],
type=request["type"],
restart_pc=request.get("restart_pc", False),
)
else:
pipecat_connection = SmallWebRTCConnection()
await pipecat_connection.initialize(sdp=request["sdp"], type=request["type"])
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
"""Handle WebRTC connection closure and cleanup."""
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
pcs_map.pop(webrtc_connection.pc_id, None)
async def offer(request: SmallWebRTCRequest, background_tasks: BackgroundTasks):
"""Handle WebRTC offer requests via SmallWebRTCRequestHandler."""
# Prepare runner arguments with the callback to run your bot
async def webrtc_connection_callback(connection):
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=pipecat_connection)
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
background_tasks.add_task(bot_module.bot, runner_args)
answer = pipecat_connection.get_answer()
# Apply ESP32 SDP munging if enabled
if esp32_mode and host != "localhost":
from pipecat.runner.utils import smallwebrtc_sdp_munging
answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], host)
pcs_map[answer["pc_id"]] = pipecat_connection
# Delegate handling to SmallWebRTCRequestHandler
answer = await small_webrtc_handler.handle_web_request(
request=request,
webrtc_connection_callback=webrtc_connection_callback,
)
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
yield
coros = [pc.disconnect() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
await small_webrtc_handler.close()
app.router.lifespan_context = lifespan

View File

@@ -119,7 +119,6 @@ class AsyncAITTSService(InterruptibleTTSService):
"""
super().__init__(
aggregate_sentences=aggregate_sentences,
push_text_frames=False,
pause_frame_processing=True,
push_stop_frames=True,
sample_rate=sample_rate,

View File

@@ -532,9 +532,7 @@ class AWSTranscribeSTTService(STTService):
logger.debug(f"{self} Other message type received: {headers}")
logger.debug(f"{self} Payload: {payload}")
except websockets.exceptions.ConnectionClosed as e:
logger.error(
f"{self} WebSocket connection closed in receive loop with code {e.code}: {e.reason}"
)
logger.error(f"{self} WebSocket connection closed in receive loop: {e}")
break
except Exception as e:
logger.error(f"{self} Unexpected error in receive loop: {e}")

View File

@@ -579,6 +579,7 @@ class GladiaSTTService(STTService):
language = utterance["language"]
transcript = utterance["text"]
is_final = content["data"]["is_final"]
if confidence >= self._confidence:
if is_final:
await self.push_frame(
@@ -605,6 +606,10 @@ class GladiaSTTService(STTService):
result=content,
)
)
else:
logger.debug(
f"{self} Skipping transcript due to low confidence: '{transcript}' (confidence: {confidence} < {self._confidence})"
)
elif content["type"] == "translation":
translated_utterance = content["data"]["translated_utterance"]
original_language = content["data"]["original_language"]

View File

@@ -7,7 +7,7 @@
"""MCP (Model Context Protocol) client for integrating external tools with LLMs."""
import json
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, TypeAlias
from loguru import logger
@@ -28,6 +28,8 @@ except ModuleNotFoundError as e:
logger.error("In order to use an MCP client, you need to `pip install pipecat-ai[mcp]`.")
raise Exception(f"Missing module: {e}")
ServerParameters: TypeAlias = StdioServerParameters | SseServerParameters | StreamableHttpParameters
class MCPClient(BaseObject):
"""Client for Model Context Protocol (MCP) servers.
@@ -42,7 +44,7 @@ class MCPClient(BaseObject):
def __init__(
self,
server_params: Tuple[StdioServerParameters, SseServerParameters, StreamableHttpParameters],
server_params: ServerParameters,
**kwargs,
):
"""Initialize the MCP client with server parameters.

View File

@@ -0,0 +1,200 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""SmallWebRTC request handler for managing peer connections.
This module provides a client for handling web requests and managing WebRTC connections.
"""
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional
from fastapi import HTTPException
from loguru import logger
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
@dataclass
class SmallWebRTCRequest:
"""Small WebRTC transport session arguments for the runner.
Parameters:
sdp: The SDP string (Session Description Protocol).
type: The type of the SDP, either "offer" or "answer".
pc_id: Optional identifier for the peer connection.
restart_pc: Optional whether to restart the peer connection.
request_data: Optional custom data sent by the customer.
"""
sdp: str
type: str
pc_id: Optional[str] = None
restart_pc: Optional[bool] = None
request_data: Optional[Any] = None
class ConnectionMode(Enum):
"""Enum defining the connection handling modes."""
SINGLE = "single" # Only one active connection allowed
MULTIPLE = "multiple" # Multiple simultaneous connections allowed
class SmallWebRTCRequestHandler:
"""SmallWebRTC request handler for managing peer connections.
This class is responsible for:
- Handling incoming SmallWebRTC requests.
- Creating and managing WebRTC peer connections.
- Supporting ESP32-specific SDP munging if enabled.
- Invoking callbacks for newly initialized connections.
- Supporting both single and multiple connection modes.
"""
def __init__(
self,
ice_servers: Optional[List[IceServer]] = None,
esp32_mode: bool = False,
host: Optional[str] = None,
connection_mode: ConnectionMode = ConnectionMode.MULTIPLE,
) -> None:
"""Initialize a SmallWebRTC request handler.
Args:
ice_servers (Optional[List[IceServer]]): List of ICE servers to use for WebRTC
connections.
esp32_mode (bool): If True, enables ESP32-specific SDP munging.
host (Optional[str]): Host address used for SDP munging in ESP32 mode.
Ignored if `esp32_mode` is False.
connection_mode (ConnectionMode): Mode of operation for handling connections.
SINGLE allows only one active connection, MULTIPLE allows several.
"""
self._ice_servers = ice_servers
self._esp32_mode = esp32_mode
self._host = host
self._connection_mode = connection_mode
# Store connections by pc_id
self._pcs_map: Dict[str, SmallWebRTCConnection] = {}
def _check_single_connection_constraints(self, pc_id: Optional[str]) -> None:
"""Check if the connection request satisfies single connection mode constraints.
Args:
pc_id: The peer connection ID from the request
Raises:
HTTPException: If constraints are violated in single connection mode
"""
if self._connection_mode != ConnectionMode.SINGLE:
return
if not self._pcs_map: # No existing connections
return
# Get the existing connection (should be only one in single mode)
existing_connection = next(iter(self._pcs_map.values()))
if existing_connection.pc_id != pc_id and pc_id:
logger.warning(
f"Connection pc_id mismatch: existing={existing_connection.pc_id}, received={pc_id}"
)
raise HTTPException(status_code=400, detail="PC ID mismatch with existing connection")
if not pc_id:
logger.warning(
"Cannot create new connection: existing connection found but no pc_id received"
)
raise HTTPException(
status_code=400,
detail="Cannot create new connection with existing connection active",
)
async def handle_web_request(
self,
request: SmallWebRTCRequest,
webrtc_connection_callback: Callable[[Any], Awaitable[None]],
) -> None:
"""Handle a SmallWebRTC request and resolve the pending answer.
This method will:
- Reuse an existing WebRTC connection if `pc_id` exists.
- Otherwise, create a new `SmallWebRTCConnection`.
- Invoke the provided callback with the connection.
- Manage ESP32-specific munging if enabled.
- Enforce single/multiple connection mode constraints.
Args:
request (SmallWebRTCRequest): The incoming WebRTC request, containing
SDP, type, and optionally a `pc_id`.
webrtc_connection_callback (Callable[[Any], Awaitable[None]]): An
asynchronous callback function that is invoked with the WebRTC connection.
Raises:
HTTPException: If connection mode constraints are violated
Exception: Any exception raised during request handling or callback execution
will be logged and propagated.
"""
try:
pc_id = request.pc_id
# Check connection mode constraints first
self._check_single_connection_constraints(pc_id)
# After constraints are satisfied, get the existing connection if any
existing_connection = self._pcs_map.get(pc_id) if pc_id else None
if existing_connection:
pipecat_connection = existing_connection
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request.sdp,
type=request.type,
restart_pc=request.restart_pc or False,
)
else:
pipecat_connection = SmallWebRTCConnection(ice_servers=self._ice_servers)
await pipecat_connection.initialize(sdp=request.sdp, type=request.type)
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"Discarding peer connection for pc_id: {webrtc_connection.pc_id}")
self._pcs_map.pop(webrtc_connection.pc_id, None)
# Invoke callback provided in runner arguments
try:
await webrtc_connection_callback(pipecat_connection)
logger.debug(
f"webrtc_connection_callback executed successfully for peer: {pipecat_connection.pc_id}"
)
except Exception as callback_error:
logger.error(
f"webrtc_connection_callback failed for peer {pipecat_connection.pc_id}: {callback_error}"
)
answer = pipecat_connection.get_answer()
if self._esp32_mode and self._host and self._host != "localhost":
from pipecat.runner.utils import smallwebrtc_sdp_munging
answer["sdp"] = smallwebrtc_sdp_munging(answer["sdp"], self._host)
self._pcs_map[answer["pc_id"]] = pipecat_connection
return answer
except Exception as e:
logger.error(f"Error processing SmallWebRTC request: {e}")
logger.debug(f"SmallWebRTC request details: {request}")
raise
async def close(self):
"""Clear the connection map."""
coros = [pc.disconnect() for pc in self._pcs_map.values()]
await asyncio.gather(*coros)
self._pcs_map.clear()

View File

@@ -0,0 +1,67 @@
#
# Copyright (c) 2024-2025 Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.frames.frames import (
EndFrame,
Frame,
InterruptionFrame,
TextFrame,
TransportMessageUrgentFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import SleepFrame, run_test
class TestFrameProcessor(unittest.IsolatedAsyncioTestCase):
async def test_interruption_and_wait(self):
class DelayFrameProcessor(FrameProcessor):
"""This processors just gives time to the event loop to change
between tasks. Otherwise things happen to fast."""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await asyncio.sleep(0.1)
await self.push_frame(frame, direction)
class InterruptFrameProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
await self.push_interruption_task_frame_and_wait()
await self.push_frame(TransportMessageUrgentFrame(message=frame.text))
else:
await self.push_frame(frame, direction)
pipeline = Pipeline([DelayFrameProcessor(), InterruptFrameProcessor()])
frames_to_send = [
# Just a random interruption to make sure we don't clear anything
# before the actual `InterruptionTaskFrame` interruption.
InterruptionFrame(),
# This will generate an `InterruptionTaskFrame` and will wait for an
# `InterruptionFrame`.
TextFrame(text="Hello from Pipecat!"),
# Just give time for everything to complete.
SleepFrame(sleep=0.5),
EndFrame(),
]
expected_down_frames = [
InterruptionFrame,
InterruptionFrame,
TransportMessageUrgentFrame,
EndFrame,
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
send_end_frame=False,
)

View File

@@ -0,0 +1,303 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Unit tests for ServiceSwitcher and related components."""
import unittest
from pipecat.frames.frames import (
Frame,
ManuallySwitchServiceFrame,
TextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategyManual
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.tests.utils import run_test
class MockFrameProcessor(FrameProcessor):
"""A test frame processor that tracks which frames it has processed."""
def __init__(self, test_name: str, **kwargs):
"""Initialize the test processor with a name.
Args:
test_name: A unique name for this processor instance.
**kwargs: Additional arguments passed to the parent FrameProcessor.
"""
super().__init__(name=test_name, **kwargs)
self.test_name = test_name
self.processed_frames = []
self.frame_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process an incoming frame and track it.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
self.processed_frames.append(frame)
self.frame_count += 1
await self.push_frame(frame, direction)
def reset_counters(self):
"""Reset the frame tracking counters."""
self.processed_frames = []
self.frame_count = 0
class TestServiceSwitcherStrategyManual(unittest.IsolatedAsyncioTestCase):
"""Test cases for ServiceSwitcherStrategyManual."""
def setUp(self):
"""Set up test fixtures."""
self.service1 = MockFrameProcessor("service1")
self.service2 = MockFrameProcessor("service2")
self.service3 = MockFrameProcessor("service3")
self.services = [self.service1, self.service2, self.service3]
def test_init_with_services(self):
"""Test initialization with a list of services."""
strategy = ServiceSwitcherStrategyManual(self.services)
self.assertEqual(strategy.services, self.services)
self.assertEqual(strategy.active_service, self.service1) # First service should be active
def test_init_with_empty_services(self):
"""Test initialization with an empty list of services."""
strategy = ServiceSwitcherStrategyManual([])
self.assertEqual(strategy.services, [])
self.assertIsNone(strategy.active_service)
def test_handle_manually_switch_service_frame(self):
"""Test manual service switching with ManuallySwitchServiceFrame."""
strategy = ServiceSwitcherStrategyManual(self.services)
# Initially service1 should be active
self.assertEqual(strategy.active_service, self.service1)
self.assertNotEqual(strategy.active_service, self.service2)
# Switch to service2
switch_frame = ManuallySwitchServiceFrame(service=self.service2)
strategy.handle_frame(switch_frame, FrameDirection.DOWNSTREAM)
self.assertNotEqual(strategy.active_service, self.service1)
self.assertEqual(strategy.active_service, self.service2)
self.assertNotEqual(strategy.active_service, self.service3)
# Switch to service3
switch_frame = ManuallySwitchServiceFrame(service=self.service3)
strategy.handle_frame(switch_frame, FrameDirection.DOWNSTREAM)
self.assertNotEqual(strategy.active_service, self.service1)
self.assertNotEqual(strategy.active_service, self.service2)
self.assertEqual(strategy.active_service, self.service3)
def test_handle_frame_unsupported_frame_type(self):
"""Test that unsupported frame types raise an error."""
strategy = ServiceSwitcherStrategyManual(self.services)
unsupported_frame = TextFrame(text="test") # Not a ServiceSwitcherFrame
with self.assertRaises(ValueError) as context:
strategy.handle_frame(unsupported_frame, FrameDirection.DOWNSTREAM)
self.assertIn("Unsupported frame type", str(context.exception))
class TestServiceSwitcher(unittest.IsolatedAsyncioTestCase):
"""Test cases for ServiceSwitcher."""
def setUp(self):
"""Set up test fixtures."""
self.service1 = MockFrameProcessor("service1")
self.service2 = MockFrameProcessor("service2")
self.service3 = MockFrameProcessor("service3")
self.services = [self.service1, self.service2, self.service3]
def test_init_with_manual_strategy(self):
"""Test initialization with manual strategy."""
switcher = ServiceSwitcher(self.services, ServiceSwitcherStrategyManual)
self.assertEqual(switcher.services, self.services)
self.assertIsInstance(switcher.strategy, ServiceSwitcherStrategyManual)
self.assertEqual(switcher.strategy.services, self.services)
async def test_default_active_service(self):
"""Test that the initially-active service receives frames while others don't."""
switcher = ServiceSwitcher(self.services, ServiceSwitcherStrategyManual)
# Reset counters
for service in self.services:
service.reset_counters()
# Send some test frames
frames_to_send = [
TextFrame(text="Hello 1"),
TextFrame(text="Hello 2"),
TextFrame(text="Hello 3"),
]
await run_test(
switcher,
frames_to_send=frames_to_send,
expected_down_frames=[TextFrame, TextFrame, TextFrame],
expected_up_frames=[], # Expect no error frames
)
# Only service1 should have processed the text frames
# Note: The service also receives StartFrame and EndFrame, so count those too
text_frames = [f for f in self.service1.processed_frames if isinstance(f, TextFrame)]
self.assertEqual(len(text_frames), 3)
# Check that other services don't receive text frames (they might get StartFrame/EndFrame)
service2_text_frames = [
f for f in self.service2.processed_frames if isinstance(f, TextFrame)
]
service3_text_frames = [
f for f in self.service3.processed_frames if isinstance(f, TextFrame)
]
self.assertEqual(len(service2_text_frames), 0)
self.assertEqual(len(service3_text_frames), 0)
# Verify the actual text frames processed
for i, frame in enumerate(text_frames):
self.assertEqual(frame.text, f"Hello {i + 1}")
async def test_service_switching(self):
"""Test that after service switching using ManuallySwitchServiceFrame, the new active service receives frames while others don't."""
switcher = ServiceSwitcher(self.services, ServiceSwitcherStrategyManual)
# Reset counters
for service in self.services:
service.reset_counters()
# Send a test frame, a switch frame, and another test frame
await run_test(
switcher,
frames_to_send=[
TextFrame("Hello 1"),
ManuallySwitchServiceFrame(service=self.service2),
TextFrame("Hello 2"),
],
expected_down_frames=[TextFrame, ManuallySwitchServiceFrame, TextFrame],
expected_up_frames=[], # Expect no error frames
)
# Verify service2 received the frame
service1_text_frames = [
f for f in self.service1.processed_frames if isinstance(f, TextFrame)
]
service2_text_frames = [
f for f in self.service2.processed_frames if isinstance(f, TextFrame)
]
service3_text_frames = [
f for f in self.service3.processed_frames if isinstance(f, TextFrame)
]
self.assertEqual(len(service1_text_frames), 1)
self.assertEqual(len(service2_text_frames), 1)
self.assertEqual(len(service3_text_frames), 0)
self.assertEqual(service1_text_frames[0].text, "Hello 1")
self.assertEqual(service2_text_frames[0].text, "Hello 2")
async def test_multi_service_switcher_targeting(self):
"""Test that ManuallySwitchServiceFrame targets the correct ServiceSwitcher in a multi-switcher pipeline."""
# Create services for first switcher
switcher1_service1 = MockFrameProcessor("switcher1_service1")
switcher1_service2 = MockFrameProcessor("switcher1_service2")
switcher1_services = [switcher1_service1, switcher1_service2]
# Create services for second switcher
switcher2_service1 = MockFrameProcessor("switcher2_service1")
switcher2_service2 = MockFrameProcessor("switcher2_service2")
switcher2_services = [switcher2_service1, switcher2_service2]
# Create two service switchers
switcher1 = ServiceSwitcher(switcher1_services, ServiceSwitcherStrategyManual)
switcher2 = ServiceSwitcher(switcher2_services, ServiceSwitcherStrategyManual)
# Create a pipeline with both switchers: switcher1 -> switcher2
pipeline = Pipeline([switcher1, switcher2])
# Reset counters
for service in switcher1_services + switcher2_services:
service.reset_counters()
# Initially, both switchers should use their first services
self.assertEqual(switcher1.strategy.active_service, switcher1_service1)
self.assertEqual(switcher2.strategy.active_service, switcher2_service1)
# Send frames to test the pipeline:
# 1. Text frame (should go through both switchers' active services)
# 2. Switch frame targeting switcher1's second service
# 3. Text frame (should go through switcher1's new service and switcher2's original service)
# 4. Switch frame targeting switcher2's second service
# 5. Text frame (should go through switcher1's current service and switcher2's new service)
await run_test(
pipeline,
frames_to_send=[
TextFrame("Before any switches"),
ManuallySwitchServiceFrame(service=switcher1_service2), # Switch first switcher
TextFrame("After switching first switcher"),
ManuallySwitchServiceFrame(service=switcher2_service2), # Switch second switcher
TextFrame("After switching second switcher"),
],
expected_down_frames=[
TextFrame,
ManuallySwitchServiceFrame,
TextFrame,
ManuallySwitchServiceFrame,
TextFrame,
],
expected_up_frames=[], # Expect no error frames
)
# Verify the active services changed correctly
self.assertEqual(switcher1.strategy.active_service, switcher1_service2)
self.assertEqual(switcher2.strategy.active_service, switcher2_service2)
# Verify frame distribution:
# First text frame should go through switcher1_service1 and switcher2_service1
switcher1_service1_texts = [
f for f in switcher1_service1.processed_frames if isinstance(f, TextFrame)
]
switcher2_service1_texts = [
f for f in switcher2_service1.processed_frames if isinstance(f, TextFrame)
]
# Second text frame should go through switcher1_service2 and switcher2_service1
switcher1_service2_texts = [
f for f in switcher1_service2.processed_frames if isinstance(f, TextFrame)
]
# Third text frame should go through switcher1_service2 and switcher2_service2
switcher2_service2_texts = [
f for f in switcher2_service2.processed_frames if isinstance(f, TextFrame)
]
# Verify frame counts and content
self.assertEqual(len(switcher1_service1_texts), 1)
self.assertEqual(switcher1_service1_texts[0].text, "Before any switches")
self.assertEqual(len(switcher1_service2_texts), 2)
self.assertEqual(switcher1_service2_texts[0].text, "After switching first switcher")
self.assertEqual(switcher1_service2_texts[1].text, "After switching second switcher")
self.assertEqual(len(switcher2_service1_texts), 2)
self.assertEqual(switcher2_service1_texts[0].text, "Before any switches")
self.assertEqual(switcher2_service1_texts[1].text, "After switching first switcher")
self.assertEqual(len(switcher2_service2_texts), 1)
self.assertEqual(switcher2_service2_texts[0].text, "After switching second switcher")
if __name__ == "__main__":
unittest.main()

2620
uv.lock generated

File diff suppressed because it is too large Load Diff