Compare commits

...

66 Commits

Author SHA1 Message Date
vipyne
03dc1b343a add rtvi 2025-11-20 10:13:25 -06:00
vipyne
f0471dccda more examples 2025-11-20 10:13:25 -06:00
vipyne
fd0ef110ff pipecat bot example before trace 2025-11-20 10:13:25 -06:00
Mark Backman
ab58f72322 Merge pull request #3101 from hwuiwon/hw/inworld-talking-speed
feat: Add speaking rate control to Inworld TTS service.
2025-11-20 09:50:55 -05:00
Hwuiwon Kim
ead361f665 fix 2025-11-20 07:45:13 -05:00
Hwuiwon Kim
1cc69d475d feat: Add speaking rate control to Inworld TTS service & fix param cases 2025-11-19 22:57:53 -05:00
Mark Backman
51bdd8b728 Merge pull request #3097 from hwuiwon/fix-typo
Fix typo in STT event handler documentation
2025-11-19 17:10:32 -05:00
Hwuiwon Kim
30ff488714 Fix typo in event handler documentation 2025-11-19 17:04:07 -05:00
Vanessa Pyne
510f3df6b7 Merge pull request #3091 from pipecat-ai/vp-fix-mcp-examples
update MCP foundational examples
2025-11-19 10:35:08 -06:00
vipyne
68292bd75f rename MCP foundational examples 2025-11-19 10:34:13 -06:00
vipyne
42423bff41 update MCP foundational examples 2025-11-19 10:29:18 -06:00
Aleix Conchillo Flaqué
c3d2a25229 Merge pull request #3082 from pipecat-ai/aleix/pipecat-0.0.95
update CHANGELOG for 0.0.95
2025-11-18 21:17:07 -08:00
Aleix Conchillo Flaqué
cf1a9c1548 update CHANGELOG for 0.0.95 2025-11-18 21:14:27 -08:00
Aleix Conchillo Flaqué
51ba245e10 scripts(evals): fix EVAL_CONVERSATION/EVAL_WEATHER eval 2025-11-18 21:14:27 -08:00
Aleix Conchillo Flaqué
39b4e61837 SimliVideoService: fix connection issue 2025-11-18 19:41:47 -08:00
Aleix Conchillo Flaqué
ceaf53fdb0 LLMContext: async create_image_message/create_audio_message fixes 2025-11-18 19:41:13 -08:00
Aleix Conchillo Flaqué
f93276c64f Merge pull request #3090 from pipecat-ai/revert_function_calling_pr
Reverting: Ensure that the function call results respect the previous LLM context
2025-11-18 19:40:58 -08:00
Mark Backman
62a0f0c0f5 Merge pull request #3070 from ivaaan/hume-timestamps 2025-11-18 19:56:20 -05:00
Filipi Fuchter
793aca6b8b Revert "Ensure that the function call results respect the previous LLM context."
This reverts commit a510b276e6.
2025-11-18 21:38:49 -03:00
Filipi Fuchter
1fcaf3a4bf Revert "Searching in both _function_calls_context_messages and context messages when updating the result."
This reverts commit fccc91e923.
2025-11-18 21:38:49 -03:00
ivaaan
6484855139 fix changelog 2025-11-18 21:47:46 +01:00
ivaaan
771469b834 fix changelog 2025-11-18 21:39:29 +01:00
kompfner
a60618b0ca Merge pull request #3080 from pipecat-ai/pk/assistant-aggregator-handles-mixed-includes-inter-frame-spaces-text
`LLMAssistantAggregator` now properly aggregates text that might be a…
2025-11-18 15:24:27 -05:00
Paul Kompfner
3d21faaac2 LLMAssistantAggregator now properly aggregates text that might be a mix of includes_inter_frame_spaces=True and includes_inter_frame_spaces=False frames 2025-11-18 15:12:25 -05:00
ivaaan
f325eeb95b rm TranscriptProcessor 2 2025-11-18 20:41:10 +01:00
ivaaan
4c3fd42b1c fix changelog 2025-11-18 20:36:45 +01:00
ivaaan
c2309efd7e rm TranscriptProcessor 2025-11-18 20:35:09 +01:00
Ivan A
4ae1819645 Update src/pipecat/services/hume/tts.py
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-11-18 20:30:44 +01:00
Ivan A
a38f208135 Update examples/foundational/07ae-interruptible-hume.py
Co-authored-by: Mark Backman <m.backman@gmail.com>
2025-11-18 20:30:28 +01:00
Mark Backman
d1eb837890 Merge pull request #3081 from pipecat-ai/mb/fix-30-tts-text-frame-log
Fix foundational 30 example to output TTSTextFrames synced to audio
2025-11-18 14:10:56 -05:00
Mark Backman
153201542b Fix foundational 30 example to output TTSTextFrames synced to audio 2025-11-18 13:29:06 -05:00
Filipi da Silva Fuchter
9137e50043 Merge pull request #3053 from pipecat-ai/filipi/function_calls
Ensure that the function call results respect the previous LLM context.
2025-11-18 14:59:01 -03:00
Ivan A
8dbe119a73 Merge branch 'main' into hume-timestamps 2025-11-18 18:38:24 +01:00
ivaaan
26f96d0be8 upd example 2025-11-18 18:31:38 +01:00
ivaaan
9944e6faf0 upd service based on Mark's suggestions 2025-11-18 18:25:53 +01:00
Aleix Conchillo Flaqué
c1573c1f76 Merge pull request #3078 from pipecat-ai/aleix/llm-context-create-image-audio-async
LLMContext: create_image_message/create_audio_message are now async
2025-11-18 09:06:51 -08:00
Aleix Conchillo Flaqué
9f45ad4d2e LLMContext: create_image_message/create_audio_message are now async 2025-11-18 09:04:40 -08:00
Filipi Fuchter
fccc91e923 Searching in both _function_calls_context_messages and context messages when updating the result. 2025-11-18 11:50:28 -03:00
Filipi Fuchter
a510b276e6 Ensure that the function call results respect the previous LLM context. 2025-11-18 11:37:57 -03:00
Mark Backman
6481094638 Merge pull request #3058 from pipecat-ai/mb/add-camera-screen-support-smallwebrtc
Add camera and screen capture support to dev runner for SmallWebRTC
2025-11-18 09:22:36 -05:00
Mark Backman
3132e12265 Add camera and screen capture support to dev runner for SmallWebRTC 2025-11-18 09:19:13 -05:00
Aleix Conchillo Flaqué
12af3f79d0 Merge pull request #3060 from pipecat-ai/aleix/consumer-queue-frames
ConsumerProcessor: queue frames internally instead of pushing them
2025-11-18 00:54:18 -08:00
Aleix Conchillo Flaqué
4835617b16 ConsumerProcessor: queue frames internally instead of pushing them 2025-11-17 23:52:09 -08:00
Aleix Conchillo Flaqué
9283108240 Merge pull request #3073 from pipecat-ai/aleix/base-text-filter-only-filter
BaseTextFilter: only require subclasses to implement filter()
2025-11-17 23:29:26 -08:00
kompfner
515eaeeb1a Merge pull request #3074 from pipecat-ai/pk/tweak-moondream-example
Update Moondream example so that Moondream service output makes it in…
2025-11-17 16:52:18 -05:00
Paul Kompfner
5095fc6a64 Update Moondream example so that Moondream service output makes it into the context, even if the TTS service is disabled 2025-11-17 15:16:19 -05:00
Aleix Conchillo Flaqué
7eedb33d50 BaseTextFilter: only require subclasses to implement filter() 2025-11-17 11:23:47 -08:00
Filipi da Silva Fuchter
47f78df497 Merge pull request #3071 from pipecat-ai/filipi/small_webrtc_custom_data
Passing the custom request_data to the SmallWebRTCRunnerArguments body.
2025-11-17 15:50:11 -03:00
Filipi Fuchter
74154b26a2 Mentioning the SmallWebRTCTransport fix in the readme. 2025-11-17 15:39:07 -03:00
Filipi Fuchter
0c3c26b7b8 Passing the custom request_data to the SmallWebRTCRunnerArguments body. 2025-11-17 15:20:09 -03:00
kompfner
64417ef4ff Merge pull request #3061 from pipecat-ai/pk/greatly-simplify-inter-frame-spaces-logic
D'oh! My TTS "inter-frame-spaces" logic was *way* overcomplicated (an…
2025-11-17 10:47:56 -05:00
Paul Kompfner
f3b254e335 D'oh! My TTS "inter-frame-spaces" logic was *way* overcomplicated (and fundamentally mistaken, though it happened to work)
Now:
- For TTS word-by-word output and `TTSSpeakFrames`: `TTSTextFrame`s' have `includes_inter_frame_spaces=False`.
- For all other TTS output: `TTSTextFrame` pass through the received text frames' `includes_inter_frame_spaces` value. So far, this value has always been `True`: LLMs send text chunks already containing all necessary spaces.
- `LLMTextFrame`s set `includes_inter_frame_spaces=False` at init time, per the aforementioned assumption.
2025-11-17 10:14:28 -05:00
Filipi da Silva Fuchter
f27119a712 Merge pull request #3069 from pipecat-ai/filipi/fix_riva
Fixing RivaTTSService error handler.
2025-11-17 11:48:15 -03:00
ivaaan
2a51d0f1e5 add changelog 2025-11-17 15:20:06 +01:00
ivaaan
9156e21727 fix formatting 2025-11-17 14:00:03 +01:00
Filipi da Silva Fuchter
a5145be16e Merge pull request #3038 from pipecat-ai/filipi/flux_improvements
Deepgram Flux improvements
2025-11-17 09:57:43 -03:00
Filipi Fuchter
b104a59b10 Mentioning the Deepgram Flux improvements in the changelog. 2025-11-17 09:54:39 -03:00
Filipi Fuchter
04dbbabc03 Introduced a minimum confidence parameter in DeepgramFluxSTTService to avoid generating transcriptions below a defined threshold. 2025-11-17 09:54:30 -03:00
Filipi Fuchter
19cc0177b8 Refactored DeepgramFluxSTTService to automatically reconnect if sending a message fails. 2025-11-17 09:54:20 -03:00
Filipi Fuchter
77cd106795 Extracted the logic for retrying connections, and create a new send_with_retry method inside WebSocketService. 2025-11-17 09:54:08 -03:00
ivaaan
71869a116d fix errors 2025-11-17 13:51:04 +01:00
ivaaan
2f2bde9856 add timestamps to example 2025-11-17 13:40:03 +01:00
ivaaan
7de8838deb add word-level timestamp support to Hume service 2025-11-17 13:25:12 +01:00
Filipi Fuchter
9bf88bbf14 Fixing RivaTTSService error handler. 2025-11-17 07:43:30 -03:00
Mark Backman
35ff44b799 Merge pull request #3059 from pipecat-ai/mb/remove-llm-tracing-fallback 2025-11-14 14:07:40 -05:00
Mark Backman
d01876ee60 Remove fallbacks in traced_llm 2025-11-14 12:13:49 -05:00
64 changed files with 1239 additions and 496 deletions

View File

@@ -5,22 +5,54 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.95] - 2025-11-18
### Added
- Added ai-coustics integrated VAD (`AICVADAnalyzer`) with `AICFilter` factory and
example wiring; leverages the enhancement model for robust detection with no
ONNX dependency or added processing complexity.
- Added a watchdog to `DeepgramFluxSTTService` to prevent dangling tasks in case the
user was speaking and we stop receiving audio.
- Introduced a minimum confidence parameter in `DeepgramFluxSTTService` to avoid
generating transcriptions below a defined threshold.
- Added `ElevenLabsRealtimeSTTService` which implements the Realtime STT
service from ElevenLabs.
- Added a `TTSService.includes_inter_frame_spaces` property getter, so that TTS
services that subclass `TTSService` can indicate whether the text in the
`TTSTextFrame`s they push already contain any necessary inter-frame spaces.
- Added word-level timestamps support to Hume TTS service
- Added optional speaking rate control to `InworldTTSService`.
### Changed
- ⚠️ Breaking change: `LLMContext.create_image_message()`,
`LLMContext.create_audio_message()`, `LLMContext.add_image_frame_message()`
and `LLMContext.add_audio_frames_message()` are now async methods. This fixes
an issue where the asyncio event loop would be blocked while encoding audio or
images.
- `ConsumerProcessor` now queues frames from the producer internally instead of
pushing them directly. This allows us to subclass consumer processors and
manipulate frames before they are pushed.
- `BaseTextFilter` only require subclasses to implement the `filter()` method.
- Extracted the logic for retrying connections, and create a new `send_with_retry`
method inside `WebSocketService`.
- Refactored `DeepgramFluxSTTService` to automatically reconnect if sending a
message fails.
- Updated all STT and TTS services to use consistent error handling pattern with
`push_error()` method for better pipeline error event integration.
- Added support for `maybe_capture_participant_camera()` and
`maybe_capture_participant_screen()` for `SmallWebRTCTransport` in the runner
utils.
- Added Hindi support for Rime TTS services.
- Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API
@@ -40,6 +72,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed a `SimliVideoService` connection issue.
- Fixed an issue in the `Runner` where, when using `SmallWebRTCTransport`, the
`request_data` was not being passed to the `SmallWebRTCRunnerArguments` body.
- Fixed subtle issue of assistant context messages ending up with double spaces
between words or sentences.
@@ -54,11 +91,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Prevented `HeyGenVideoService` from automatically disconnecting after 5 minutes.
### Added
- Added ai-coustics integrated VAD (`AICVADAnalyzer`) with `AICFilter` factory and
example wiring; leverages the enhancement model for robust detection with no
ONNX dependency or added processing complexity.
- Fixed `InworldTTSService` audio config payload to use camelCase keys expected
by the Inworld API.
## [0.0.94] - 2025-11-10

View File

@@ -13,24 +13,29 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, TTSTextFrame
from pipecat.observers.loggers.debug_log_observer import DebugLogObserver, FrameEndpoint
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
)
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.hume.tts import HUME_SAMPLE_RATE, HumeTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -88,7 +93,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
tts, # TTS (HumeTTSService with word timestamps)
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
@@ -102,7 +107,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
audio_out_sample_rate=HUME_SAMPLE_RATE,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[RTVIObserver(rtvi)],
observers=[
RTVIObserver(rtvi),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
}
),
],
)
@rtvi.event_handler("on_client_ready")
@@ -112,6 +124,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
logger.info(
"💡 Word timestamps are enabled! Watch the console for TTSTextFrame logs showing each word with its PTS."
)
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])

View File

@@ -52,7 +52,10 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramFluxSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramFluxSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"),
params=DeepgramFluxSTTService.InputParams(min_confidence=0.3),
)
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")

View File

@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -117,7 +117,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -110,7 +110,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Kick off the conversation.
image = Image.open(image_path)
message = LLMContext.create_image_message(
message = await LLMContext.create_image_message(
image=image.tobytes(),
format="RGB",
size=image.size,

View File

@@ -15,14 +15,21 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, UserImageRequestFrame
from pipecat.frames.frames import (
Frame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
TextFrame,
UserImageRequestFrame,
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
@@ -66,6 +73,27 @@ async def fetch_user_image(params: FunctionCallParams):
# await params.result_callback({"result": "Image is being captured."})
class MoondreamTextFrameWrapper(FrameProcessor):
"""Wraps Moondream-provided TextFrames with LLM response start/end frames.
This processor detects TextFrames and automatically wraps them with
LLMFullResponseStartFrame and LLMFullResponseEndFrame to provide proper
response boundaries for downstream processors.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we receive a TextFrame, wrap it with response start/end frames
if isinstance(frame, TextFrame):
await self.push_frame(LLMFullResponseStartFrame(), direction)
await self.push_frame(frame, direction)
await self.push_frame(LLMFullResponseEndFrame(), direction)
else:
# For all other frames, just pass them through
await self.push_frame(frame, direction)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
@@ -130,6 +158,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# If you run into weird description, try with use_cpu=True
moondream = MoondreamService()
# Wrap TextFrames with LLM response start/end frames, which makes Moondream
# output be treated like LLM responses for the purpose of context
# aggregation. Without this, the assistant context aggregator would ignore
# Moondream output (if the TTS service is disabled).
moondream_text_wrapper = MoondreamTextFrameWrapper()
pipeline = Pipeline(
[
transport.input(), # Transport user input
@@ -137,7 +171,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context_aggregator.user(), # User responses
ParallelPipeline(
[llm], # LLM
[moondream],
[moondream, moondream_text_wrapper],
),
tts, # TTS
transport.output(), # Transport bot output

View File

@@ -391,7 +391,7 @@ class AudioAccumulator(FrameProcessor):
)
self._user_speaking = False
context = LLMContext()
context.add_audio_frames_message(audio_frames=self._audio_frames)
await context.add_audio_frames_message(audio_frames=self._audio_frames)
await self.push_frame(LLMContextFrame(context=context))
elif isinstance(frame, InputAudioRawFrame):
# Append the audio frame to our buffer. Treat the buffer as a ring buffer, dropping the oldest

View File

@@ -150,7 +150,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
LLMLogObserver(),
DebugLogObserver(
frame_types={
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.DESTINATION),
TTSTextFrame: (BaseOutputTransport, FrameEndpoint.SOURCE),
UserStartedSpeakingFrame: (BaseInputTransport, FrameEndpoint.SOURCE),
EndFrame: None,
}

View File

@@ -155,7 +155,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
You are a helpful LLM in a WebRTC call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection.
Offer, for example, to show the earliest Rembrandt work from the museum. Use the `search_artwork` tool.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.

View File

@@ -7,6 +7,7 @@
import asyncio
import io
import json
import os
import re
import shutil
@@ -15,7 +16,7 @@ import aiohttp
from dotenv import load_dotenv
from loguru import logger
from mcp import StdioServerParameters
from mcp.client.session_group import SseServerParameters
from mcp.client.session_group import StreamableHttpParameters
from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
@@ -66,10 +67,12 @@ class UrlToImageProcessor(FrameProcessor):
await self.push_frame(frame, direction)
def extract_url(self, text: str):
pattern = r"!\[[^\]]*\]\((https?://[^)]+\.(png|jpg|jpeg|PNG|JPG|JPEG|gif))\)"
match = re.search(pattern, text)
if match:
return match.group(1)
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
return None
async def run_image_process(self, image_url: str):
@@ -132,10 +135,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
system = f"""
You are a helpful LLM in a WebRTC call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection.
Offer, for example, to show the earliest Rembrandt work from the museum. Use the `search_artwork` tool.
You have access to tools to search the Rijksmuseum collection and the user's GitHub repositories and account.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
You can also offer to answer users questions about their GitHub repositories and account.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
@@ -145,11 +149,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
messages = [{"role": "system", "content": system}]
try:
mcp = MCPClient(
rijksmuseum_mcp = MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-error setting up mcp"],
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
)
)
@@ -157,24 +161,32 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.error(f"error setting up rijksmuseum mcp")
logger.exception("error trace:")
try:
# https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/
# ie. "https://www.mcp.run/api/mcp/sse?..."
# ensure the profile has a tool or few installed
mcp_run = MCPClient(server_params=SseServerParameters(url=os.getenv("MCP_RUN_SSE_URL")))
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
github_mcp = MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={
"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"
},
)
)
except Exception as e:
logger.error(f"error setting up mcp.run")
logger.exception("error trace:")
tools = {}
run_tools = {}
rijksmuseum_tools = {}
github_tools = {}
try:
tools = await mcp.register_tools(llm)
run_tools = await mcp_run.register_tools(llm)
rijksmuseum_tools = await rijksmuseum_mcp.register_tools(llm)
github_tools = await github_mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
all_standard_tools = run_tools.standard_tools + tools.standard_tools
all_standard_tools = rijksmuseum_tools.standard_tools + github_tools.standard_tools
all_tools = ToolsSchema(standard_tools=all_standard_tools)
context = LLMContext(messages, all_tools)
@@ -226,9 +238,9 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("MCP_RUN_SSE_URL"):
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.error(
f"Please set RIJKSMUSEUM_API_KEY and MCP_RUN_SSE_URL environment variables. See https://github.com/r-huijts/rijksmuseum-mcp and https://mcp.run"
f"Please set `RIJKSMUSEUM_API_KEY` and `GITHUB_PERSONAL_ACCESS_TOKEN` environment variables. See https://github.com/r-huijts/rijksmuseum-mcp."
)
import sys

View File

@@ -4,12 +4,11 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from mcp.client.session_group import SseServerParameters
from turn_detector_observer import TurnDetectorObserver
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
@@ -21,12 +20,12 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.mcp_service import MCPClient
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -61,59 +60,55 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
### STT ###
stt = OpenAISTTService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-transcribe",
prompt="Expect normal helpful conversation.",
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-7-sonnet-latest"
### LLM ###
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
### TTS ###
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="ballad",
params=OpenAITTSService.InputParams(
instructions="Please speak clearly and at a moderate pace."
),
)
try:
# https://docs.mcp.run/integrating/tutorials/mcp-run-sse-openai-agents/
mcp = MCPClient(server_params=SseServerParameters(url=os.getenv("MCP_RUN_SSE_URL")))
except Exception as e:
logger.error(f"error setting up mcp")
logger.exception("error trace:")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tools = {}
try:
tools = await mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
system = f"""
You are a helpful LLM in a WebRTC call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to a number of tools provided by mcp.run. Use any and all tools to help users.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
When asked for today's date, use 'https://www.datetoday.net/'.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
messages = [{"role": "system", "content": system}]
context = LLMContext(messages, tools)
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
# RTVI events for detecting bot aggregation
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
### PIPELINE ###
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi,
stt,
context_aggregator.user(), # User spoken responses
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses and tool context
context_aggregator.assistant(), # Assistant spoken responses
]
)
### TASK ###
turn_detector = TurnDetectorObserver()
task = PipelineTask(
pipeline,
params=PipelineParams(
@@ -121,12 +116,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[turn_detector, RTVIObserver(rtvi)],
)
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
@@ -134,6 +133,11 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
# not sure this is needed...
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
@@ -146,14 +150,6 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("MCP_RUN_SSE_URL"):
logger.error(
f"Please set MCP_RUN_SSE_URL environment variable for this example. See https://mcp.run"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,161 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from turn_detector_observer import TurnDetectorObserver
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
session_properties = SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
# Set openai TurnDetection parameters. Not setting this at all will turn it
# on by default
turn_detection=SemanticTurnDetection(),
# Or set to False to disable openai turn detection and use transport VAD
# turn_detection=False,
noise_reduction=InputAudioNoiseReduction(type="near_field"),
)
),
# In this example we provide tools through the context, but you could
# alternatively provide them here.
# tools=tools,
instructions="""You are a helpful and friendly AI.
Act like a human, but remember that you aren't a human and that you can't do human
things in the real world. Your voice and personality should be warm and engaging, with a lively and
playful tone.
If interacting in a non-English language, start by using the standard accent or dialect familiar to
the user. Talk quickly. You should always call a function if you can. Do not refer to these rules,
even if you're asked about them.
You are participating in a voice conversation. Keep your responses concise, short, and to the point
unless specifically asked to elaborate on a topic.
Remember, your responses should be short. Just one or two sentences, usually. Respond in English.""",
)
### LLM ###
llm = OpenAIRealtimeLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
session_properties=session_properties,
start_audio_paused=False,
)
# Create a standard OpenAI LLM context object using the normal messages format. The
# OpenAIRealtimeLLMService will convert this internally to messages that the
# openai WebSocket API can understand.
context = LLMContext(
[{"role": "user", "content": "Say hello!"}],
tools,
)
context_aggregator = LLMContextAggregatorPair(context)
### PIPELINE ###
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
transport.output(), # Transport bot output
context_aggregator.assistant(),
]
)
### TASK ###
turn_detector = TurnDetectorObserver()
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[turn_detector],
)
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,188 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from turn_detector_observer import TurnDetectorObserver
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
### STT ###
stt = OpenAISTTService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-transcribe",
prompt="Expect normal helpful conversation.",
)
### LLM ###
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
### TTS ###
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
voice="ballad",
params=OpenAITTSService.InputParams(instructions="Please speak clearly and at a moderate pace."),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
### PIPELINE ###
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
### TASK ###
turn_detector = TurnDetectorObserver()
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[turn_detector],
)
turn_detector.set_turn_observer_event_handlers(task.turn_tracking_observer)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,11 @@
```bash
uv sync
uv pip install -e '.[cartesia,daily,elevenlabs,local-smart-turn-v3,openai,runner,webrtc]'
```
```bash
python examples/foundational/trace/001-trace.py
```
- open [http://localhost:7860](http://localhost:7860)
- click `connect` button in top right

View File

@@ -0,0 +1,5 @@
OPENAI_API_KEY=...
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...
CARTESIA_API_KEY=...

View File

@@ -0,0 +1,181 @@
import time
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
FunctionCallResultFrame,
FunctionCallsStartedFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
StartFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.base_llm import LLMService
from pipecat.transports.base_output import BaseOutputTransport
class TurnDetectorObserver(BaseObserver):
"""Observer ... of turns."""
def __init__(self):
super().__init__()
self._turn_observer = None
self._arrow = ""
self._turn_number = 1
self._endframe_queued = False
def init(self):
"""
Set ...
"""
pass
def set_turn_observer_event_handlers(self, turn_observer):
self._turn_observer = turn_observer
self.set_turn_observer_event_handlers(self._turn_observer)
def get_turn_observer(self):
return self._turn_observer
def set_turn_observer_event_handlers(self, turn_observer):
"""Sets the Turn Observer event handlers `on_turn_started` and `on_turn_ended`.
Args:
turn_observer: The turn tracking observer of the pipeline task
"""
@turn_observer.event_handler("on_turn_started")
async def on_turn_started(observer, turn_number):
self._turn_number = turn_number
current_time = time.time()
logger.info(f"🔄 Turn {turn_number} started")
# 🫆🫆🫆🫆
# code to start conversation turn here
# 🫆🫆🫆🫆
# 🫆🫆🫆🫆
# 🫆🫆🫆🫆
@turn_observer.event_handler("on_turn_ended")
async def on_turn_ended(observer, turn_number, duration, was_interrupted):
current_time = time.time()
if was_interrupted:
logger.info(f"🔄 Turn {turn_number} interrupted after {duration:.2f}s")
else:
logger.info(f"🏁 Turn {turn_number} completed in {duration:.2f}s")
# 🫆🫆🫆🫆
# code to end conversation turn here
# 🫆🫆🫆🫆
# 🫆🫆🫆🫆
# 🫆🫆🫆🫆
########
# everything past here isn't needed, just nice to have logging
########
async def on_push_frame(self, data: FramePushed):
"""Runs when any frame is pushed through pipeline.
Determines based on what type of frame and where it came from
what metrics to update.
Args:
data: the pushed frame
"""
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
# Convert timestamp to milliseconds for readability
time_sec = timestamp / 1_000_000
# Convert timestamp to seconds for readability
# time_sec = timestamp / 1_000_000_000
# only log downstream frames
if direction == FrameDirection.UPSTREAM:
return
if isinstance(src, Pipeline) or isinstance(dst, Pipeline):
if isinstance(frame, StartFrame):
self._handle_StartFrame(src, dst, frame, time_sec)
elif isinstance(frame, EndFrame):
self._handle_EndFrame(src, dst, frame, time_sec)
if isinstance(src, BaseOutputTransport):
if isinstance(frame, BotStartedSpeakingFrame):
self._handle_BotStartedSpeakingFrame(src, dst, frame, time_sec)
elif isinstance(frame, BotStoppedSpeakingFrame):
self._handle_BotStoppedSpeakingFrame(src, dst, frame, time_sec)
elif isinstance(frame, UserStartedSpeakingFrame):
self._handle_UserStartedSpeakingFrame(src, dst, frame, time_sec)
elif isinstance(frame, UserStoppedSpeakingFrame):
self._handle_UserStoppedSpeakingFrame(src, dst, frame, time_sec)
if isinstance(src, LLMService):
if isinstance(frame, LLMFullResponseStartFrame):
self._handle_LLMFullResponseStartFrame(src, dst, frame, time_sec)
elif isinstance(frame, LLMFullResponseEndFrame):
self._handle_LLMFullResponseEndFrame(src, dst, frame, time_sec)
elif isinstance(frame, FunctionCallsStartedFrame):
self._handle_FunctionCallsStartedFrame(src, dst, frame, time_sec)
elif isinstance(frame, FunctionCallResultFrame):
self._handle_FunctionCallResultFrame(src, dst, frame, time_sec)
# ------------ FRAME HANDLERS ------------
def _handle_StartFrame(self, src, dst, frame, time_sec):
if isinstance(dst, Pipeline):
logger.info(f"🟢🟢🟢 StartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_EndFrame(self, src, dst, frame, time_sec):
if isinstance(dst, Pipeline):
logger.info(f"Queueing 🔴🔴🔴 EndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
self._endframe_queued = True
if isinstance(src, Pipeline):
logger.info(f"🔴🔴🔴 EndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
current_time = time.time()
end_state_info = {
"turn_number": self._turn_number,
}
def _handle_BotStartedSpeakingFrame(self, src, dst, frame, time_sec):
logger.info(f"🤖🟢 BotStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_BotStoppedSpeakingFrame(self, src, dst, frame, time_sec):
logger.info(f"🤖🔴 BotStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_LLMFullResponseStartFrame(self, src, dst, frame, time_sec):
logger.info(f"🧠🟢 LLMFullResponseStartFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_LLMFullResponseEndFrame(self, src, dst, frame, time_sec):
logger.info(f"🧠🔴 LLMFullResponseEndFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_UserStartedSpeakingFrame(self, src, dst, frame, time_sec):
logger.info(f"🙂🟢 UserStartedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_UserStoppedSpeakingFrame(self, src, dst, frame, time_sec):
logger.info(f"🙂🔴 UserStoppedSpeakingFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s")
def _handle_FunctionCallsStartedFrame(self, src, dst, frame, time_sec):
logger.info(
f"📐🟢 {frame.function_calls[0].function_name} FunctionCallsStartedFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
)
def _handle_FunctionCallResultFrame(self, src, dst, frame, time_sec):
logger.info(
f"📐🔴 {frame.function_name} FunctionCallResultFrame: {src} {self._arrow} {dst} at {time_sec:.2f}s"
)

View File

@@ -99,7 +99,7 @@ local-smart-turn = [ "coremltools>=8.0", "transformers", "torch>=2.5.0,<3", "tor
local-smart-turn-v3 = [ "transformers", "onnxruntime>=1.20.1,<2" ]
remote-smart-turn = []
silero = [ "onnxruntime>=1.20.1,<2" ]
simli = [ "simli-ai~=0.1.25"]
simli = [ "simli-ai~=1.0.3"]
soniox = [ "pipecat-ai[websockets-base]" ]
soundfile = [ "soundfile~=0.13.1" ]
speechmatics = [ "speechmatics-rt>=0.5.0" ]

View File

@@ -30,8 +30,8 @@ EVAL_SIMPLE_MATH = EvalConfig(
)
EVAL_WEATHER = EvalConfig(
prompt="What's the weather in San Francisco?",
eval="The user says something specific about the current weather in San Francisco, including the degrees.",
prompt="What's the weather in San Francisco (in farhenheit or celsius)?",
eval="The user says something specific about the current weather in San Francisco, including the degrees (in farhenheit or celsius).",
)
EVAL_ONLINE_SEARCH = EvalConfig(
@@ -70,7 +70,7 @@ EVAL_VOICEMAIL = EvalConfig(
EVAL_CONVERSATION = EvalConfig(
prompt="Hello, this is Mark.",
eval="The user replies with a greeting.",
eval="The user acknowledges the greeting.",
eval_speaks_first=True,
)

View File

@@ -352,7 +352,10 @@ class TextFrame(DataFrame):
class LLMTextFrame(TextFrame):
"""Text frame generated by LLM services."""
pass
def __post_init__(self):
super().__post_init__()
# LLM services send text frames with all necessary spaces included
self.includes_inter_frame_spaces = True
@dataclass

View File

@@ -14,6 +14,7 @@ translation from this universal context into whatever format it needs, using a
service-specific adapter.
"""
import asyncio
import base64
import io
import wave
@@ -137,7 +138,7 @@ class LLMContext:
return {"role": role, "content": content}
@staticmethod
def create_image_message(
async def create_image_message(
*,
role: str = "user",
format: str,
@@ -154,15 +155,21 @@ class LLMContext:
image: Raw image bytes.
text: Optional text to include with the image.
"""
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
def encode_image():
buffer = io.BytesIO()
Image.frombytes(format, size, image).save(buffer, format="JPEG")
encoded_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
return encoded_image
encoded_image = await asyncio.to_thread(encode_image)
url = f"data:image/jpeg;base64,{encoded_image}"
return LLMContext.create_image_url_message(role=role, url=url, text=text)
@staticmethod
def create_audio_message(
async def create_audio_message(
*, role: str = "user", audio_frames: list[AudioRawFrame], text: str = "Audio follows"
) -> LLMContextMessage:
"""Create a context message containing audio.
@@ -172,21 +179,26 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
content = []
content.append({"type": "text", "text": text})
data = b"".join(frame.audio for frame in audio_frames)
async def encode_audio():
sample_rate = audio_frames[0].sample_rate
num_channels = audio_frames[0].num_channels
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(data)
content = []
content.append({"type": "text", "text": text})
data = b"".join(frame.audio for frame in audio_frames)
encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8")
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(data)
encoded_audio = base64.b64encode(buffer.getvalue()).decode("utf-8")
return encoded_audio
encoded_audio = await asyncio.to_thread(encode_audio)
content.append(
{
@@ -321,7 +333,7 @@ class LLMContext:
"""
self._tool_choice = tool_choice
def add_image_frame_message(
async def add_image_frame_message(
self, *, format: str, size: tuple[int, int], image: bytes, text: Optional[str] = None
):
"""Add a message containing an image frame.
@@ -332,10 +344,12 @@ class LLMContext:
image: Raw image bytes.
text: Optional text to include with the image.
"""
message = LLMContext.create_image_message(format=format, size=size, image=image, text=text)
message = await LLMContext.create_image_message(
format=format, size=size, image=image, text=text
)
self.add_message(message)
def add_audio_frames_message(
async def add_audio_frames_message(
self, *, audio_frames: list[AudioRawFrame], text: str = "Audio follows"
):
"""Add a message containing audio frames.
@@ -344,7 +358,7 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
message = LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
message = await LLMContext.create_audio_message(audio_frames=audio_frames, text=text)
self.add_message(message)
@staticmethod

View File

@@ -66,7 +66,7 @@ from pipecat.processors.aggregators.llm_response import (
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import concatenate_aggregated_text
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601
@@ -90,15 +90,7 @@ class LLMContextAggregator(FrameProcessor):
self._context = context
self._role = role
self._aggregation: List[str] = []
# Whether to add spaces between text parts.
# (Currently only used by LLMAssistantAggregator, but could be expanded
# to LLMUserAggregator in the future if needed; that would require
# additional work since LLMUserAggregator currently trims spaces from
# incoming frames before determining whether it "really" received any
# text).
self._add_spaces = True
self._aggregation: List[TextPartForConcatenation] = []
@property
def messages(self) -> List[LLMContextMessage]:
@@ -191,7 +183,7 @@ class LLMContextAggregator(FrameProcessor):
Returns:
The concatenated aggregation string.
"""
return concatenate_aggregated_text(self._aggregation, self._add_spaces)
return concatenate_aggregated_text(self._aggregation)
class LLMUserAggregator(LLMContextAggregator):
@@ -441,7 +433,12 @@ class LLMUserAggregator(LLMContextAggregator):
if not text.strip():
return
self._aggregation.append(text)
# Transcriptions never include inter-part spaces (so far).
self._aggregation.append(
TextPartForConcatenation(
text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
# We just got a final result, so let's reset interim results.
self._seen_interim_results = False
# Reset aggregation timer.
@@ -796,7 +793,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
logger.debug(f"{self} Appending UserImageRawFrame to LLM context (size: {frame.size})")
self._context.add_image_frame_message(
await self._context.add_image_frame_message(
format=frame.format,
size=frame.size,
image=frame.image,
@@ -821,11 +818,11 @@ class LLMAssistantAggregator(LLMContextAggregator):
if len(frame.text) == 0:
return
# Track whether we need to add spaces between text parts
# Assumption: we can just keep track of the latest frame's value
self._add_spaces = not frame.includes_inter_frame_spaces
self._aggregation.append(frame.text)
self._aggregation.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
def _context_updated_task_finished(self, task: asyncio.Task):
self._context_updated_tasks.discard(task)

View File

@@ -83,4 +83,4 @@ class ConsumerProcessor(FrameProcessor):
while True:
frame = await self._queue.get()
new_frame = await self._transformer(frame)
await self.push_frame(new_frame, self._direction)
await self.queue_frame(new_frame, self._direction)

View File

@@ -26,7 +26,7 @@ from pipecat.frames.frames import (
TTSTextFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.utils.string import concatenate_aggregated_text
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601
@@ -98,15 +98,9 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._current_text_parts: List[str] = []
self._current_text_parts: List[TextPartForConcatenation] = []
self._aggregation_start_time: Optional[str] = None
# Whether to add spaces between text parts.
# (The use of this could be expanded to the UserTranscriptProcessor in
# the future if needed; currently the UserTranscriptProcessor assumes
# that user transcription frames do not need aggregation).
self._add_spaces = True
async def _emit_aggregated_text(self):
"""Aggregates and emits text fragments as a transcript message.
@@ -147,7 +141,7 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
Result: "Hello there how are you"
"""
if self._current_text_parts and self._aggregation_start_time:
content = concatenate_aggregated_text(self._current_text_parts, self._add_spaces)
content = concatenate_aggregated_text(self._current_text_parts)
if content:
logger.trace(f"Emitting aggregated assistant message: {content}")
message = TranscriptionMessage(
@@ -191,11 +185,11 @@ class AssistantTranscriptProcessor(BaseTranscriptProcessor):
if not self._aggregation_start_time:
self._aggregation_start_time = time_now_iso8601()
# Track whether we need to add spaces between text parts
# Assumption: we can just keep track of the latest frame's value
self._add_spaces = not frame.includes_inter_frame_spaces
self._current_text_parts.append(frame.text)
self._current_text_parts.append(
TextPartForConcatenation(
frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
)
)
# Push frame.
await self.push_frame(frame, direction)

View File

@@ -264,7 +264,10 @@ def _setup_webrtc_routes(
# Prepare runner arguments with the callback to run your bot
async def webrtc_connection_callback(connection):
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
runner_args = SmallWebRTCRunnerArguments(
webrtc_connection=connection, body=request.request_data
)
background_tasks.add_task(bot_module.bot, runner_args)
# Delegate handling to SmallWebRTCRequestHandler
@@ -326,7 +329,8 @@ def _setup_webrtc_routes(
type=request_data["type"],
pc_id=request_data.get("pc_id"),
restart_pc=request_data.get("restart_pc"),
request_data=request_data,
request_data=request_data.get("request_data")
or request_data.get("requestData"),
)
return await offer(webrtc_request, background_tasks)
elif request.method == HTTPMethod.PATCH.value:

View File

@@ -281,6 +281,14 @@ async def maybe_capture_participant_camera(
except ImportError:
pass
try:
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
if isinstance(transport, SmallWebRTCTransport):
await transport.capture_participant_video(video_source="camera")
except ImportError:
pass
async def maybe_capture_participant_screen(
transport: BaseTransport, client: Any, framerate: int = 0
@@ -303,6 +311,14 @@ async def maybe_capture_participant_screen(
except ImportError:
pass
try:
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport
if isinstance(transport, SmallWebRTCTransport):
await transport.capture_participant_video(video_source="screenVideo")
except ImportError:
pass
def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
"""Clean up ICE candidates in SDP text for SmallWebRTC.

View File

@@ -373,9 +373,7 @@ class AnthropicLLMService(LLMService):
if event.type == "content_block_delta":
if hasattr(event.delta, "text"):
frame = LLMTextFrame(event.delta.text)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(LLMTextFrame(event.delta.text))
completion_tokens_estimate += self._estimate_tokens(event.delta.text)
elif hasattr(event.delta, "partial_json") and tool_use_block:
json_accumulator += event.delta.partial_json

View File

@@ -146,15 +146,6 @@ class AsyncAITTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that AsyncAI TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that AsyncAI's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Async language format.
@@ -433,15 +424,6 @@ class AsyncAIHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that AsyncAI TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that AsyncAI's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Async language format.

View File

@@ -1078,9 +1078,7 @@ class AWSBedrockLLMService(LLMService):
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"]["delta"]
if "text" in delta:
frame = LLMTextFrame(delta["text"])
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(LLMTextFrame(delta["text"]))
completion_tokens_estimate += self._estimate_tokens(delta["text"])
elif "toolUse" in delta and "input" in delta["toolUse"]:
# Handle partial JSON for tool use

View File

@@ -209,15 +209,6 @@ class AWSPollyTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that AWS TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that AWS's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to AWS Polly language format.

View File

@@ -151,15 +151,6 @@ class AzureBaseTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Azure TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Azure's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Azure language format.

View File

@@ -6,7 +6,9 @@
"""Deepgram Flux speech-to-text service implementation."""
import asyncio
import json
import time
from enum import Enum
from typing import Any, AsyncGenerator, Dict, Optional
from urllib.parse import urlencode
@@ -94,6 +96,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
mip_opt_out: Optional. Opts out requests from the Deepgram Model Improvement Program
(default False).
tag: List of tags to label requests for identification during usage reporting.
min_confidence: Optional. Minimum confidence required confidence to create a TranscriptionFrame
"""
eager_eot_threshold: Optional[float] = None
@@ -102,6 +105,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
keyterm: list = []
mip_opt_out: Optional[bool] = None
tag: list = []
min_confidence: Optional[float] = None # New parameter
def __init__(
self,
@@ -163,6 +167,13 @@ class DeepgramFluxSTTService(WebsocketSTTService):
self._register_event_handler("on_end_of_turn")
self._register_event_handler("on_eager_end_of_turn")
self._register_event_handler("on_update")
self._connection_established_event = asyncio.Event()
# Watchdog task to prevent dangling tasks
# If we stop sending audio to Flux after we have received that the User has started speaking
# we never receive the user stopped speaking event unless we resume sending audio to it.
self._last_stt_time = None
self._watchdog_task = None
self._user_is_speaking = False
async def _connect(self):
"""Connect to WebSocket and start background tasks.
@@ -172,9 +183,6 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Disconnect from WebSocket and clean up tasks.
@@ -182,14 +190,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
and cleans up resources to prevent memory leaks.
"""
try:
# Cancel background tasks BEFORE closing websocket
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=2.0)
self._receive_task = None
# Now close the websocket
await self._disconnect_websocket()
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
@@ -197,6 +198,25 @@ class DeepgramFluxSTTService(WebsocketSTTService):
# Reset state only after everything is cleaned up
self._websocket = None
async def _send_silence(self, duration_secs: float = 0.5):
"""Send a block of silence of the specified duration (default 500 ms)."""
sample_width = 2 # bytes per sample for 16-bit PCM
num_channels = 1 # mono
num_samples = int(self.sample_rate * duration_secs)
silence = b"\x00" * (num_samples * sample_width * num_channels)
await self._websocket.send(silence)
async def _watchdog_task_handler(self):
while self._websocket and self._websocket.state is State.OPEN:
now = time.monotonic()
# More than 500 ms without sending new audio to Flux
if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
logger.warning("Sending silence to Flux to prevent dangling task")
await self._send_silence()
self._last_stt_time = time.monotonic()
# check every 100ms
await asyncio.sleep(0.1)
async def _connect_websocket(self):
"""Establish WebSocket connection to API.
@@ -208,10 +228,26 @@ class DeepgramFluxSTTService(WebsocketSTTService):
if self._websocket and self._websocket.state is State.OPEN:
return
self._connection_established_event.clear()
self._user_is_speaking = False
self._websocket = await websocket_connect(
self._websocket_url,
additional_headers={"Authorization": f"Token {self._api_key}"},
)
# Creating the receiver task
if not self._receive_task:
self._receive_task = self.create_task(
self._receive_task_handler(self._report_error)
)
# Creating the watchdog task
if not self._watchdog_task:
self._watchdog_task = self.create_task(self._watchdog_task_handler())
# Now wait for the connection established event
logger.debug("WebSocket connected, waiting for server confirmation...")
await self._connection_established_event.wait()
logger.debug("Connected to Deepgram Flux Websocket")
await self._call_event_handler("on_connected")
except Exception as e:
@@ -227,6 +263,16 @@ class DeepgramFluxSTTService(WebsocketSTTService):
metrics collection. Handles disconnection errors gracefully.
"""
try:
# Cancel background tasks BEFORE closing websocket
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=2.0)
self._receive_task = None
if self._watchdog_task:
await self.cancel_task(self._watchdog_task, timeout=2.0)
self._watchdog_task = None
self._last_stt_time = None
self._connection_established_event.clear()
await self.stop_all_metrics()
if self._websocket:
@@ -340,7 +386,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
return
try:
await self._websocket.send(audio)
self._last_stt_time = time.monotonic()
await self.send_with_retry(audio, self._report_error)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
@@ -463,6 +510,8 @@ class DeepgramFluxSTTService(WebsocketSTTService):
transcription processing.
"""
logger.info("Connected to Flux - ready to stream audio")
# Notify connection is established
self._connection_established_event.set()
async def _handle_fatal_error(self, data: Dict[str, Any]):
"""Handle fatal error messages from Deepgram Flux.
@@ -530,6 +579,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
transcript: maybe the first few words of the turn.
"""
logger.debug("User started speaking")
self._user_is_speaking = True
await self.push_interruption_task_frame_and_wait()
await self.broadcast_frame(UserStartedSpeakingFrame)
await self.start_metrics()
@@ -550,6 +600,22 @@ class DeepgramFluxSTTService(WebsocketSTTService):
logger.trace(f"Received event TurnResumed: {event}")
await self._call_event_handler("on_turn_resumed")
def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
"""Calculate the average confidence from transcript data.
Return None if the data is missing or invalid.
"""
# Example: Assume transcript_data has a list of words with confidence
words = transcript_data.get("words")
if not words or not isinstance(words, list):
return None
confidences = [
w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
]
if not confidences:
return None
return sum(confidences) / len(confidences)
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
@@ -569,16 +635,26 @@ class DeepgramFluxSTTService(WebsocketSTTService):
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.debug("User stopped speaking")
self._user_is_speaking = False
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._language,
result=data,
# Compute the average confidence
average_confidence = self._calculate_average_confidence(data)
if not self._params.min_confidence or average_confidence > self._params.min_confidence:
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._language,
result=data,
)
)
)
else:
logger.warning(
f"Transcription confidence below min_confidence threshold: {average_confidence}"
)
await self._handle_transcription(transcript, True, self._language)
await self.stop_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)

View File

@@ -79,15 +79,6 @@ class DeepgramTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Deepgram TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Deepgram's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Deepgram's TTS API.
@@ -177,15 +168,6 @@ class DeepgramHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Deepgram TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Deepgram's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Deepgram's TTS API.

View File

@@ -159,15 +159,6 @@ class FishAudioTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Fish Audio TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Fish Audio's text frames include necessary inter-frame spaces.
"""
return True
async def set_model(self, model: str):
"""Set the TTS model and reconnect.

View File

@@ -1452,8 +1452,6 @@ class GeminiLiveLLMService(LLMService):
self._bot_text_buffer += text
self._search_result_buffer += text # Also accumulate for grounding
frame = LLMTextFrame(text=text)
# Gemini Live text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
# Check for grounding metadata in server content

View File

@@ -920,9 +920,7 @@ class GoogleLLMService(LLMService):
for part in candidate.content.parts:
if not part.thought and part.text:
search_result += part.text
frame = LLMTextFrame(part.text)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(LLMTextFrame(part.text))
elif part.function_call:
function_call = part.function_call
id = function_call.id or str(uuid.uuid4())

View File

@@ -596,15 +596,6 @@ class GoogleHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Google TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Google's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Google TTS language format.
@@ -803,15 +794,6 @@ class GoogleBaseTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Google and Gemini TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Google's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Google TTS language format.

View File

@@ -111,15 +111,6 @@ class GroqTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Groq TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Groq's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Groq's TTS API.

View File

@@ -14,12 +14,14 @@ from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
InterruptionFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.tts_service import WordTTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
@@ -29,6 +31,7 @@ try:
PostedUtterance,
PostedUtteranceVoiceWithId,
)
from hume.tts.types import TimestampMessage
except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
logger.error(f"Exception: {e}")
logger.error("In order to use Hume, you need to `pip install pipecat-ai[hume]`.")
@@ -38,7 +41,7 @@ except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
HUME_SAMPLE_RATE = 48_000 # Hume TTS streams at 48 kHz
class HumeTTSService(TTSService):
class HumeTTSService(WordTTSService):
"""Hume Octave Text-to-Speech service.
Streams PCM audio via Hume's HTTP output streaming (JSON chunks) endpoint
@@ -48,6 +51,7 @@ class HumeTTSService(TTSService):
- Generates speech from text using Hume TTS.
- Streams PCM audio.
- Supports word-level timestamps for precise audio-text synchronization.
- Supports dynamic updates of voice and synthesis parameters at runtime.
- Provides metrics for Time To First Byte (TTFB) and TTS usage.
"""
@@ -92,7 +96,13 @@ class HumeTTSService(TTSService):
f"Hume TTS streams at {HUME_SAMPLE_RATE} Hz; configured sample_rate={sample_rate}"
)
super().__init__(sample_rate=sample_rate, **kwargs)
# WordTTSService sets push_text_frames=False by default, which we want
super().__init__(
sample_rate=sample_rate,
push_text_frames=False,
push_stop_frames=True,
**kwargs,
)
self._client = AsyncHumeClient(api_key=api_key)
self._params = params or HumeTTSService.InputParams()
@@ -102,6 +112,10 @@ class HumeTTSService(TTSService):
self._audio_bytes = b""
# Track cumulative time for word timestamps across utterances
self._cumulative_time = 0.0
self._started = False
def can_generate_metrics(self) -> bool:
"""Can generate metrics.
@@ -110,15 +124,6 @@ class HumeTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Hume TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Hume's text frames include necessary inter-frame spaces.
"""
return True
async def start(self, frame: StartFrame) -> None:
"""Start the service.
@@ -126,6 +131,27 @@ class HumeTTSService(TTSService):
frame: The start frame.
"""
await super().start(frame)
self._reset_state()
def _reset_state(self):
"""Reset internal state variables."""
self._cumulative_time = 0.0
self._started = False
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
"""Push a frame and handle state changes.
Args:
frame: The frame to push.
direction: The direction to push the frame.
"""
await super().push_frame(frame, direction)
if isinstance(frame, (InterruptionFrame, TTSStoppedFrame)):
# Reset timing on interruption or stop
self._reset_state()
if isinstance(frame, TTSStoppedFrame):
await self.add_word_timestamps([("Reset", 0)])
async def update_setting(self, key: str, value: Any) -> None:
"""Runtime updates via `TTSUpdateSettingsFrame`.
@@ -142,7 +168,7 @@ class HumeTTSService(TTSService):
if key_l == "voice_id":
self.set_voice(str(value))
logger.info(f"HumeTTSService voice_id set to: {self.voice}")
logger.debug(f"HumeTTSService voice_id set to: {self.voice}")
elif key_l == "description":
self._params.description = None if value is None else str(value)
elif key_l == "speed":
@@ -155,7 +181,7 @@ class HumeTTSService(TTSService):
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Hume TTS.
"""Generate speech from text using Hume TTS with word timestamps.
Args:
text: The text to be synthesized.
@@ -186,7 +212,12 @@ class HumeTTSService(TTSService):
await self.start_ttfb_metrics()
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
# Start TTS sequence if not already started
if not self._started:
self.start_word_timestamps()
yield TTSStartedFrame()
self._started = True
try:
# Instant mode is always enabled here (not user-configurable)
@@ -197,23 +228,50 @@ class HumeTTSService(TTSService):
# Use version "2" by default if no description is provided
# Version "1" is needed when description is used
version = "1" if self._params.description is not None else "2"
# Track the duration of this utterance based on the last timestamp
utterance_duration = 0.0
async for chunk in self._client.tts.synthesize_json_streaming(
utterances=[utterance],
format=pcm_fmt,
instant_mode=True,
version=version,
include_timestamp_types=["word"], # Request word-level timestamps
):
# Process audio chunks
audio_b64 = getattr(chunk, "audio", None)
if not audio_b64:
continue
if audio_b64:
await self.stop_ttfb_metrics()
pcm_bytes = base64.b64decode(audio_b64)
self._audio_bytes += pcm_bytes
pcm_bytes = base64.b64decode(audio_b64)
self._audio_bytes += pcm_bytes
# Buffer audio until we have enough to avoid glitches
if len(self._audio_bytes) >= self.chunk_size:
frame = TTSAudioRawFrame(
audio=self._audio_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
yield frame
self._audio_bytes = b""
# Buffer audio until we have enough to avoid glitches
if len(self._audio_bytes) < self.chunk_size:
continue
# Process timestamp messages
if isinstance(chunk, TimestampMessage):
timestamp = chunk.timestamp
if timestamp.type == "word":
# Convert milliseconds to seconds and add cumulative offset
word_start_time = self._cumulative_time + (timestamp.time.begin / 1000.0)
word_end_time = self._cumulative_time + (timestamp.time.end / 1000.0)
# Track the maximum end time for this utterance
utterance_duration = max(utterance_duration, word_end_time)
# Add word timestamp
await self.add_word_timestamps([(timestamp.text, word_start_time)])
# Flush any remaining audio bytes
if self._audio_bytes:
frame = TTSAudioRawFrame(
audio=self._audio_bytes,
sample_rate=self.sample_rate,
@@ -224,10 +282,14 @@ class HumeTTSService(TTSService):
self._audio_bytes = b""
# Update cumulative time for next utterance
if utterance_duration > 0:
self._cumulative_time = utterance_duration
except Exception as e:
logger.error(f"{self} exception: {e}")
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()
# Let the parent class handle TTSStoppedFrame via push_stop_frames

View File

@@ -146,6 +146,8 @@ class InworldTTSService(TTSService):
Parameters:
temperature: Voice temperature control for synthesis variability (e.g., 1.1).
Valid range: [0, 2]. Higher values increase variability.
speaking_rate: Speaking speed control (range: [0.5, 1.5]). Defaults to 1.0 when
unset.
Note:
Language is automatically inferred from the input text by Inworld's TTS models,
@@ -153,6 +155,7 @@ class InworldTTSService(TTSService):
"""
temperature: Optional[float] = None # optional temperature control (range: [0, 2])
speaking_rate: Optional[float] = None # optional speaking rate control (range: [0.5, 1.5])
def __init__(
self,
@@ -198,6 +201,7 @@ class InworldTTSService(TTSService):
- Other formats as supported by Inworld API
params: Optional input parameters for additional configuration. Use this to specify:
- temperature: Voice temperature control for variability (range: [0, 2], e.g., 1.1, optional)
- speaking_rate: Set desired speaking speed (range: [0.5, 1.5], optional)
Language is automatically inferred from input text.
**kwargs: Additional arguments passed to the parent TTSService class.
@@ -228,15 +232,18 @@ class InworldTTSService(TTSService):
self._settings = {
"voiceId": voice_id, # Voice selection from direct parameter
"modelId": model, # TTS model selection from direct parameter
"audio_config": { # Audio format configuration
"audio_encoding": encoding, # Format: LINEAR16, MP3, etc.
"sample_rate_hertz": 0, # Will be set in start() from parent service
"audioConfig": { # Audio format configuration
"audioEncoding": encoding, # Format: LINEAR16, MP3, etc.
"sampleRateHertz": 0, # Will be set in start() from parent service
},
}
# Add optional temperature parameter if provided (valid range: [0, 2])
if params and params.temperature is not None:
self._settings["temperature"] = params.temperature
# Add optional speaking rate if provided (valid range: [0.5, 1.5])
if params and params.speaking_rate is not None:
self._settings["audioConfig"]["speakingRate"] = params.speaking_rate
# Register voice and model with parent service for metrics and tracking
self.set_voice(voice_id) # Used for logging and metrics
@@ -250,15 +257,6 @@ class InworldTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Inworld TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Inworld's text frames include necessary inter-frame spaces.
"""
return True
async def start(self, frame: StartFrame):
"""Start the Inworld TTS service.
@@ -266,7 +264,7 @@ class InworldTTSService(TTSService):
frame: The start frame containing initialization parameters.
"""
await super().start(frame)
self._settings["audio_config"]["sample_rate_hertz"] = self.sample_rate
self._settings["audioConfig"]["sampleRateHertz"] = self.sample_rate
async def stop(self, frame: EndFrame):
"""Stop the Inworld TTS service.
@@ -332,9 +330,7 @@ class InworldTTSService(TTSService):
"text": text, # Text to synthesize
"voiceId": self._settings["voiceId"], # Voice selection (Ashley, Hades, etc.)
"modelId": self._settings["modelId"], # TTS model (inworld-tts-1)
"audio_config": self._settings[
"audio_config"
], # Audio format settings (LINEAR16, 48kHz)
"audioConfig": self._settings["audioConfig"], # Audio format settings (LINEAR16, 48kHz)
}
# Add optional temperature parameter if configured (valid range: [0, 2])

View File

@@ -124,15 +124,6 @@ class LmntTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that LMNT TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that LMNT's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to LMNT service language format.

View File

@@ -194,15 +194,6 @@ class MiniMaxHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that MiniMax TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that MiniMax's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to MiniMax service language format.

View File

@@ -151,15 +151,6 @@ class NeuphonicTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Neuphonic TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Neuphonic's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Neuphonic service language format.
@@ -449,15 +440,6 @@ class NeuphonicHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Neuphonic TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Neuphonic's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Neuphonic service language format.

View File

@@ -390,9 +390,7 @@ class BaseOpenAILLMService(LLMService):
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
frame = LLMTextFrame(chunk.choices[0].delta.content)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
# When gpt-4o-audio / gpt-4o-mini-audio is used for llm or stt+llm
# we need to get LLMTextFrame for the transcript

View File

@@ -678,8 +678,6 @@ class OpenAIRealtimeLLMService(LLMService):
# the output modality is "text"
if evt.delta:
frame = LLMTextFrame(evt.delta)
# OpenAI Realtime text already includes any necessary inter-chunk spaces
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
async def _handle_evt_audio_transcript_delta(self, evt):

View File

@@ -131,15 +131,6 @@ class OpenAITTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that OpenAI TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that OpenAI's text frames include necessary inter-frame spaces.
"""
return True
async def set_model(self, model: str):
"""Set the TTS model to use.

View File

@@ -66,15 +66,6 @@ class PiperTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Piper TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Piper's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Piper's HTTP API.

View File

@@ -501,15 +501,6 @@ class RimeHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Rime TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Rime's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> str | None:
"""Convert pipecat language to Rime language code.

View File

@@ -113,15 +113,6 @@ class RivaTTSService(TTSService):
riva.client.proto.riva_tts_pb2.RivaSynthesisConfigRequest()
)
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Riva TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Riva's text frames include necessary inter-frame spaces.
"""
return True
async def set_model(self, model: str):
"""Attempt to set the TTS model.
@@ -166,7 +157,6 @@ class RivaTTSService(TTSService):
add_response(None)
except Exception as e:
logger.error(f"{self} exception: {e}")
yield ErrorFrame(error=f"{self} error: {e}")
add_response(None)
await self.start_ttfb_metrics()
@@ -191,6 +181,7 @@ class RivaTTSService(TTSService):
resp = await asyncio.wait_for(queue.get(), timeout=RIVA_TTS_TIMEOUT_SECS)
except asyncio.TimeoutError:
logger.error(f"{self} timeout waiting for audio response")
yield ErrorFrame(error=f"{self} error: {e}")
await self.start_tts_usage_metrics(text)
yield TTSStoppedFrame()

View File

@@ -176,9 +176,7 @@ class SambaNovaLLMService(OpenAILLMService): # type: ignore
# Keep iterating through the response to collect all the argument fragments
arguments += tool_call.function.arguments
elif chunk.choices[0].delta.content:
frame = LLMTextFrame(chunk.choices[0].delta.content)
frame.includes_inter_frame_spaces = True
await self.push_frame(frame)
await self.push_frame(LLMTextFrame(chunk.choices[0].delta.content))
# When gpt-4o-audio / gpt-4o-mini-audio is used for llm or stt+llm
# we need to get LLMTextFrame for the transcript

View File

@@ -195,15 +195,6 @@ class SarvamHttpTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Sarvam TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Sarvam's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Sarvam AI language format.
@@ -467,15 +458,6 @@ class SarvamTTSService(InterruptibleTTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Sarvam TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Sarvam's text frames include necessary inter-frame spaces.
"""
return True
def language_to_service_language(self, language: Language) -> Optional[str]:
"""Convert a Language enum to Sarvam AI language format.

View File

@@ -84,6 +84,10 @@ class SimliVideoService(FrameProcessor):
Please use 'api_key' and 'face_id' parameters instead.
use_turn_server: Whether to use TURN server for connection. Defaults to False.
.. deprecated:: 0.0.95
The 'use_turn_server' parameter is deprecated and will be removed in a future version.
latency_interval: Latency interval setting for sending health checks to check
the latency to Simli Servers. Defaults to 0.
simli_url: URL of the simli servers. Can be changed for custom deployments
@@ -135,14 +139,20 @@ class SimliVideoService(FrameProcessor):
config = SimliConfig(**config_kwargs)
if use_turn_server:
warnings.warn(
"The 'use_turn_server' parameter is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
self._initialized = False
# Add buffer time to session limits
config.maxIdleTime += 5
config.maxSessionLength += 5
self._simli_client = SimliClient(
config,
use_turn_server,
latency_interval,
config=config,
latencyInterval=latency_interval,
simliURL=simli_url,
)

View File

@@ -105,15 +105,6 @@ class SpeechmaticsTTSService(TTSService):
"""
return True
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates that Speechmatics TTSTextFrames include necessary inter-frame spaces.
Returns:
True, indicating that Speechmatics's text frames include necessary inter-frame spaces.
"""
return True
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Speechmatics' HTTP API.

View File

@@ -38,7 +38,7 @@ class STTService(AIService):
Event handlers:
on_connected: Called when connected to the STT service.
on_connected: Called when disconnected from the STT service.
on_disconnected: Called when disconnected from the STT service.
on_connection_error: Called when a connection to the STT service error occurs.
Example::

View File

@@ -142,6 +142,7 @@ class TTSService(AIService):
self._voice_id: str = ""
self._settings: Dict[str, Any] = {}
self._text_aggregator: BaseTextAggregator = text_aggregator or SimpleTextAggregator()
self._aggregated_text_includes_inter_frame_spaces: bool = False
self._text_filters: Sequence[BaseTextFilter] = text_filters or []
self._transport_destination: Optional[str] = transport_destination
self._tracing_enabled: bool = False
@@ -192,23 +193,6 @@ class TTSService(AIService):
CHUNK_SECONDS = 0.5
return int(self.sample_rate * CHUNK_SECONDS * 2) # 2 bytes/sample
@property
def includes_inter_frame_spaces(self) -> bool:
"""Indicates whether TTSTextFrames include necesary inter-frame spaces.
When True, the TTSTextFrame objects pushed by this service already
include all necessary spaces between subsequent frames. When False,
downstream processors (like the assistant context aggregator) may need
to add spacing.
Subclasses should override this property to return True if their text
generation process already includes necessary inter-frame spaces.
Returns:
False by default. Subclasses can override to return True.
"""
return False
async def set_model(self, model: str):
"""Set the TTS model to use.
@@ -369,9 +353,16 @@ class TTSService(AIService):
await self._maybe_pause_frame_processing()
sentence = self._text_aggregator.text
includes_inter_frame_spaces = self._aggregated_text_includes_inter_frame_spaces
# Reset aggregator state
await self._text_aggregator.reset()
self._processing_text = False
await self._push_tts_frames(sentence)
self._aggregated_text_includes_inter_frame_spaces = False
await self._push_tts_frames(
sentence, includes_inter_frame_spaces=includes_inter_frame_spaces
)
if isinstance(frame, LLMFullResponseEndFrame):
if self._push_text_frames:
await self.push_frame(frame, direction)
@@ -380,7 +371,8 @@ class TTSService(AIService):
elif isinstance(frame, TTSSpeakFrame):
# Store if we were processing text or not so we can set it back.
processing_text = self._processing_text
await self._push_tts_frames(frame.text)
# Assumption: text in TTSSpeakFrame does not include inter-frame spaces
await self._push_tts_frames(frame.text, includes_inter_frame_spaces=False)
# We pause processing incoming frames because we are sending data to
# the TTS. We pause to avoid audio overlapping.
await self._maybe_pause_frame_processing()
@@ -474,11 +466,17 @@ class TTSService(AIService):
text = frame.text
else:
text = await self._text_aggregator.aggregate(frame.text)
# Assumption: whether inter-frame spaces are included shouldn't
# change during aggregation, so we can just use the latest frame's
# value
self._aggregated_text_includes_inter_frame_spaces = frame.includes_inter_frame_spaces
if text:
await self._push_tts_frames(text)
await self._push_tts_frames(
text, includes_inter_frame_spaces=frame.includes_inter_frame_spaces
)
async def _push_tts_frames(self, text: str):
async def _push_tts_frames(self, text: str, includes_inter_frame_spaces: bool):
# Remove leading newlines only
text = text.lstrip("\n")
@@ -508,7 +506,7 @@ class TTSService(AIService):
# We send the original text after the audio. This way, if we are
# interrupted, the text is not added to the assistant context.
frame = TTSTextFrame(text)
frame.includes_inter_frame_spaces = self.includes_inter_frame_spaces
frame.includes_inter_frame_spaces = includes_inter_frame_spaces
await self.push_frame(frame)
async def _stop_frame_handler(self):
@@ -635,6 +633,8 @@ class WordTTSService(TTSService):
frame = TTSStoppedFrame()
frame.pts = last_pts
else:
# Assumption: word-by-word text frames don't include spaces, so
# we can rely on the default includes_inter_frame_spaces=False
frame = TTSTextFrame(word)
frame.pts = self._initial_word_timestamp + timestamp
if frame:

View File

@@ -36,6 +36,7 @@ class WebsocketService(ABC):
"""
self._websocket: Optional[websockets.WebSocketClientProtocol] = None
self._reconnect_on_error = reconnect_on_error
self._reconnect_in_progress: bool = False # Add this flag
async def _verify_connection(self) -> bool:
"""Verify the websocket connection is active and responsive.
@@ -66,6 +67,59 @@ class WebsocketService(ABC):
await self._connect_websocket()
return await self._verify_connection()
async def _try_reconnect(
self,
max_retries: int = 3,
report_error: Optional[Callable[[ErrorFrame], Awaitable[None]]] = None,
) -> bool:
# Prevent concurrent reconnection attempts
if self._reconnect_in_progress:
logger.warning(f"{self} reconnect attempt aborted: already in progress")
return False
self._reconnect_in_progress = True
last_exception: Optional[Exception] = None
try:
for attempt in range(1, max_retries + 1):
try:
logger.warning(f"{self} reconnecting, attempt {attempt}")
if await self._reconnect_websocket(attempt):
logger.info(f"{self} reconnected successfully on attempt {attempt}")
return True
except Exception as e:
last_exception = e
logger.error(f"{self} reconnection attempt {attempt} failed: {e}")
if report_error:
await report_error(
ErrorFrame(f"{self} reconnection attempt {attempt} failed: {e}")
)
wait_time = exponential_backoff_time(attempt)
await asyncio.sleep(wait_time)
fatal_msg = f"{self} failed to reconnect after {max_retries} attempts"
if last_exception:
fatal_msg += f": {last_exception}"
logger.error(fatal_msg)
if report_error:
await report_error(ErrorFrame(fatal_msg, fatal=True))
return False
finally:
self._reconnect_in_progress = False
async def send_with_retry(self, message, report_error: Callable[[ErrorFrame], Awaitable[None]]):
"""Attempt to send a message, retrying after reconnect if necessary."""
try:
await self._websocket.send(message)
except Exception as e:
logger.error(f"{self} send failed: {e}, will try to reconnect")
# Try to reconnect before retrying
success = await self._try_reconnect(report_error=report_error)
if success:
logger.info(f"{self} reconnected successfully, will retry send the message")
# trying to send the message one more time
await self._websocket.send(message)
else:
logger.error(f"{self} send failed; unable to reconnect")
async def _receive_task_handler(self, report_error: Callable[[ErrorFrame], Awaitable[None]]):
"""Handle websocket message receiving with automatic retry logic.
@@ -76,13 +130,9 @@ class WebsocketService(ABC):
Args:
report_error: Callback function to report connection errors.
"""
retry_count = 0
MAX_RETRIES = 3
while True:
try:
await self._receive_messages()
retry_count = 0 # Reset counter on successful message receive
except ConnectionClosedOK as e:
# Normal closure, don't retry
logger.debug(f"{self} connection closed normally: {e}")
@@ -92,21 +142,9 @@ class WebsocketService(ABC):
logger.error(message)
if self._reconnect_on_error:
retry_count += 1
if retry_count >= MAX_RETRIES:
await report_error(ErrorFrame(message))
success = await self._try_reconnect(report_error=report_error)
if not success:
break
logger.warning(f"{self} connection error, will retry: {e}")
await report_error(ErrorFrame(message))
try:
if await self._reconnect_websocket(retry_count):
retry_count = 0 # Reset counter on successful reconnection
wait_time = exponential_backoff_time(retry_count)
await asyncio.sleep(wait_time)
except Exception as reconnect_error:
logger.error(f"{self} reconnection failed: {reconnect_error}")
else:
await report_error(ErrorFrame(message))
break

View File

@@ -18,6 +18,7 @@ Dependencies:
"""
import re
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Sequence, Tuple
import nltk
@@ -198,7 +199,24 @@ def parse_start_end_tags(
return (None, current_tag_index)
def concatenate_aggregated_text(text_parts: List[str], add_spaces: bool) -> str:
@dataclass
class TextPartForConcatenation:
"""Class representing a part of text for concatenation with concatenate_aggregated_text.
Attributes:
text: The text content.
includes_inter_part_spaces: Whether any necessary inter-frame
(leading/trailing) spaces are already included in the text.
"""
text: str
includes_inter_part_spaces: bool
def __str__(self):
return f"{self.name}(text: [{self.text}], includes_inter_part_spaces: {self.includes_inter_part_spaces})"
def concatenate_aggregated_text(text_parts: List[TextPartForConcatenation]) -> str:
"""Concatenate a list of text parts into a single string.
This function joins the provided list of text parts into a single string,
@@ -208,15 +226,55 @@ def concatenate_aggregated_text(text_parts: List[str], add_spaces: bool) -> str:
transcription services.
Args:
text_parts: A list of strings representing parts of text to concatenate.
add_spaces: Whether to add spaces between text parts during concatenation.
text_parts: A list of text parts to concatenate.
Returns:
A single concatenated string.
"""
# Concatenate text parts with or without spaces based on the flag
separator = " " if add_spaces else ""
result = separator.join(text_parts)
result = ""
last_includes_inter_part_spaces = False
if not text_parts:
return result
def append_part(part: TextPartForConcatenation):
nonlocal result
nonlocal last_includes_inter_part_spaces
result += part.text
last_includes_inter_part_spaces = part.includes_inter_part_spaces
for part in text_parts:
# Part is empty.
# Skip.
if not part.text:
continue
# Result is as yet empty.
# Just append.
if not result:
append_part(part)
continue
if part.includes_inter_part_spaces and last_includes_inter_part_spaces:
# This part is part of an ongoing run that has spaces already included.
# Just append.
append_part(part)
elif not part.includes_inter_part_spaces and not last_includes_inter_part_spaces:
# This part is part of an ongoing run that has no spaces included.
# Add a space before appending.
result += " "
append_part(part)
else:
# This part represents a transition to a new run (spaces -> no spaces, or vice versa).
# Add a space if needed, before appending.
if not result[-1].isspace() and not part.text[0].isspace():
result += " "
append_part(part)
# NOTE: the above logic assumes that runs of text parts with
# includes_inter_part_spaces=True are well-formed, i.e. they're not
# actually multiple separate runs with a space-less boundary, like
# "hello ", "world.", "goodnight ", "moon."
# Clean up any excessive whitespace
result = result.strip()

View File

@@ -26,7 +26,6 @@ class BaseTextFilter(ABC):
behavior, settings management, and interruption handling logic.
"""
@abstractmethod
async def update_settings(self, settings: Mapping[str, Any]):
"""Update the filter's configuration settings.
@@ -53,7 +52,6 @@ class BaseTextFilter(ABC):
"""
pass
@abstractmethod
async def handle_interruption(self):
"""Handle interruption events in the processing pipeline.
@@ -62,7 +60,6 @@ class BaseTextFilter(ABC):
"""
pass
@abstractmethod
async def reset_interruption(self):
"""Reset the filter state after an interruption has been handled.

View File

@@ -23,7 +23,7 @@ if TYPE_CHECKING:
from opentelemetry import context as context_api
from opentelemetry import trace
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.utils.tracing.service_attributes import (
add_gemini_live_span_attributes,
@@ -399,11 +399,6 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if hasattr(self, "get_llm_adapter"):
adapter = self.get_llm_adapter()
messages = adapter.get_messages_for_logging(context)
elif hasattr(context, "get_messages"):
# Fallback for unknown context types
messages = context.get_messages()
elif hasattr(context, "messages"):
messages = context.messages
# Serialize messages if available
if messages:
@@ -424,15 +419,10 @@ def traced_llm(func: Optional[Callable] = None, *, name: Optional[str] = None) -
if hasattr(self, "get_llm_adapter") and hasattr(context, "tools"):
adapter = self.get_llm_adapter()
tools = adapter.from_standard_tools(context.tools)
elif hasattr(context, "tools"):
# Fallback for unknown context types
tools = context.tools
# Serialize and count tools if available
# Check if tools is not None and not NOT_GIVEN (using attribute check as fallback)
if tools is not None and not (
hasattr(tools, "__name__") and tools.__name__ == "NOT_GIVEN"
):
# Check if tools is not None and not NOT_GIVEN
if tools is not None and tools is not NOT_GIVEN:
serialized_tools = json.dumps(tools)
tool_count = len(tools) if isinstance(tools, list) else 1

View File

@@ -1005,3 +1005,53 @@ class TestLLMAssistantAggregator(
) -> Optional[LLMAssistantAggregatorParams]:
kwargs.pop("expect_stripped_words", None)
return LLMAssistantAggregatorParams(**kwargs) if kwargs else None
async def test_multiple_text_mixed(self):
assert self.CONTEXT_CLASS is not None, "CONTEXT_CLASS must be set in a subclass"
assert self.AGGREGATOR_CLASS is not None, "AGGREGATOR_CLASS must be set in a subclass"
context = self.CONTEXT_CLASS()
aggregator = self.AGGREGATOR_CLASS(
context, params=self.create_assistant_aggregator_params(expect_stripped_words=False)
)
# The newer LLMAssistantAggregator expects TextFrames to declare
# when they include inter-frame spaces.
def make_text_frame(text: str, includes_spaces: bool) -> TextFrame:
frame = TextFrame(text=text)
frame.includes_inter_frame_spaces = includes_spaces
return frame
frames_to_send = [
LLMFullResponseStartFrame(),
make_text_frame("Hello ", includes_spaces=True),
make_text_frame("Pipecat. ", includes_spaces=True),
make_text_frame("Here's some", includes_spaces=True),
make_text_frame(
" code:", includes_spaces=True
), # Validates ending includes_inter_frame_spaces run with no space
make_text_frame("```python\nprint('Hello, World!')\n```", includes_spaces=False),
make_text_frame(
"```javascript\nconsole.log('Hello, World!');\n```", includes_spaces=False
),
make_text_frame(
" And some more: ", includes_spaces=True
), # Validates starting includes_inter_frame_spaces run with a space and ending it with no space
make_text_frame("```html\n<div>Hello, World!</div>\n```", includes_spaces=False),
make_text_frame(
"Hope that ", includes_spaces=True
), # Validates starting includes_inter_frame_spaces run with no space
make_text_frame("helps!", includes_spaces=True),
LLMFullResponseEndFrame(),
]
expected_down_frames = [*self.EXPECTED_CONTEXT_FRAMES]
await run_test(
aggregator,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
self.check_message_content(
context,
0,
"Hello Pipecat. Here's some code: ```python\nprint('Hello, World!')\n``` ```javascript\nconsole.log('Hello, World!');\n``` And some more: ```html\n<div>Hello, World!</div>\n``` Hope that helps!",
)

15
uv.lock generated
View File

@@ -36,12 +36,12 @@ wheels = [
[[package]]
name = "aic-sdk"
version = "1.0.2"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/51/90/b02e853e863c303f8456c689b42ac24ad403b781adc9642d0a91ed4bed7e/aic_sdk-1.0.2.tar.gz", hash = "sha256:239097dd3aaa8a8a0fd7542b75d2510cb34144caec796370639b7c636acbc56e", size = 32059, upload-time = "2025-08-24T09:20:03.9Z" }
sdist = { url = "https://files.pythonhosted.org/packages/99/83/bf38b95d98c67b8ebc574fb4a4f23c07a3740b51992d7522976173d30b98/aic_sdk-1.1.0.tar.gz", hash = "sha256:04e08df695581c8cb4db8acca20e73815e9f449e7bd08e0162fd55518c727963", size = 34954, upload-time = "2025-11-11T20:45:24.25Z" }
[[package]]
name = "aioboto3"
@@ -4647,7 +4647,7 @@ docs = [
[package.metadata]
requires-dist = [
{ name = "accelerate", marker = "extra == 'moondream'", specifier = "~=1.10.0" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.0.1" },
{ name = "aic-sdk", marker = "extra == 'aic'", specifier = "~=1.1.0" },
{ name = "aioboto3", marker = "extra == 'aws'", specifier = "~=15.0.0" },
{ name = "aiofiles", specifier = ">=24.1.0,<25" },
{ name = "aiohttp", specifier = ">=3.11.12,<4" },
@@ -4727,7 +4727,7 @@ requires-dist = [
{ name = "resampy", specifier = "~=0.4.3" },
{ name = "sarvamai", marker = "extra == 'sarvam'", specifier = "==0.1.21" },
{ name = "sentry-sdk", marker = "extra == 'sentry'", specifier = ">=2.28.0,<3" },
{ name = "simli-ai", marker = "extra == 'simli'", specifier = "~=0.1.25" },
{ name = "simli-ai", marker = "extra == 'simli'", specifier = "~=1.0.3" },
{ name = "soundfile", marker = "extra == 'soundfile'", specifier = "~=0.13.1" },
{ name = "soxr", specifier = "~=0.5.0" },
{ name = "speechmatics-rt", marker = "extra == 'speechmatics'", specifier = ">=0.5.0" },
@@ -6496,18 +6496,19 @@ wheels = [
[[package]]
name = "simli-ai"
version = "0.1.25"
version = "1.0.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiortc" },
{ name = "av" },
{ name = "httpx" },
{ name = "livekit" },
{ name = "numpy" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/64/6a/b28f90baf76f6a60865985f6233ff44abc72d45b66b76658bff3961e20a7/simli_ai-0.1.25.tar.gz", hash = "sha256:7a00b3426dc26a6a421641072c3e49014b7950c621cf4544152f35c58d13fcff", size = 13182, upload-time = "2025-11-06T16:27:08.862Z" }
sdist = { url = "https://files.pythonhosted.org/packages/81/03/b0b3e12c68fd3f9c57f6afeee67841349e4866b88760f413357af3043ae4/simli_ai-1.0.3.tar.gz", hash = "sha256:e96b0621a1dbd9582b2ae3d51eefd4995983b49c1f1061eb9239707b15a1ee27", size = 13350, upload-time = "2025-11-13T12:22:32.514Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ac/57/ae1032fd88214ea4ee6d3028c817c12a999eb90a67766bbab31e9819385a/simli_ai-0.1.25-py3-none-any.whl", hash = "sha256:7d01f65321dc9052f25e15d0463af6a20a86c6d37d9a7b3a2c4b01cbec0a54ed", size = 13651, upload-time = "2025-11-06T16:27:07.765Z" },
{ url = "https://files.pythonhosted.org/packages/5e/d1/dc382ba529de0d2d51f35e9bfd20b41d8f5c96404a3aa24bae97a5a5e51f/simli_ai-1.0.3-py3-none-any.whl", hash = "sha256:ffafa7540aa28833e207be8f3b199367c7f500dac1a8ba0108395bfb7d8362bc", size = 13863, upload-time = "2025-11-13T12:22:31.218Z" },
]
[[package]]