Compare commits

..

9 Commits

Author SHA1 Message Date
Mark Backman
708ef71c96 Update python-compatibility workflow to include new user project check 2025-08-09 20:19:16 -04:00
Aleix Conchillo Flaqué
241ab19228 update uv.lock with numba dependency 2025-08-08 15:12:55 -07:00
Mark Backman
c08e8ec8fb Merge pull request #2391 from pipecat-ai/mb/readme-local-dev
Update README with local dev setup for contributors
2025-08-08 11:15:58 -07:00
Mark Backman
eb9bc9644e Merge pull request #2400 from pipecat-ai/mb/pin-numba-0.61.2
fix: pin numba to >=0.61.2
2025-08-08 11:15:22 -07:00
Mark Backman
3a306dae90 fix: pin numba to >=0.61.2 2025-08-08 10:52:47 -04:00
Mark Backman
c42cc8254f Update README with local dev setup for contributors 2025-08-07 22:07:35 -04:00
Aleix Conchillo Flaqué
a8e21f7d5d Merge pull request #2395 from pipecat-ai/aleix/examples-15-inherit-parallel-pipeline
examples(foundational): move 15/15a logic into its own processor
2025-08-07 17:59:28 -07:00
Aleix Conchillo Flaqué
c6ef8de578 scripts(evals): fix 14v-function-calling-openai.py 2025-08-07 17:57:47 -07:00
Aleix Conchillo Flaqué
fc571fba42 examples(foundational): move 15/15a logic into its own processor 2025-08-07 17:57:47 -07:00
11 changed files with 625 additions and 587 deletions

View File

@@ -9,14 +9,14 @@ on:
paths: ['pyproject.toml']
jobs:
test-compatibility:
test-dev-environment:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ['3.10.18', '3.11.13', '3.12.11', '3.13.5']
name: Python ${{ matrix.python-version }}
name: Dev Environment - Python ${{ matrix.python-version }}
steps:
- name: Checkout code
uses: actions/checkout@v4
@@ -55,7 +55,69 @@ jobs:
--no-extra moondream \
--no-extra mlx-whisper
- name: Verify installation
- name: Verify dev installation
run: |
uv run python --version
uv run python -c "import pipecat; print('✅ Pipecat imports successfully')"
uv run python -c "import pipecat; print('✅ Dev environment - Pipecat imports successfully')"
test-user-experience:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ['3.10.18', '3.11.13', '3.12.11', '3.13.5']
name: User Experience - Python ${{ matrix.python-version }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
portaudio19-dev \
libcairo2-dev \
libgirepository1.0-dev \
pkg-config
- name: Install uv
uses: astral-sh/setup-uv@v4
with:
version: 'latest'
- name: Set up Python ${{ matrix.python-version }}
run: |
uv python install ${{ matrix.python-version }}
- name: Build local package
run: |
uv build
- name: Create test project
run: |
mkdir test-project
cd test-project
uv init --python ${{ matrix.python-version }}
- name: Test comprehensive extras with uv add (Python 3.10-3.12)
if: "!startsWith(matrix.python-version, '3.13.')"
run: |
cd test-project
# Use uv add with built wheel to leverage dependency management
uv add "../dist/pipecat_ai-"*".whl[anthropic,assemblyai,asyncai,aws,aws-nova-sonic,azure,cartesia,cerebras,deepseek,daily,deepgram,elevenlabs,fal,fireworks,fish,gladia,google,grok,groq,gstreamer,heygen,inworld,koala,langchain,livekit,lmnt,local,mcp,mem0,mlx-whisper,moondream,nim,neuphonic,noisereduce,openai,openpipe,openrouter,perplexity,playht,qwen,rime,riva,runner,sambanova,sentry,local-smart-turn,remote-smart-turn,silero,simli,soniox,soundfile,speechmatics,tavus,together,tracing,ultravox,webrtc,websocket,whisper]"
- name: Test Python 3.13 compatible extras with uv add
if: startsWith(matrix.python-version, '3.13.')
run: |
cd test-project
# Use uv add with built wheel and Python 3.13 compatible extras
uv add "../dist/pipecat_ai-"*".whl[anthropic,assemblyai,asyncai,aws,aws-nova-sonic,azure,cartesia,cerebras,deepseek,daily,deepgram,elevenlabs,fal,fireworks,fish,gladia,google,grok,groq,gstreamer,heygen,inworld,koala,langchain,livekit,lmnt,local,mcp,mem0,nim,neuphonic,noisereduce,openai,openpipe,openrouter,perplexity,playht,qwen,rime,riva,runner,sambanova,sentry,remote-smart-turn,silero,simli,soniox,soundfile,speechmatics,tavus,together,tracing,webrtc,websocket,whisper]"
- name: Verify user installation
run: |
cd test-project
uv run python --version
uv run python -c "import pipecat; print('✅ User experience - Pipecat imports successfully')"
# Test that basic functionality works
uv run python -c "from pipecat.pipeline.pipeline import Pipeline; print('✅ Pipeline import works')"

View File

@@ -5,6 +5,19 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Changed
- Updated `pyproject.toml` to once again pin `numba` to `>=0.61.2` in order to
resolve package versioning issues.
### Other
- Updated `15-switch-voices.py` and `15a-switch-languages.py` examples to show
how to enclose complex logic (e.g. `ParallelPipeline`) into a single processor
so the main pipeline becomes simpler.
## [0.0.79] - 2025-08-07
### Changed

View File

@@ -128,7 +128,7 @@ You can get started with Pipecat running on your local machine, then move your a
2. Install development and testing dependencies:
```bash
uv sync --group dev --all-extras --no-extra krisp
uv sync --group dev --all-extras --no-extra gstreamer --no-extra krisp --no-extra local
```
3. Install the git pre-commit hooks:
@@ -137,18 +137,25 @@ You can get started with Pipecat running on your local machine, then move your a
uv run pre-commit install
```
### Python 3.13+ Note
### Python 3.13+ Compatibility
Some features require PyTorch (not yet available on Python 3.13+):
- `ultravox`, `local-smart-turn`, `moondream`, `mlx-whisper`
**For full compatibility:** Use Python 3.12
Some features require PyTorch, which doesn't yet support Python 3.13+. Install using:
```bash
uv python pin 3.12 && uv sync --group dev --all-extras --no-extra krisp
uv sync --group dev --all-extras \
--no-extra gstreamer \
--no-extra krisp \
--no-extra local \
--no-extra local-smart-turn \
--no-extra mlx-whisper \
--no-extra moondream \
--no-extra ultravox
```
> **Tip:** For full compatibility, use Python 3.12: `uv python pin 3.12`
> **Note**: Some extras (local, gstreamer) require system dependencies. See documentation if you encounter build errors.
### Running tests
To run all tests, from the root directory:

View File

@@ -12,6 +12,7 @@ from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -31,29 +32,54 @@ from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
current_voice = "News Lady"
class SwitchVoices(ParallelPipeline):
def __init__(self):
self._current_voice = "News Lady"
news_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="bf991597-6c13-47e4-8411-91ec2de5c466", # Newslady
)
async def switch_voice(params: FunctionCallParams):
global current_voice
current_voice = params.arguments["voice"]
await params.result_callback(
{
"voice": f"You are now using your {current_voice} voice. Your responses should now be as if you were a {current_voice}."
}
)
british_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
barbershop_man = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
async def news_lady_filter(frame) -> bool:
return current_voice == "News Lady"
super().__init__(
# News Lady voice
[FunctionFilter(self.news_lady_filter), news_lady],
# British Reading Lady voice
[FunctionFilter(self.british_lady_filter), british_lady],
# Barbershop Man voice
[FunctionFilter(self.barbershop_man_filter), barbershop_man],
)
@property
def current_voice(self):
return self._current_voice
async def british_lady_filter(frame) -> bool:
return current_voice == "British Lady"
async def switch_voice(self, params: FunctionCallParams):
self._current_voice = params.arguments["voice"]
await params.result_callback(
{
"voice": f"You are now using your {self.current_voice} voice. Your responses should now be as if you were a {self.current_voice}."
}
)
async def news_lady_filter(self, _: Frame) -> bool:
return self.current_voice == "News Lady"
async def barbershop_man_filter(frame) -> bool:
return current_voice == "Barbershop Man"
async def british_lady_filter(self, _: Frame) -> bool:
return self.current_voice == "British Lady"
async def barbershop_man_filter(self, _: Frame) -> bool:
return self.current_voice == "Barbershop Man"
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -83,23 +109,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
news_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="bf991597-6c13-47e4-8411-91ec2de5c466", # Newslady
)
british_lady = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
barbershop_man = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a0e99841-438c-4a64-b679-ae501e7d6091", # Barbershop Man
)
tts = SwitchVoices()
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm.register_function("switch_voice", switch_voice)
llm.register_function("switch_voice", tts.switch_voice)
tools = [
ChatCompletionToolParam(
@@ -136,14 +149,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt,
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (one of the following vocies)
[FunctionFilter(news_lady_filter), news_lady], # News Lady voice
[
FunctionFilter(british_lady_filter),
british_lady,
], # British Reading Lady voice
[FunctionFilter(barbershop_man_filter), barbershop_man], # Barbershop Man voice
),
tts, # TTS with switch voice functionality
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
@@ -165,7 +171,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {current_voice}.",
"content": f"Please introduce yourself to the user and let them know the voices you can do. Your initial responses should be as if you were a {tts.current_voice}.",
}
)
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -13,6 +13,7 @@ from loguru import logger
from openai.types.chat import ChatCompletionToolParam
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -32,23 +33,42 @@ from pipecat.transports.services.daily import DailyParams
load_dotenv(override=True)
current_language = "English"
class SwitchLanguage(ParallelPipeline):
def __init__(self):
self._current_language = "English"
english_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
async def switch_language(params: FunctionCallParams):
global current_language
current_language = params.arguments["language"]
await params.result_callback(
{"voice": f"Your answers from now on should be in {current_language}."}
)
spanish_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="d4db5fb9-f44b-4bd1-85fa-192e0f0d75f9", # Spanish-speaking Lady
)
super().__init__(
# English
[FunctionFilter(self.english_filter), english_tts],
# Spanish
[FunctionFilter(self.spanish_filter), spanish_tts],
)
async def english_filter(frame) -> bool:
return current_language == "English"
@property
def current_language(self):
return self._current_language
async def switch_language(self, params: FunctionCallParams):
self._current_language = params.arguments["language"]
await params.result_callback(
{"voice": f"Your answers from now on should be in {self.current_language}."}
)
async def spanish_filter(frame) -> bool:
return current_language == "Spanish"
async def english_filter(self, _: Frame) -> bool:
return self.current_language == "English"
async def spanish_filter(self, _: Frame) -> bool:
return self.current_language == "Spanish"
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
@@ -80,18 +100,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
api_key=os.getenv("DEEPGRAM_API_KEY"), live_options=LiveOptions(language="multi")
)
english_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
spanish_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="d4db5fb9-f44b-4bd1-85fa-192e0f0d75f9", # Spanish-speaking Lady
)
tts = SwitchLanguage()
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
llm.register_function("switch_language", switch_language)
llm.register_function("switch_language", tts.switch_language)
tools = [
ChatCompletionToolParam(
@@ -128,10 +140,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
[FunctionFilter(english_filter), english_tts], # English
[FunctionFilter(spanish_filter), spanish_tts], # Spanish
),
tts, # TTS (bot will speak the chosen language)
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
@@ -153,7 +162,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
messages.append(
{
"role": "system",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {current_language}.",
"content": f"Please introduce yourself to the user and let them know the languages you speak. Your initial responses should be in {tts.current_language}.",
}
)
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -34,6 +34,8 @@ dependencies = [
"resampy~=0.4.3",
"soxr~=0.5.0",
"openai>=1.74.0,<=1.99.1",
# Pinning numba (resampy dep) to resolve a package dependency
"numba==0.61.2",
]
[project.urls]

View File

@@ -93,7 +93,7 @@ TESTS_14 = [
("14p-function-calling-gemini-vertex-ai.py", PROMPT_WEATHER, EVAL_WEATHER),
("14q-function-calling-qwen.py", PROMPT_WEATHER, EVAL_WEATHER),
("14r-function-calling-aws.py", PROMPT_WEATHER, EVAL_WEATHER),
("14v-function-calling-openai.py.py", PROMPT_WEATHER, EVAL_WEATHER),
("14v-function-calling-openai.py", PROMPT_WEATHER, EVAL_WEATHER),
# Currently not working.
# ("14c-function-calling-together.py", PROMPT_WEATHER, EVAL_WEATHER),
# ("14k-function-calling-cerebras.py", PROMPT_WEATHER, EVAL_WEATHER),

View File

@@ -479,7 +479,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self._websocket = await websocket_connect(
url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
)
logger.debug(f"{self}: WebSocket connected to ElevenLabs - ready to receive audio")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
@@ -514,9 +513,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
# Close the current context when interrupted without closing the websocket
if self._context_id and self._websocket:
logger.debug(
f"{self}: Closing context {self._context_id} due to interruption - this will stop audio stream"
)
logger.trace(f"Closing context {self._context_id} due to interruption")
try:
# ElevenLabs requires that Pipecat manages the contexts and closes them
# when they're not longer in use. Since a StartInterruptionFrame is pushed
@@ -527,7 +524,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
await self._websocket.send(
json.dumps({"context_id": self._context_id, "close_context": True})
)
logger.debug(f"{self}: Sent close_context message for {self._context_id}")
except Exception as e:
logger.error(f"Error closing context on interruption: {e}")
self._context_id = None
@@ -538,28 +534,15 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
async for message in WatchdogAsyncIterator(
self._get_websocket(), manager=self.task_manager
):
# Log raw message structure for debugging (truncated for readability)
message_preview = message[:500] + "..." if len(message) > 500 else message
logger.debug(f"{self}: Raw WebSocket message preview: {message_preview}")
msg = json.loads(message)
received_ctx_id = msg.get("contextId")
# Log the message structure without the large audio data
msg_structure = {
k: (f"<{len(v)} chars>" if k == "audio" and isinstance(v, str) else v)
for k, v in msg.items()
}
logger.debug(f"{self}: Parsed message structure: {msg_structure}")
# Handle final messages first, regardless of context availability
# At the moment, this message is received AFTER the close_context message is
# sent, so it doesn't serve any functional purpose. For now, we'll just log it.
if msg.get("isFinal") is True:
logger.debug(
f"{self}: Received final message for context {received_ctx_id} - audio stream ended"
)
logger.trace(f"Received final message for context {received_ctx_id}")
continue
# Check if this message belongs to the current context.
@@ -581,44 +564,7 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
self.start_word_timestamps()
audio = base64.b64decode(msg["audio"])
# Targeted logging for audio debugging
audio_size = len(audio)
# More comprehensive audio analysis
if audio_size > 0:
# Sample first and last 8 bytes
first_8 = audio[:8].hex() if audio_size >= 8 else audio.hex()
last_8 = audio[-8:].hex() if audio_size >= 8 else ""
# Statistical analysis
non_zero_count = sum(1 for b in audio if b != 0)
non_zero_percent = (non_zero_count / audio_size * 100) if audio_size > 0 else 0
# Check for common silence patterns
all_zeros = all(b == 0 for b in audio)
mostly_zeros = non_zero_percent < 1.0 # Less than 1% non-zero
# Sample some values to check amplitude range
max_val = max(audio) if audio else 0
min_val = min(audio) if audio else 0
logger.debug(
f"{self}: Audio received - size: {audio_size} bytes, "
f"first_8: {first_8}, last_8: {last_8}, "
f"non_zero: {non_zero_percent:.1f}%, all_zeros: {all_zeros}, "
f"mostly_zeros: {mostly_zeros}, range: {min_val}-{max_val}, "
f"context: {received_ctx_id}, cumulative_time: {self._cumulative_time:.3f}s"
)
else:
logger.warning(
f"{self}: Received empty audio data for context {received_ctx_id}"
)
frame = TTSAudioRawFrame(audio, self.sample_rate, 1)
logger.debug(
f"[ELEVENLABS_AUDIO] Creating TTSAudioRawFrame with {len(audio)} bytes for context {received_ctx_id}"
)
await self.append_to_audio_context(received_ctx_id, frame)
if msg.get("alignment"):
@@ -674,9 +620,6 @@ class ElevenLabsTTSService(AudioContextWordTTSService):
"""Send text to the WebSocket for synthesis."""
if self._websocket and self._context_id:
msg = {"text": text, "context_id": self._context_id}
logger.debug(
f"{self}: Sending text to ElevenLabs - length: {len(text)}, context: {self._context_id}"
)
await self._websocket.send(json.dumps(msg))
@traced_tts
@@ -1030,9 +973,6 @@ class ElevenLabsHttpTTSService(WordTTSService):
if data and "audio_base64" in data:
await self.stop_ttfb_metrics()
audio = base64.b64decode(data["audio_base64"])
logger.debug(
f"[ELEVENLABS_AUDIO] Yielding TTSAudioRawFrame with {len(audio)} bytes"
)
yield TTSAudioRawFrame(audio, self.sample_rate, 1)
# Process alignment if present

View File

@@ -333,9 +333,6 @@ class TTSService(AIService):
elif isinstance(frame, TTSUpdateSettingsFrame):
await self._update_settings(frame.settings)
elif isinstance(frame, BotStoppedSpeakingFrame):
logger.warning(
f"[TTS_RESUME] {self.__class__.__name__} received BotStoppedSpeakingFrame"
)
await self._maybe_resume_frame_processing()
await self.push_frame(frame, direction)
else:
@@ -379,15 +376,11 @@ class TTSService(AIService):
async def _maybe_pause_frame_processing(self):
if self._processing_text and self._pause_frame_processing:
logger.warning(f"[TTS_PAUSE] {self.__class__.__name__} pausing frame processing")
await self.pause_processing_frames()
async def _maybe_resume_frame_processing(self):
if self._pause_frame_processing:
logger.warning(f"[TTS_RESUME] {self.__class__.__name__} resuming frame processing")
await self.resume_processing_frames()
else:
logger.debug(f"[TTS_RESUME] {self.__class__.__name__} resume called but not paused")
async def _process_text_frame(self, frame: TextFrame):
text: Optional[str] = None

View File

@@ -503,9 +503,6 @@ class BaseOutputTransport(FrameProcessor):
num_channels=frame.num_channels,
)
chunk.transport_destination = self._destination
logger.debug(
f"[AUDIO_QUEUE] Putting {chunk.__class__.__name__} with {len(chunk.audio)} bytes into audio queue"
)
await self._audio_queue.put(chunk)
self._audio_buffer = self._audio_buffer[self._audio_chunk_size :]
@@ -585,18 +582,15 @@ class BaseOutputTransport(FrameProcessor):
async def _bot_stopped_speaking(self):
"""Handle bot stopped speaking event."""
if self._bot_speaking:
logger.warning(
f"[BOT_STOPPED] Bot stopped speaking - sending BotStoppedSpeakingFrame upstream"
logger.debug(
f"Bot{f' [{self._destination}]' if self._destination else ''} stopped speaking"
)
downstream_frame = BotStoppedSpeakingFrame()
downstream_frame.transport_destination = self._destination
upstream_frame = BotStoppedSpeakingFrame()
upstream_frame.transport_destination = self._destination
logger.debug(f"[BOT_STOPPED] Pushing downstream BotStoppedSpeakingFrame")
await self._transport.push_frame(downstream_frame)
logger.debug(f"[BOT_STOPPED] Pushing upstream BotStoppedSpeakingFrame")
await self._transport.push_frame(upstream_frame, FrameDirection.UPSTREAM)
self._bot_speaking = False
@@ -604,8 +598,6 @@ class BaseOutputTransport(FrameProcessor):
# Clean audio buffer (there could be tiny left overs if not multiple
# to our output chunk size).
self._audio_buffer = bytearray()
else:
logger.debug(f"[BOT_STOPPED] _bot_stopped_speaking called but bot was not speaking")
async def _handle_frame(self, frame: Frame):
"""Handle various frame types with appropriate processing.
@@ -632,24 +624,12 @@ class BaseOutputTransport(FrameProcessor):
async def without_mixer(vad_stop_secs: float) -> AsyncGenerator[Frame, None]:
while True:
try:
logger.debug(
f"[AUDIO_QUEUE] Waiting for audio frame (timeout={vad_stop_secs}s)"
)
start_time = time.time()
frame = await asyncio.wait_for(
self._audio_queue.get(), timeout=vad_stop_secs
)
wait_time = time.time() - start_time
logger.debug(
f"[AUDIO_QUEUE] Got frame {frame.__class__.__name__} after {wait_time:.3f}s"
)
self._transport.reset_watchdog()
yield frame
except asyncio.TimeoutError:
wait_time = time.time() - start_time
logger.warning(
f"[AUDIO_QUEUE] TIMEOUT after {wait_time:.3f}s - triggering bot_stopped_speaking"
)
self._transport.reset_watchdog()
# Notify the bot stopped speaking upstream if necessary.
await self._bot_stopped_speaking()

862
uv.lock generated

File diff suppressed because it is too large Load Diff